7406c0b0f3ac482df93b02a2fc197da792c94f0d
[dpdk.git] / lib / librte_ring / rte_ring_elem.h
1 /* SPDX-License-Identifier: BSD-3-Clause
2  *
3  * Copyright (c) 2019 Arm Limited
4  * Copyright (c) 2010-2017 Intel Corporation
5  * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org
6  * All rights reserved.
7  * Derived from FreeBSD's bufring.h
8  * Used as BSD-3 Licensed with permission from Kip Macy.
9  */
10
11 #ifndef _RTE_RING_ELEM_H_
12 #define _RTE_RING_ELEM_H_
13
14 /**
15  * @file
16  * RTE Ring with user defined element size
17  */
18
19 #ifdef __cplusplus
20 extern "C" {
21 #endif
22
23 #include <rte_ring_core.h>
24
25 /**
26  * @warning
27  * @b EXPERIMENTAL: this API may change without prior notice
28  *
29  * Calculate the memory size needed for a ring with given element size
30  *
31  * This function returns the number of bytes needed for a ring, given
32  * the number of elements in it and the size of the element. This value
33  * is the sum of the size of the structure rte_ring and the size of the
34  * memory needed for storing the elements. The value is aligned to a cache
35  * line size.
36  *
37  * @param esize
38  *   The size of ring element, in bytes. It must be a multiple of 4.
39  * @param count
40  *   The number of elements in the ring (must be a power of 2).
41  * @return
42  *   - The memory size needed for the ring on success.
43  *   - -EINVAL - esize is not a multiple of 4 or count provided is not a
44  *               power of 2.
45  */
46 __rte_experimental
47 ssize_t rte_ring_get_memsize_elem(unsigned int esize, unsigned int count);
48
49 /**
50  * @warning
51  * @b EXPERIMENTAL: this API may change without prior notice
52  *
53  * Create a new ring named *name* that stores elements with given size.
54  *
55  * This function uses ``memzone_reserve()`` to allocate memory. Then it
56  * calls rte_ring_init() to initialize an empty ring.
57  *
58  * The new ring size is set to *count*, which must be a power of
59  * two. Water marking is disabled by default. The real usable ring size
60  * is *count-1* instead of *count* to differentiate a free ring from an
61  * empty ring.
62  *
63  * The ring is added in RTE_TAILQ_RING list.
64  *
65  * @param name
66  *   The name of the ring.
67  * @param esize
68  *   The size of ring element, in bytes. It must be a multiple of 4.
69  * @param count
70  *   The number of elements in the ring (must be a power of 2).
71  * @param socket_id
72  *   The *socket_id* argument is the socket identifier in case of
73  *   NUMA. The value can be *SOCKET_ID_ANY* if there is no NUMA
74  *   constraint for the reserved zone.
75  * @param flags
76  *   An OR of the following:
77  *    - RING_F_SP_ENQ: If this flag is set, the default behavior when
78  *      using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()``
79  *      is "single-producer". Otherwise, it is "multi-producers".
80  *    - RING_F_SC_DEQ: If this flag is set, the default behavior when
81  *      using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
82  *      is "single-consumer". Otherwise, it is "multi-consumers".
83  * @return
84  *   On success, the pointer to the new allocated ring. NULL on error with
85  *    rte_errno set appropriately. Possible errno values include:
86  *    - E_RTE_NO_CONFIG - function could not get pointer to rte_config structure
87  *    - E_RTE_SECONDARY - function was called from a secondary process instance
88  *    - EINVAL - esize is not a multiple of 4 or count provided is not a
89  *               power of 2.
90  *    - ENOSPC - the maximum number of memzones has already been allocated
91  *    - EEXIST - a memzone with the same name already exists
92  *    - ENOMEM - no appropriate memory area found in which to create memzone
93  */
94 __rte_experimental
95 struct rte_ring *rte_ring_create_elem(const char *name, unsigned int esize,
96                         unsigned int count, int socket_id, unsigned int flags);
97
98 static __rte_always_inline void
99 __rte_ring_enqueue_elems_32(struct rte_ring *r, const uint32_t size,
100                 uint32_t idx, const void *obj_table, uint32_t n)
101 {
102         unsigned int i;
103         uint32_t *ring = (uint32_t *)&r[1];
104         const uint32_t *obj = (const uint32_t *)obj_table;
105         if (likely(idx + n < size)) {
106                 for (i = 0; i < (n & ~0x7); i += 8, idx += 8) {
107                         ring[idx] = obj[i];
108                         ring[idx + 1] = obj[i + 1];
109                         ring[idx + 2] = obj[i + 2];
110                         ring[idx + 3] = obj[i + 3];
111                         ring[idx + 4] = obj[i + 4];
112                         ring[idx + 5] = obj[i + 5];
113                         ring[idx + 6] = obj[i + 6];
114                         ring[idx + 7] = obj[i + 7];
115                 }
116                 switch (n & 0x7) {
117                 case 7:
118                         ring[idx++] = obj[i++]; /* fallthrough */
119                 case 6:
120                         ring[idx++] = obj[i++]; /* fallthrough */
121                 case 5:
122                         ring[idx++] = obj[i++]; /* fallthrough */
123                 case 4:
124                         ring[idx++] = obj[i++]; /* fallthrough */
125                 case 3:
126                         ring[idx++] = obj[i++]; /* fallthrough */
127                 case 2:
128                         ring[idx++] = obj[i++]; /* fallthrough */
129                 case 1:
130                         ring[idx++] = obj[i++]; /* fallthrough */
131                 }
132         } else {
133                 for (i = 0; idx < size; i++, idx++)
134                         ring[idx] = obj[i];
135                 /* Start at the beginning */
136                 for (idx = 0; i < n; i++, idx++)
137                         ring[idx] = obj[i];
138         }
139 }
140
141 static __rte_always_inline void
142 __rte_ring_enqueue_elems_64(struct rte_ring *r, uint32_t prod_head,
143                 const void *obj_table, uint32_t n)
144 {
145         unsigned int i;
146         const uint32_t size = r->size;
147         uint32_t idx = prod_head & r->mask;
148         uint64_t *ring = (uint64_t *)&r[1];
149         const unaligned_uint64_t *obj = (const unaligned_uint64_t *)obj_table;
150         if (likely(idx + n < size)) {
151                 for (i = 0; i < (n & ~0x3); i += 4, idx += 4) {
152                         ring[idx] = obj[i];
153                         ring[idx + 1] = obj[i + 1];
154                         ring[idx + 2] = obj[i + 2];
155                         ring[idx + 3] = obj[i + 3];
156                 }
157                 switch (n & 0x3) {
158                 case 3:
159                         ring[idx++] = obj[i++]; /* fallthrough */
160                 case 2:
161                         ring[idx++] = obj[i++]; /* fallthrough */
162                 case 1:
163                         ring[idx++] = obj[i++];
164                 }
165         } else {
166                 for (i = 0; idx < size; i++, idx++)
167                         ring[idx] = obj[i];
168                 /* Start at the beginning */
169                 for (idx = 0; i < n; i++, idx++)
170                         ring[idx] = obj[i];
171         }
172 }
173
174 static __rte_always_inline void
175 __rte_ring_enqueue_elems_128(struct rte_ring *r, uint32_t prod_head,
176                 const void *obj_table, uint32_t n)
177 {
178         unsigned int i;
179         const uint32_t size = r->size;
180         uint32_t idx = prod_head & r->mask;
181         rte_int128_t *ring = (rte_int128_t *)&r[1];
182         const rte_int128_t *obj = (const rte_int128_t *)obj_table;
183         if (likely(idx + n < size)) {
184                 for (i = 0; i < (n & ~0x1); i += 2, idx += 2)
185                         memcpy((void *)(ring + idx),
186                                 (const void *)(obj + i), 32);
187                 switch (n & 0x1) {
188                 case 1:
189                         memcpy((void *)(ring + idx),
190                                 (const void *)(obj + i), 16);
191                 }
192         } else {
193                 for (i = 0; idx < size; i++, idx++)
194                         memcpy((void *)(ring + idx),
195                                 (const void *)(obj + i), 16);
196                 /* Start at the beginning */
197                 for (idx = 0; i < n; i++, idx++)
198                         memcpy((void *)(ring + idx),
199                                 (const void *)(obj + i), 16);
200         }
201 }
202
203 /* the actual enqueue of elements on the ring.
204  * Placed here since identical code needed in both
205  * single and multi producer enqueue functions.
206  */
207 static __rte_always_inline void
208 __rte_ring_enqueue_elems(struct rte_ring *r, uint32_t prod_head,
209                 const void *obj_table, uint32_t esize, uint32_t num)
210 {
211         /* 8B and 16B copies implemented individually to retain
212          * the current performance.
213          */
214         if (esize == 8)
215                 __rte_ring_enqueue_elems_64(r, prod_head, obj_table, num);
216         else if (esize == 16)
217                 __rte_ring_enqueue_elems_128(r, prod_head, obj_table, num);
218         else {
219                 uint32_t idx, scale, nr_idx, nr_num, nr_size;
220
221                 /* Normalize to uint32_t */
222                 scale = esize / sizeof(uint32_t);
223                 nr_num = num * scale;
224                 idx = prod_head & r->mask;
225                 nr_idx = idx * scale;
226                 nr_size = r->size * scale;
227                 __rte_ring_enqueue_elems_32(r, nr_size, nr_idx,
228                                 obj_table, nr_num);
229         }
230 }
231
232 static __rte_always_inline void
233 __rte_ring_dequeue_elems_32(struct rte_ring *r, const uint32_t size,
234                 uint32_t idx, void *obj_table, uint32_t n)
235 {
236         unsigned int i;
237         uint32_t *ring = (uint32_t *)&r[1];
238         uint32_t *obj = (uint32_t *)obj_table;
239         if (likely(idx + n < size)) {
240                 for (i = 0; i < (n & ~0x7); i += 8, idx += 8) {
241                         obj[i] = ring[idx];
242                         obj[i + 1] = ring[idx + 1];
243                         obj[i + 2] = ring[idx + 2];
244                         obj[i + 3] = ring[idx + 3];
245                         obj[i + 4] = ring[idx + 4];
246                         obj[i + 5] = ring[idx + 5];
247                         obj[i + 6] = ring[idx + 6];
248                         obj[i + 7] = ring[idx + 7];
249                 }
250                 switch (n & 0x7) {
251                 case 7:
252                         obj[i++] = ring[idx++]; /* fallthrough */
253                 case 6:
254                         obj[i++] = ring[idx++]; /* fallthrough */
255                 case 5:
256                         obj[i++] = ring[idx++]; /* fallthrough */
257                 case 4:
258                         obj[i++] = ring[idx++]; /* fallthrough */
259                 case 3:
260                         obj[i++] = ring[idx++]; /* fallthrough */
261                 case 2:
262                         obj[i++] = ring[idx++]; /* fallthrough */
263                 case 1:
264                         obj[i++] = ring[idx++]; /* fallthrough */
265                 }
266         } else {
267                 for (i = 0; idx < size; i++, idx++)
268                         obj[i] = ring[idx];
269                 /* Start at the beginning */
270                 for (idx = 0; i < n; i++, idx++)
271                         obj[i] = ring[idx];
272         }
273 }
274
275 static __rte_always_inline void
276 __rte_ring_dequeue_elems_64(struct rte_ring *r, uint32_t prod_head,
277                 void *obj_table, uint32_t n)
278 {
279         unsigned int i;
280         const uint32_t size = r->size;
281         uint32_t idx = prod_head & r->mask;
282         uint64_t *ring = (uint64_t *)&r[1];
283         unaligned_uint64_t *obj = (unaligned_uint64_t *)obj_table;
284         if (likely(idx + n < size)) {
285                 for (i = 0; i < (n & ~0x3); i += 4, idx += 4) {
286                         obj[i] = ring[idx];
287                         obj[i + 1] = ring[idx + 1];
288                         obj[i + 2] = ring[idx + 2];
289                         obj[i + 3] = ring[idx + 3];
290                 }
291                 switch (n & 0x3) {
292                 case 3:
293                         obj[i++] = ring[idx++]; /* fallthrough */
294                 case 2:
295                         obj[i++] = ring[idx++]; /* fallthrough */
296                 case 1:
297                         obj[i++] = ring[idx++]; /* fallthrough */
298                 }
299         } else {
300                 for (i = 0; idx < size; i++, idx++)
301                         obj[i] = ring[idx];
302                 /* Start at the beginning */
303                 for (idx = 0; i < n; i++, idx++)
304                         obj[i] = ring[idx];
305         }
306 }
307
308 static __rte_always_inline void
309 __rte_ring_dequeue_elems_128(struct rte_ring *r, uint32_t prod_head,
310                 void *obj_table, uint32_t n)
311 {
312         unsigned int i;
313         const uint32_t size = r->size;
314         uint32_t idx = prod_head & r->mask;
315         rte_int128_t *ring = (rte_int128_t *)&r[1];
316         rte_int128_t *obj = (rte_int128_t *)obj_table;
317         if (likely(idx + n < size)) {
318                 for (i = 0; i < (n & ~0x1); i += 2, idx += 2)
319                         memcpy((void *)(obj + i), (void *)(ring + idx), 32);
320                 switch (n & 0x1) {
321                 case 1:
322                         memcpy((void *)(obj + i), (void *)(ring + idx), 16);
323                 }
324         } else {
325                 for (i = 0; idx < size; i++, idx++)
326                         memcpy((void *)(obj + i), (void *)(ring + idx), 16);
327                 /* Start at the beginning */
328                 for (idx = 0; i < n; i++, idx++)
329                         memcpy((void *)(obj + i), (void *)(ring + idx), 16);
330         }
331 }
332
333 /* the actual dequeue of elements from the ring.
334  * Placed here since identical code needed in both
335  * single and multi producer enqueue functions.
336  */
337 static __rte_always_inline void
338 __rte_ring_dequeue_elems(struct rte_ring *r, uint32_t cons_head,
339                 void *obj_table, uint32_t esize, uint32_t num)
340 {
341         /* 8B and 16B copies implemented individually to retain
342          * the current performance.
343          */
344         if (esize == 8)
345                 __rte_ring_dequeue_elems_64(r, cons_head, obj_table, num);
346         else if (esize == 16)
347                 __rte_ring_dequeue_elems_128(r, cons_head, obj_table, num);
348         else {
349                 uint32_t idx, scale, nr_idx, nr_num, nr_size;
350
351                 /* Normalize to uint32_t */
352                 scale = esize / sizeof(uint32_t);
353                 nr_num = num * scale;
354                 idx = cons_head & r->mask;
355                 nr_idx = idx * scale;
356                 nr_size = r->size * scale;
357                 __rte_ring_dequeue_elems_32(r, nr_size, nr_idx,
358                                 obj_table, nr_num);
359         }
360 }
361
362 /* Between load and load. there might be cpu reorder in weak model
363  * (powerpc/arm).
364  * There are 2 choices for the users
365  * 1.use rmb() memory barrier
366  * 2.use one-direction load_acquire/store_release barrier,defined by
367  * CONFIG_RTE_USE_C11_MEM_MODEL=y
368  * It depends on performance test results.
369  * By default, move common functions to rte_ring_generic.h
370  */
371 #ifdef RTE_USE_C11_MEM_MODEL
372 #include "rte_ring_c11_mem.h"
373 #else
374 #include "rte_ring_generic.h"
375 #endif
376
377 /**
378  * @internal Enqueue several objects on the ring
379  *
380  * @param r
381  *   A pointer to the ring structure.
382  * @param obj_table
383  *   A pointer to a table of objects.
384  * @param esize
385  *   The size of ring element, in bytes. It must be a multiple of 4.
386  *   This must be the same value used while creating the ring. Otherwise
387  *   the results are undefined.
388  * @param n
389  *   The number of objects to add in the ring from the obj_table.
390  * @param behavior
391  *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items from a ring
392  *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring
393  * @param is_sp
394  *   Indicates whether to use single producer or multi-producer head update
395  * @param free_space
396  *   returns the amount of space after the enqueue operation has finished
397  * @return
398  *   Actual number of objects enqueued.
399  *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
400  */
401 static __rte_always_inline unsigned int
402 __rte_ring_do_enqueue_elem(struct rte_ring *r, const void *obj_table,
403                 unsigned int esize, unsigned int n,
404                 enum rte_ring_queue_behavior behavior, unsigned int is_sp,
405                 unsigned int *free_space)
406 {
407         uint32_t prod_head, prod_next;
408         uint32_t free_entries;
409
410         n = __rte_ring_move_prod_head(r, is_sp, n, behavior,
411                         &prod_head, &prod_next, &free_entries);
412         if (n == 0)
413                 goto end;
414
415         __rte_ring_enqueue_elems(r, prod_head, obj_table, esize, n);
416
417         update_tail(&r->prod, prod_head, prod_next, is_sp, 1);
418 end:
419         if (free_space != NULL)
420                 *free_space = free_entries - n;
421         return n;
422 }
423
424 /**
425  * @internal Dequeue several objects from the ring
426  *
427  * @param r
428  *   A pointer to the ring structure.
429  * @param obj_table
430  *   A pointer to a table of objects.
431  * @param esize
432  *   The size of ring element, in bytes. It must be a multiple of 4.
433  *   This must be the same value used while creating the ring. Otherwise
434  *   the results are undefined.
435  * @param n
436  *   The number of objects to pull from the ring.
437  * @param behavior
438  *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from a ring
439  *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring
440  * @param is_sc
441  *   Indicates whether to use single consumer or multi-consumer head update
442  * @param available
443  *   returns the number of remaining ring entries after the dequeue has finished
444  * @return
445  *   - Actual number of objects dequeued.
446  *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
447  */
448 static __rte_always_inline unsigned int
449 __rte_ring_do_dequeue_elem(struct rte_ring *r, void *obj_table,
450                 unsigned int esize, unsigned int n,
451                 enum rte_ring_queue_behavior behavior, unsigned int is_sc,
452                 unsigned int *available)
453 {
454         uint32_t cons_head, cons_next;
455         uint32_t entries;
456
457         n = __rte_ring_move_cons_head(r, (int)is_sc, n, behavior,
458                         &cons_head, &cons_next, &entries);
459         if (n == 0)
460                 goto end;
461
462         __rte_ring_dequeue_elems(r, cons_head, obj_table, esize, n);
463
464         update_tail(&r->cons, cons_head, cons_next, is_sc, 0);
465
466 end:
467         if (available != NULL)
468                 *available = entries - n;
469         return n;
470 }
471
472 /**
473  * Enqueue several objects on the ring (multi-producers safe).
474  *
475  * This function uses a "compare and set" instruction to move the
476  * producer index atomically.
477  *
478  * @param r
479  *   A pointer to the ring structure.
480  * @param obj_table
481  *   A pointer to a table of objects.
482  * @param esize
483  *   The size of ring element, in bytes. It must be a multiple of 4.
484  *   This must be the same value used while creating the ring. Otherwise
485  *   the results are undefined.
486  * @param n
487  *   The number of objects to add in the ring from the obj_table.
488  * @param free_space
489  *   if non-NULL, returns the amount of space in the ring after the
490  *   enqueue operation has finished.
491  * @return
492  *   The number of objects enqueued, either 0 or n
493  */
494 static __rte_always_inline unsigned int
495 rte_ring_mp_enqueue_bulk_elem(struct rte_ring *r, const void *obj_table,
496                 unsigned int esize, unsigned int n, unsigned int *free_space)
497 {
498         return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
499                         RTE_RING_QUEUE_FIXED, RTE_RING_SYNC_MT, free_space);
500 }
501
502 /**
503  * Enqueue several objects on a ring
504  *
505  * @warning This API is NOT multi-producers safe
506  *
507  * @param r
508  *   A pointer to the ring structure.
509  * @param obj_table
510  *   A pointer to a table of objects.
511  * @param esize
512  *   The size of ring element, in bytes. It must be a multiple of 4.
513  *   This must be the same value used while creating the ring. Otherwise
514  *   the results are undefined.
515  * @param n
516  *   The number of objects to add in the ring from the obj_table.
517  * @param free_space
518  *   if non-NULL, returns the amount of space in the ring after the
519  *   enqueue operation has finished.
520  * @return
521  *   The number of objects enqueued, either 0 or n
522  */
523 static __rte_always_inline unsigned int
524 rte_ring_sp_enqueue_bulk_elem(struct rte_ring *r, const void *obj_table,
525                 unsigned int esize, unsigned int n, unsigned int *free_space)
526 {
527         return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
528                         RTE_RING_QUEUE_FIXED, RTE_RING_SYNC_ST, free_space);
529 }
530
531 /**
532  * Enqueue several objects on a ring.
533  *
534  * This function calls the multi-producer or the single-producer
535  * version depending on the default behavior that was specified at
536  * ring creation time (see flags).
537  *
538  * @param r
539  *   A pointer to the ring structure.
540  * @param obj_table
541  *   A pointer to a table of objects.
542  * @param esize
543  *   The size of ring element, in bytes. It must be a multiple of 4.
544  *   This must be the same value used while creating the ring. Otherwise
545  *   the results are undefined.
546  * @param n
547  *   The number of objects to add in the ring from the obj_table.
548  * @param free_space
549  *   if non-NULL, returns the amount of space in the ring after the
550  *   enqueue operation has finished.
551  * @return
552  *   The number of objects enqueued, either 0 or n
553  */
554 static __rte_always_inline unsigned int
555 rte_ring_enqueue_bulk_elem(struct rte_ring *r, const void *obj_table,
556                 unsigned int esize, unsigned int n, unsigned int *free_space)
557 {
558         return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
559                         RTE_RING_QUEUE_FIXED, r->prod.sync_type, free_space);
560 }
561
562 /**
563  * Enqueue one object on a ring (multi-producers safe).
564  *
565  * This function uses a "compare and set" instruction to move the
566  * producer index atomically.
567  *
568  * @param r
569  *   A pointer to the ring structure.
570  * @param obj
571  *   A pointer to the object to be added.
572  * @param esize
573  *   The size of ring element, in bytes. It must be a multiple of 4.
574  *   This must be the same value used while creating the ring. Otherwise
575  *   the results are undefined.
576  * @return
577  *   - 0: Success; objects enqueued.
578  *   - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued.
579  */
580 static __rte_always_inline int
581 rte_ring_mp_enqueue_elem(struct rte_ring *r, void *obj, unsigned int esize)
582 {
583         return rte_ring_mp_enqueue_bulk_elem(r, obj, esize, 1, NULL) ? 0 :
584                                                                 -ENOBUFS;
585 }
586
587 /**
588  * Enqueue one object on a ring
589  *
590  * @warning This API is NOT multi-producers safe
591  *
592  * @param r
593  *   A pointer to the ring structure.
594  * @param obj
595  *   A pointer to the object to be added.
596  * @param esize
597  *   The size of ring element, in bytes. It must be a multiple of 4.
598  *   This must be the same value used while creating the ring. Otherwise
599  *   the results are undefined.
600  * @return
601  *   - 0: Success; objects enqueued.
602  *   - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued.
603  */
604 static __rte_always_inline int
605 rte_ring_sp_enqueue_elem(struct rte_ring *r, void *obj, unsigned int esize)
606 {
607         return rte_ring_sp_enqueue_bulk_elem(r, obj, esize, 1, NULL) ? 0 :
608                                                                 -ENOBUFS;
609 }
610
611 /**
612  * Enqueue one object on a ring.
613  *
614  * This function calls the multi-producer or the single-producer
615  * version, depending on the default behaviour that was specified at
616  * ring creation time (see flags).
617  *
618  * @param r
619  *   A pointer to the ring structure.
620  * @param obj
621  *   A pointer to the object to be added.
622  * @param esize
623  *   The size of ring element, in bytes. It must be a multiple of 4.
624  *   This must be the same value used while creating the ring. Otherwise
625  *   the results are undefined.
626  * @return
627  *   - 0: Success; objects enqueued.
628  *   - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued.
629  */
630 static __rte_always_inline int
631 rte_ring_enqueue_elem(struct rte_ring *r, void *obj, unsigned int esize)
632 {
633         return rte_ring_enqueue_bulk_elem(r, obj, esize, 1, NULL) ? 0 :
634                                                                 -ENOBUFS;
635 }
636
637 /**
638  * Dequeue several objects from a ring (multi-consumers safe).
639  *
640  * This function uses a "compare and set" instruction to move the
641  * consumer index atomically.
642  *
643  * @param r
644  *   A pointer to the ring structure.
645  * @param obj_table
646  *   A pointer to a table of objects that will be filled.
647  * @param esize
648  *   The size of ring element, in bytes. It must be a multiple of 4.
649  *   This must be the same value used while creating the ring. Otherwise
650  *   the results are undefined.
651  * @param n
652  *   The number of objects to dequeue from the ring to the obj_table.
653  * @param available
654  *   If non-NULL, returns the number of remaining ring entries after the
655  *   dequeue has finished.
656  * @return
657  *   The number of objects dequeued, either 0 or n
658  */
659 static __rte_always_inline unsigned int
660 rte_ring_mc_dequeue_bulk_elem(struct rte_ring *r, void *obj_table,
661                 unsigned int esize, unsigned int n, unsigned int *available)
662 {
663         return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
664                                 RTE_RING_QUEUE_FIXED, RTE_RING_SYNC_MT, available);
665 }
666
667 /**
668  * Dequeue several objects from a ring (NOT multi-consumers safe).
669  *
670  * @param r
671  *   A pointer to the ring structure.
672  * @param obj_table
673  *   A pointer to a table of objects that will be filled.
674  * @param esize
675  *   The size of ring element, in bytes. It must be a multiple of 4.
676  *   This must be the same value used while creating the ring. Otherwise
677  *   the results are undefined.
678  * @param n
679  *   The number of objects to dequeue from the ring to the obj_table,
680  *   must be strictly positive.
681  * @param available
682  *   If non-NULL, returns the number of remaining ring entries after the
683  *   dequeue has finished.
684  * @return
685  *   The number of objects dequeued, either 0 or n
686  */
687 static __rte_always_inline unsigned int
688 rte_ring_sc_dequeue_bulk_elem(struct rte_ring *r, void *obj_table,
689                 unsigned int esize, unsigned int n, unsigned int *available)
690 {
691         return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
692                         RTE_RING_QUEUE_FIXED, RTE_RING_SYNC_ST, available);
693 }
694
695 /**
696  * Dequeue several objects from a ring.
697  *
698  * This function calls the multi-consumers or the single-consumer
699  * version, depending on the default behaviour that was specified at
700  * ring creation time (see flags).
701  *
702  * @param r
703  *   A pointer to the ring structure.
704  * @param obj_table
705  *   A pointer to a table of objects that will be filled.
706  * @param esize
707  *   The size of ring element, in bytes. It must be a multiple of 4.
708  *   This must be the same value used while creating the ring. Otherwise
709  *   the results are undefined.
710  * @param n
711  *   The number of objects to dequeue from the ring to the obj_table.
712  * @param available
713  *   If non-NULL, returns the number of remaining ring entries after the
714  *   dequeue has finished.
715  * @return
716  *   The number of objects dequeued, either 0 or n
717  */
718 static __rte_always_inline unsigned int
719 rte_ring_dequeue_bulk_elem(struct rte_ring *r, void *obj_table,
720                 unsigned int esize, unsigned int n, unsigned int *available)
721 {
722         return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
723                         RTE_RING_QUEUE_FIXED, r->cons.sync_type, available);
724 }
725
726 /**
727  * Dequeue one object from a ring (multi-consumers safe).
728  *
729  * This function uses a "compare and set" instruction to move the
730  * consumer index atomically.
731  *
732  * @param r
733  *   A pointer to the ring structure.
734  * @param obj_p
735  *   A pointer to the object that will be filled.
736  * @param esize
737  *   The size of ring element, in bytes. It must be a multiple of 4.
738  *   This must be the same value used while creating the ring. Otherwise
739  *   the results are undefined.
740  * @return
741  *   - 0: Success; objects dequeued.
742  *   - -ENOENT: Not enough entries in the ring to dequeue; no object is
743  *     dequeued.
744  */
745 static __rte_always_inline int
746 rte_ring_mc_dequeue_elem(struct rte_ring *r, void *obj_p,
747                                 unsigned int esize)
748 {
749         return rte_ring_mc_dequeue_bulk_elem(r, obj_p, esize, 1, NULL)  ? 0 :
750                                                                 -ENOENT;
751 }
752
753 /**
754  * Dequeue one object from a ring (NOT multi-consumers safe).
755  *
756  * @param r
757  *   A pointer to the ring structure.
758  * @param obj_p
759  *   A pointer to the object that will be filled.
760  * @param esize
761  *   The size of ring element, in bytes. It must be a multiple of 4.
762  *   This must be the same value used while creating the ring. Otherwise
763  *   the results are undefined.
764  * @return
765  *   - 0: Success; objects dequeued.
766  *   - -ENOENT: Not enough entries in the ring to dequeue, no object is
767  *     dequeued.
768  */
769 static __rte_always_inline int
770 rte_ring_sc_dequeue_elem(struct rte_ring *r, void *obj_p,
771                                 unsigned int esize)
772 {
773         return rte_ring_sc_dequeue_bulk_elem(r, obj_p, esize, 1, NULL) ? 0 :
774                                                                 -ENOENT;
775 }
776
777 /**
778  * Dequeue one object from a ring.
779  *
780  * This function calls the multi-consumers or the single-consumer
781  * version depending on the default behaviour that was specified at
782  * ring creation time (see flags).
783  *
784  * @param r
785  *   A pointer to the ring structure.
786  * @param obj_p
787  *   A pointer to the object that will be filled.
788  * @param esize
789  *   The size of ring element, in bytes. It must be a multiple of 4.
790  *   This must be the same value used while creating the ring. Otherwise
791  *   the results are undefined.
792  * @return
793  *   - 0: Success, objects dequeued.
794  *   - -ENOENT: Not enough entries in the ring to dequeue, no object is
795  *     dequeued.
796  */
797 static __rte_always_inline int
798 rte_ring_dequeue_elem(struct rte_ring *r, void *obj_p, unsigned int esize)
799 {
800         return rte_ring_dequeue_bulk_elem(r, obj_p, esize, 1, NULL) ? 0 :
801                                                                 -ENOENT;
802 }
803
804 /**
805  * Enqueue several objects on the ring (multi-producers safe).
806  *
807  * This function uses a "compare and set" instruction to move the
808  * producer index atomically.
809  *
810  * @param r
811  *   A pointer to the ring structure.
812  * @param obj_table
813  *   A pointer to a table of objects.
814  * @param esize
815  *   The size of ring element, in bytes. It must be a multiple of 4.
816  *   This must be the same value used while creating the ring. Otherwise
817  *   the results are undefined.
818  * @param n
819  *   The number of objects to add in the ring from the obj_table.
820  * @param free_space
821  *   if non-NULL, returns the amount of space in the ring after the
822  *   enqueue operation has finished.
823  * @return
824  *   - n: Actual number of objects enqueued.
825  */
826 static __rte_always_inline unsigned
827 rte_ring_mp_enqueue_burst_elem(struct rte_ring *r, const void *obj_table,
828                 unsigned int esize, unsigned int n, unsigned int *free_space)
829 {
830         return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
831                         RTE_RING_QUEUE_VARIABLE, RTE_RING_SYNC_MT, free_space);
832 }
833
834 /**
835  * Enqueue several objects on a ring
836  *
837  * @warning This API is NOT multi-producers safe
838  *
839  * @param r
840  *   A pointer to the ring structure.
841  * @param obj_table
842  *   A pointer to a table of objects.
843  * @param esize
844  *   The size of ring element, in bytes. It must be a multiple of 4.
845  *   This must be the same value used while creating the ring. Otherwise
846  *   the results are undefined.
847  * @param n
848  *   The number of objects to add in the ring from the obj_table.
849  * @param free_space
850  *   if non-NULL, returns the amount of space in the ring after the
851  *   enqueue operation has finished.
852  * @return
853  *   - n: Actual number of objects enqueued.
854  */
855 static __rte_always_inline unsigned
856 rte_ring_sp_enqueue_burst_elem(struct rte_ring *r, const void *obj_table,
857                 unsigned int esize, unsigned int n, unsigned int *free_space)
858 {
859         return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
860                         RTE_RING_QUEUE_VARIABLE, RTE_RING_SYNC_ST, free_space);
861 }
862
863 /**
864  * Enqueue several objects on a ring.
865  *
866  * This function calls the multi-producer or the single-producer
867  * version depending on the default behavior that was specified at
868  * ring creation time (see flags).
869  *
870  * @param r
871  *   A pointer to the ring structure.
872  * @param obj_table
873  *   A pointer to a table of objects.
874  * @param esize
875  *   The size of ring element, in bytes. It must be a multiple of 4.
876  *   This must be the same value used while creating the ring. Otherwise
877  *   the results are undefined.
878  * @param n
879  *   The number of objects to add in the ring from the obj_table.
880  * @param free_space
881  *   if non-NULL, returns the amount of space in the ring after the
882  *   enqueue operation has finished.
883  * @return
884  *   - n: Actual number of objects enqueued.
885  */
886 static __rte_always_inline unsigned
887 rte_ring_enqueue_burst_elem(struct rte_ring *r, const void *obj_table,
888                 unsigned int esize, unsigned int n, unsigned int *free_space)
889 {
890         return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
891                         RTE_RING_QUEUE_VARIABLE, r->prod.sync_type, free_space);
892 }
893
894 /**
895  * Dequeue several objects from a ring (multi-consumers safe). When the request
896  * objects are more than the available objects, only dequeue the actual number
897  * of objects
898  *
899  * This function uses a "compare and set" instruction to move the
900  * consumer index atomically.
901  *
902  * @param r
903  *   A pointer to the ring structure.
904  * @param obj_table
905  *   A pointer to a table of objects that will be filled.
906  * @param esize
907  *   The size of ring element, in bytes. It must be a multiple of 4.
908  *   This must be the same value used while creating the ring. Otherwise
909  *   the results are undefined.
910  * @param n
911  *   The number of objects to dequeue from the ring to the obj_table.
912  * @param available
913  *   If non-NULL, returns the number of remaining ring entries after the
914  *   dequeue has finished.
915  * @return
916  *   - n: Actual number of objects dequeued, 0 if ring is empty
917  */
918 static __rte_always_inline unsigned
919 rte_ring_mc_dequeue_burst_elem(struct rte_ring *r, void *obj_table,
920                 unsigned int esize, unsigned int n, unsigned int *available)
921 {
922         return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
923                         RTE_RING_QUEUE_VARIABLE, RTE_RING_SYNC_MT, available);
924 }
925
926 /**
927  * Dequeue several objects from a ring (NOT multi-consumers safe).When the
928  * request objects are more than the available objects, only dequeue the
929  * actual number of objects
930  *
931  * @param r
932  *   A pointer to the ring structure.
933  * @param obj_table
934  *   A pointer to a table of objects that will be filled.
935  * @param esize
936  *   The size of ring element, in bytes. It must be a multiple of 4.
937  *   This must be the same value used while creating the ring. Otherwise
938  *   the results are undefined.
939  * @param n
940  *   The number of objects to dequeue from the ring to the obj_table.
941  * @param available
942  *   If non-NULL, returns the number of remaining ring entries after the
943  *   dequeue has finished.
944  * @return
945  *   - n: Actual number of objects dequeued, 0 if ring is empty
946  */
947 static __rte_always_inline unsigned
948 rte_ring_sc_dequeue_burst_elem(struct rte_ring *r, void *obj_table,
949                 unsigned int esize, unsigned int n, unsigned int *available)
950 {
951         return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
952                         RTE_RING_QUEUE_VARIABLE, RTE_RING_SYNC_ST, available);
953 }
954
955 /**
956  * Dequeue multiple objects from a ring up to a maximum number.
957  *
958  * This function calls the multi-consumers or the single-consumer
959  * version, depending on the default behaviour that was specified at
960  * ring creation time (see flags).
961  *
962  * @param r
963  *   A pointer to the ring structure.
964  * @param obj_table
965  *   A pointer to a table of objects that will be filled.
966  * @param esize
967  *   The size of ring element, in bytes. It must be a multiple of 4.
968  *   This must be the same value used while creating the ring. Otherwise
969  *   the results are undefined.
970  * @param n
971  *   The number of objects to dequeue from the ring to the obj_table.
972  * @param available
973  *   If non-NULL, returns the number of remaining ring entries after the
974  *   dequeue has finished.
975  * @return
976  *   - Number of objects dequeued
977  */
978 static __rte_always_inline unsigned int
979 rte_ring_dequeue_burst_elem(struct rte_ring *r, void *obj_table,
980                 unsigned int esize, unsigned int n, unsigned int *available)
981 {
982         return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
983                                 RTE_RING_QUEUE_VARIABLE,
984                                 r->cons.sync_type, available);
985 }
986
987 #include <rte_ring.h>
988
989 #ifdef __cplusplus
990 }
991 #endif
992
993 #endif /* _RTE_RING_ELEM_H_ */