ring: introduce HTS ring mode
[dpdk.git] / lib / librte_ring / rte_ring_elem.h
1 /* SPDX-License-Identifier: BSD-3-Clause
2  *
3  * Copyright (c) 2019 Arm Limited
4  * Copyright (c) 2010-2017 Intel Corporation
5  * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org
6  * All rights reserved.
7  * Derived from FreeBSD's bufring.h
8  * Used as BSD-3 Licensed with permission from Kip Macy.
9  */
10
11 #ifndef _RTE_RING_ELEM_H_
12 #define _RTE_RING_ELEM_H_
13
14 /**
15  * @file
16  * RTE Ring with user defined element size
17  */
18
19 #ifdef __cplusplus
20 extern "C" {
21 #endif
22
23 #include <rte_ring_core.h>
24
25 /**
26  * @warning
27  * @b EXPERIMENTAL: this API may change without prior notice
28  *
29  * Calculate the memory size needed for a ring with given element size
30  *
31  * This function returns the number of bytes needed for a ring, given
32  * the number of elements in it and the size of the element. This value
33  * is the sum of the size of the structure rte_ring and the size of the
34  * memory needed for storing the elements. The value is aligned to a cache
35  * line size.
36  *
37  * @param esize
38  *   The size of ring element, in bytes. It must be a multiple of 4.
39  * @param count
40  *   The number of elements in the ring (must be a power of 2).
41  * @return
42  *   - The memory size needed for the ring on success.
43  *   - -EINVAL - esize is not a multiple of 4 or count provided is not a
44  *               power of 2.
45  */
46 __rte_experimental
47 ssize_t rte_ring_get_memsize_elem(unsigned int esize, unsigned int count);
48
49 /**
50  * @warning
51  * @b EXPERIMENTAL: this API may change without prior notice
52  *
53  * Create a new ring named *name* that stores elements with given size.
54  *
55  * This function uses ``memzone_reserve()`` to allocate memory. Then it
56  * calls rte_ring_init() to initialize an empty ring.
57  *
58  * The new ring size is set to *count*, which must be a power of
59  * two. Water marking is disabled by default. The real usable ring size
60  * is *count-1* instead of *count* to differentiate a free ring from an
61  * empty ring.
62  *
63  * The ring is added in RTE_TAILQ_RING list.
64  *
65  * @param name
66  *   The name of the ring.
67  * @param esize
68  *   The size of ring element, in bytes. It must be a multiple of 4.
69  * @param count
70  *   The number of elements in the ring (must be a power of 2).
71  * @param socket_id
72  *   The *socket_id* argument is the socket identifier in case of
73  *   NUMA. The value can be *SOCKET_ID_ANY* if there is no NUMA
74  *   constraint for the reserved zone.
75  * @param flags
76  *   An OR of the following:
77  *   - One of mutually exclusive flags that define producer behavior:
78  *      - RING_F_SP_ENQ: If this flag is set, the default behavior when
79  *        using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()``
80  *        is "single-producer".
81  *      - RING_F_MP_RTS_ENQ: If this flag is set, the default behavior when
82  *        using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()``
83  *        is "multi-producer RTS mode".
84  *      - RING_F_MP_HTS_ENQ: If this flag is set, the default behavior when
85  *        using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()``
86  *        is "multi-producer HTS mode".
87  *     If none of these flags is set, then default "multi-producer"
88  *     behavior is selected.
89  *   - One of mutually exclusive flags that define consumer behavior:
90  *      - RING_F_SC_DEQ: If this flag is set, the default behavior when
91  *        using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
92  *        is "single-consumer". Otherwise, it is "multi-consumers".
93  *      - RING_F_MC_RTS_DEQ: If this flag is set, the default behavior when
94  *        using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
95  *        is "multi-consumer RTS mode".
96  *      - RING_F_MC_HTS_DEQ: If this flag is set, the default behavior when
97  *        using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
98  *        is "multi-consumer HTS mode".
99  *     If none of these flags is set, then default "multi-consumer"
100  *     behavior is selected.
101  * @return
102  *   On success, the pointer to the new allocated ring. NULL on error with
103  *    rte_errno set appropriately. Possible errno values include:
104  *    - E_RTE_NO_CONFIG - function could not get pointer to rte_config structure
105  *    - E_RTE_SECONDARY - function was called from a secondary process instance
106  *    - EINVAL - esize is not a multiple of 4 or count provided is not a
107  *               power of 2.
108  *    - ENOSPC - the maximum number of memzones has already been allocated
109  *    - EEXIST - a memzone with the same name already exists
110  *    - ENOMEM - no appropriate memory area found in which to create memzone
111  */
112 __rte_experimental
113 struct rte_ring *rte_ring_create_elem(const char *name, unsigned int esize,
114                         unsigned int count, int socket_id, unsigned int flags);
115
116 static __rte_always_inline void
117 __rte_ring_enqueue_elems_32(struct rte_ring *r, const uint32_t size,
118                 uint32_t idx, const void *obj_table, uint32_t n)
119 {
120         unsigned int i;
121         uint32_t *ring = (uint32_t *)&r[1];
122         const uint32_t *obj = (const uint32_t *)obj_table;
123         if (likely(idx + n < size)) {
124                 for (i = 0; i < (n & ~0x7); i += 8, idx += 8) {
125                         ring[idx] = obj[i];
126                         ring[idx + 1] = obj[i + 1];
127                         ring[idx + 2] = obj[i + 2];
128                         ring[idx + 3] = obj[i + 3];
129                         ring[idx + 4] = obj[i + 4];
130                         ring[idx + 5] = obj[i + 5];
131                         ring[idx + 6] = obj[i + 6];
132                         ring[idx + 7] = obj[i + 7];
133                 }
134                 switch (n & 0x7) {
135                 case 7:
136                         ring[idx++] = obj[i++]; /* fallthrough */
137                 case 6:
138                         ring[idx++] = obj[i++]; /* fallthrough */
139                 case 5:
140                         ring[idx++] = obj[i++]; /* fallthrough */
141                 case 4:
142                         ring[idx++] = obj[i++]; /* fallthrough */
143                 case 3:
144                         ring[idx++] = obj[i++]; /* fallthrough */
145                 case 2:
146                         ring[idx++] = obj[i++]; /* fallthrough */
147                 case 1:
148                         ring[idx++] = obj[i++]; /* fallthrough */
149                 }
150         } else {
151                 for (i = 0; idx < size; i++, idx++)
152                         ring[idx] = obj[i];
153                 /* Start at the beginning */
154                 for (idx = 0; i < n; i++, idx++)
155                         ring[idx] = obj[i];
156         }
157 }
158
159 static __rte_always_inline void
160 __rte_ring_enqueue_elems_64(struct rte_ring *r, uint32_t prod_head,
161                 const void *obj_table, uint32_t n)
162 {
163         unsigned int i;
164         const uint32_t size = r->size;
165         uint32_t idx = prod_head & r->mask;
166         uint64_t *ring = (uint64_t *)&r[1];
167         const unaligned_uint64_t *obj = (const unaligned_uint64_t *)obj_table;
168         if (likely(idx + n < size)) {
169                 for (i = 0; i < (n & ~0x3); i += 4, idx += 4) {
170                         ring[idx] = obj[i];
171                         ring[idx + 1] = obj[i + 1];
172                         ring[idx + 2] = obj[i + 2];
173                         ring[idx + 3] = obj[i + 3];
174                 }
175                 switch (n & 0x3) {
176                 case 3:
177                         ring[idx++] = obj[i++]; /* fallthrough */
178                 case 2:
179                         ring[idx++] = obj[i++]; /* fallthrough */
180                 case 1:
181                         ring[idx++] = obj[i++];
182                 }
183         } else {
184                 for (i = 0; idx < size; i++, idx++)
185                         ring[idx] = obj[i];
186                 /* Start at the beginning */
187                 for (idx = 0; i < n; i++, idx++)
188                         ring[idx] = obj[i];
189         }
190 }
191
192 static __rte_always_inline void
193 __rte_ring_enqueue_elems_128(struct rte_ring *r, uint32_t prod_head,
194                 const void *obj_table, uint32_t n)
195 {
196         unsigned int i;
197         const uint32_t size = r->size;
198         uint32_t idx = prod_head & r->mask;
199         rte_int128_t *ring = (rte_int128_t *)&r[1];
200         const rte_int128_t *obj = (const rte_int128_t *)obj_table;
201         if (likely(idx + n < size)) {
202                 for (i = 0; i < (n & ~0x1); i += 2, idx += 2)
203                         memcpy((void *)(ring + idx),
204                                 (const void *)(obj + i), 32);
205                 switch (n & 0x1) {
206                 case 1:
207                         memcpy((void *)(ring + idx),
208                                 (const void *)(obj + i), 16);
209                 }
210         } else {
211                 for (i = 0; idx < size; i++, idx++)
212                         memcpy((void *)(ring + idx),
213                                 (const void *)(obj + i), 16);
214                 /* Start at the beginning */
215                 for (idx = 0; i < n; i++, idx++)
216                         memcpy((void *)(ring + idx),
217                                 (const void *)(obj + i), 16);
218         }
219 }
220
221 /* the actual enqueue of elements on the ring.
222  * Placed here since identical code needed in both
223  * single and multi producer enqueue functions.
224  */
225 static __rte_always_inline void
226 __rte_ring_enqueue_elems(struct rte_ring *r, uint32_t prod_head,
227                 const void *obj_table, uint32_t esize, uint32_t num)
228 {
229         /* 8B and 16B copies implemented individually to retain
230          * the current performance.
231          */
232         if (esize == 8)
233                 __rte_ring_enqueue_elems_64(r, prod_head, obj_table, num);
234         else if (esize == 16)
235                 __rte_ring_enqueue_elems_128(r, prod_head, obj_table, num);
236         else {
237                 uint32_t idx, scale, nr_idx, nr_num, nr_size;
238
239                 /* Normalize to uint32_t */
240                 scale = esize / sizeof(uint32_t);
241                 nr_num = num * scale;
242                 idx = prod_head & r->mask;
243                 nr_idx = idx * scale;
244                 nr_size = r->size * scale;
245                 __rte_ring_enqueue_elems_32(r, nr_size, nr_idx,
246                                 obj_table, nr_num);
247         }
248 }
249
250 static __rte_always_inline void
251 __rte_ring_dequeue_elems_32(struct rte_ring *r, const uint32_t size,
252                 uint32_t idx, void *obj_table, uint32_t n)
253 {
254         unsigned int i;
255         uint32_t *ring = (uint32_t *)&r[1];
256         uint32_t *obj = (uint32_t *)obj_table;
257         if (likely(idx + n < size)) {
258                 for (i = 0; i < (n & ~0x7); i += 8, idx += 8) {
259                         obj[i] = ring[idx];
260                         obj[i + 1] = ring[idx + 1];
261                         obj[i + 2] = ring[idx + 2];
262                         obj[i + 3] = ring[idx + 3];
263                         obj[i + 4] = ring[idx + 4];
264                         obj[i + 5] = ring[idx + 5];
265                         obj[i + 6] = ring[idx + 6];
266                         obj[i + 7] = ring[idx + 7];
267                 }
268                 switch (n & 0x7) {
269                 case 7:
270                         obj[i++] = ring[idx++]; /* fallthrough */
271                 case 6:
272                         obj[i++] = ring[idx++]; /* fallthrough */
273                 case 5:
274                         obj[i++] = ring[idx++]; /* fallthrough */
275                 case 4:
276                         obj[i++] = ring[idx++]; /* fallthrough */
277                 case 3:
278                         obj[i++] = ring[idx++]; /* fallthrough */
279                 case 2:
280                         obj[i++] = ring[idx++]; /* fallthrough */
281                 case 1:
282                         obj[i++] = ring[idx++]; /* fallthrough */
283                 }
284         } else {
285                 for (i = 0; idx < size; i++, idx++)
286                         obj[i] = ring[idx];
287                 /* Start at the beginning */
288                 for (idx = 0; i < n; i++, idx++)
289                         obj[i] = ring[idx];
290         }
291 }
292
293 static __rte_always_inline void
294 __rte_ring_dequeue_elems_64(struct rte_ring *r, uint32_t prod_head,
295                 void *obj_table, uint32_t n)
296 {
297         unsigned int i;
298         const uint32_t size = r->size;
299         uint32_t idx = prod_head & r->mask;
300         uint64_t *ring = (uint64_t *)&r[1];
301         unaligned_uint64_t *obj = (unaligned_uint64_t *)obj_table;
302         if (likely(idx + n < size)) {
303                 for (i = 0; i < (n & ~0x3); i += 4, idx += 4) {
304                         obj[i] = ring[idx];
305                         obj[i + 1] = ring[idx + 1];
306                         obj[i + 2] = ring[idx + 2];
307                         obj[i + 3] = ring[idx + 3];
308                 }
309                 switch (n & 0x3) {
310                 case 3:
311                         obj[i++] = ring[idx++]; /* fallthrough */
312                 case 2:
313                         obj[i++] = ring[idx++]; /* fallthrough */
314                 case 1:
315                         obj[i++] = ring[idx++]; /* fallthrough */
316                 }
317         } else {
318                 for (i = 0; idx < size; i++, idx++)
319                         obj[i] = ring[idx];
320                 /* Start at the beginning */
321                 for (idx = 0; i < n; i++, idx++)
322                         obj[i] = ring[idx];
323         }
324 }
325
326 static __rte_always_inline void
327 __rte_ring_dequeue_elems_128(struct rte_ring *r, uint32_t prod_head,
328                 void *obj_table, uint32_t n)
329 {
330         unsigned int i;
331         const uint32_t size = r->size;
332         uint32_t idx = prod_head & r->mask;
333         rte_int128_t *ring = (rte_int128_t *)&r[1];
334         rte_int128_t *obj = (rte_int128_t *)obj_table;
335         if (likely(idx + n < size)) {
336                 for (i = 0; i < (n & ~0x1); i += 2, idx += 2)
337                         memcpy((void *)(obj + i), (void *)(ring + idx), 32);
338                 switch (n & 0x1) {
339                 case 1:
340                         memcpy((void *)(obj + i), (void *)(ring + idx), 16);
341                 }
342         } else {
343                 for (i = 0; idx < size; i++, idx++)
344                         memcpy((void *)(obj + i), (void *)(ring + idx), 16);
345                 /* Start at the beginning */
346                 for (idx = 0; i < n; i++, idx++)
347                         memcpy((void *)(obj + i), (void *)(ring + idx), 16);
348         }
349 }
350
351 /* the actual dequeue of elements from the ring.
352  * Placed here since identical code needed in both
353  * single and multi producer enqueue functions.
354  */
355 static __rte_always_inline void
356 __rte_ring_dequeue_elems(struct rte_ring *r, uint32_t cons_head,
357                 void *obj_table, uint32_t esize, uint32_t num)
358 {
359         /* 8B and 16B copies implemented individually to retain
360          * the current performance.
361          */
362         if (esize == 8)
363                 __rte_ring_dequeue_elems_64(r, cons_head, obj_table, num);
364         else if (esize == 16)
365                 __rte_ring_dequeue_elems_128(r, cons_head, obj_table, num);
366         else {
367                 uint32_t idx, scale, nr_idx, nr_num, nr_size;
368
369                 /* Normalize to uint32_t */
370                 scale = esize / sizeof(uint32_t);
371                 nr_num = num * scale;
372                 idx = cons_head & r->mask;
373                 nr_idx = idx * scale;
374                 nr_size = r->size * scale;
375                 __rte_ring_dequeue_elems_32(r, nr_size, nr_idx,
376                                 obj_table, nr_num);
377         }
378 }
379
380 /* Between load and load. there might be cpu reorder in weak model
381  * (powerpc/arm).
382  * There are 2 choices for the users
383  * 1.use rmb() memory barrier
384  * 2.use one-direction load_acquire/store_release barrier,defined by
385  * CONFIG_RTE_USE_C11_MEM_MODEL=y
386  * It depends on performance test results.
387  * By default, move common functions to rte_ring_generic.h
388  */
389 #ifdef RTE_USE_C11_MEM_MODEL
390 #include "rte_ring_c11_mem.h"
391 #else
392 #include "rte_ring_generic.h"
393 #endif
394
395 /**
396  * @internal Enqueue several objects on the ring
397  *
398  * @param r
399  *   A pointer to the ring structure.
400  * @param obj_table
401  *   A pointer to a table of objects.
402  * @param esize
403  *   The size of ring element, in bytes. It must be a multiple of 4.
404  *   This must be the same value used while creating the ring. Otherwise
405  *   the results are undefined.
406  * @param n
407  *   The number of objects to add in the ring from the obj_table.
408  * @param behavior
409  *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items from a ring
410  *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring
411  * @param is_sp
412  *   Indicates whether to use single producer or multi-producer head update
413  * @param free_space
414  *   returns the amount of space after the enqueue operation has finished
415  * @return
416  *   Actual number of objects enqueued.
417  *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
418  */
419 static __rte_always_inline unsigned int
420 __rte_ring_do_enqueue_elem(struct rte_ring *r, const void *obj_table,
421                 unsigned int esize, unsigned int n,
422                 enum rte_ring_queue_behavior behavior, unsigned int is_sp,
423                 unsigned int *free_space)
424 {
425         uint32_t prod_head, prod_next;
426         uint32_t free_entries;
427
428         n = __rte_ring_move_prod_head(r, is_sp, n, behavior,
429                         &prod_head, &prod_next, &free_entries);
430         if (n == 0)
431                 goto end;
432
433         __rte_ring_enqueue_elems(r, prod_head, obj_table, esize, n);
434
435         update_tail(&r->prod, prod_head, prod_next, is_sp, 1);
436 end:
437         if (free_space != NULL)
438                 *free_space = free_entries - n;
439         return n;
440 }
441
442 /**
443  * @internal Dequeue several objects from the ring
444  *
445  * @param r
446  *   A pointer to the ring structure.
447  * @param obj_table
448  *   A pointer to a table of objects.
449  * @param esize
450  *   The size of ring element, in bytes. It must be a multiple of 4.
451  *   This must be the same value used while creating the ring. Otherwise
452  *   the results are undefined.
453  * @param n
454  *   The number of objects to pull from the ring.
455  * @param behavior
456  *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from a ring
457  *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring
458  * @param is_sc
459  *   Indicates whether to use single consumer or multi-consumer head update
460  * @param available
461  *   returns the number of remaining ring entries after the dequeue has finished
462  * @return
463  *   - Actual number of objects dequeued.
464  *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
465  */
466 static __rte_always_inline unsigned int
467 __rte_ring_do_dequeue_elem(struct rte_ring *r, void *obj_table,
468                 unsigned int esize, unsigned int n,
469                 enum rte_ring_queue_behavior behavior, unsigned int is_sc,
470                 unsigned int *available)
471 {
472         uint32_t cons_head, cons_next;
473         uint32_t entries;
474
475         n = __rte_ring_move_cons_head(r, (int)is_sc, n, behavior,
476                         &cons_head, &cons_next, &entries);
477         if (n == 0)
478                 goto end;
479
480         __rte_ring_dequeue_elems(r, cons_head, obj_table, esize, n);
481
482         update_tail(&r->cons, cons_head, cons_next, is_sc, 0);
483
484 end:
485         if (available != NULL)
486                 *available = entries - n;
487         return n;
488 }
489
490 /**
491  * Enqueue several objects on the ring (multi-producers safe).
492  *
493  * This function uses a "compare and set" instruction to move the
494  * producer index atomically.
495  *
496  * @param r
497  *   A pointer to the ring structure.
498  * @param obj_table
499  *   A pointer to a table of objects.
500  * @param esize
501  *   The size of ring element, in bytes. It must be a multiple of 4.
502  *   This must be the same value used while creating the ring. Otherwise
503  *   the results are undefined.
504  * @param n
505  *   The number of objects to add in the ring from the obj_table.
506  * @param free_space
507  *   if non-NULL, returns the amount of space in the ring after the
508  *   enqueue operation has finished.
509  * @return
510  *   The number of objects enqueued, either 0 or n
511  */
512 static __rte_always_inline unsigned int
513 rte_ring_mp_enqueue_bulk_elem(struct rte_ring *r, const void *obj_table,
514                 unsigned int esize, unsigned int n, unsigned int *free_space)
515 {
516         return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
517                         RTE_RING_QUEUE_FIXED, RTE_RING_SYNC_MT, free_space);
518 }
519
520 /**
521  * Enqueue several objects on a ring
522  *
523  * @warning This API is NOT multi-producers safe
524  *
525  * @param r
526  *   A pointer to the ring structure.
527  * @param obj_table
528  *   A pointer to a table of objects.
529  * @param esize
530  *   The size of ring element, in bytes. It must be a multiple of 4.
531  *   This must be the same value used while creating the ring. Otherwise
532  *   the results are undefined.
533  * @param n
534  *   The number of objects to add in the ring from the obj_table.
535  * @param free_space
536  *   if non-NULL, returns the amount of space in the ring after the
537  *   enqueue operation has finished.
538  * @return
539  *   The number of objects enqueued, either 0 or n
540  */
541 static __rte_always_inline unsigned int
542 rte_ring_sp_enqueue_bulk_elem(struct rte_ring *r, const void *obj_table,
543                 unsigned int esize, unsigned int n, unsigned int *free_space)
544 {
545         return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
546                         RTE_RING_QUEUE_FIXED, RTE_RING_SYNC_ST, free_space);
547 }
548
549 #ifdef ALLOW_EXPERIMENTAL_API
550 #include <rte_ring_hts.h>
551 #include <rte_ring_rts.h>
552 #endif
553
554 /**
555  * Enqueue several objects on a ring.
556  *
557  * This function calls the multi-producer or the single-producer
558  * version depending on the default behavior that was specified at
559  * ring creation time (see flags).
560  *
561  * @param r
562  *   A pointer to the ring structure.
563  * @param obj_table
564  *   A pointer to a table of objects.
565  * @param esize
566  *   The size of ring element, in bytes. It must be a multiple of 4.
567  *   This must be the same value used while creating the ring. Otherwise
568  *   the results are undefined.
569  * @param n
570  *   The number of objects to add in the ring from the obj_table.
571  * @param free_space
572  *   if non-NULL, returns the amount of space in the ring after the
573  *   enqueue operation has finished.
574  * @return
575  *   The number of objects enqueued, either 0 or n
576  */
577 static __rte_always_inline unsigned int
578 rte_ring_enqueue_bulk_elem(struct rte_ring *r, const void *obj_table,
579                 unsigned int esize, unsigned int n, unsigned int *free_space)
580 {
581         return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
582                         RTE_RING_QUEUE_FIXED, r->prod.sync_type, free_space);
583
584         switch (r->prod.sync_type) {
585         case RTE_RING_SYNC_MT:
586                 return rte_ring_mp_enqueue_bulk_elem(r, obj_table, esize, n,
587                         free_space);
588         case RTE_RING_SYNC_ST:
589                 return rte_ring_sp_enqueue_bulk_elem(r, obj_table, esize, n,
590                         free_space);
591 #ifdef ALLOW_EXPERIMENTAL_API
592         case RTE_RING_SYNC_MT_RTS:
593                 return rte_ring_mp_rts_enqueue_bulk_elem(r, obj_table, esize, n,
594                         free_space);
595         case RTE_RING_SYNC_MT_HTS:
596                 return rte_ring_mp_hts_enqueue_bulk_elem(r, obj_table, esize, n,
597                         free_space);
598 #endif
599         }
600
601         /* valid ring should never reach this point */
602         RTE_ASSERT(0);
603         if (free_space != NULL)
604                 *free_space = 0;
605         return 0;
606 }
607
608 /**
609  * Enqueue one object on a ring (multi-producers safe).
610  *
611  * This function uses a "compare and set" instruction to move the
612  * producer index atomically.
613  *
614  * @param r
615  *   A pointer to the ring structure.
616  * @param obj
617  *   A pointer to the object to be added.
618  * @param esize
619  *   The size of ring element, in bytes. It must be a multiple of 4.
620  *   This must be the same value used while creating the ring. Otherwise
621  *   the results are undefined.
622  * @return
623  *   - 0: Success; objects enqueued.
624  *   - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued.
625  */
626 static __rte_always_inline int
627 rte_ring_mp_enqueue_elem(struct rte_ring *r, void *obj, unsigned int esize)
628 {
629         return rte_ring_mp_enqueue_bulk_elem(r, obj, esize, 1, NULL) ? 0 :
630                                                                 -ENOBUFS;
631 }
632
633 /**
634  * Enqueue one object on a ring
635  *
636  * @warning This API is NOT multi-producers safe
637  *
638  * @param r
639  *   A pointer to the ring structure.
640  * @param obj
641  *   A pointer to the object to be added.
642  * @param esize
643  *   The size of ring element, in bytes. It must be a multiple of 4.
644  *   This must be the same value used while creating the ring. Otherwise
645  *   the results are undefined.
646  * @return
647  *   - 0: Success; objects enqueued.
648  *   - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued.
649  */
650 static __rte_always_inline int
651 rte_ring_sp_enqueue_elem(struct rte_ring *r, void *obj, unsigned int esize)
652 {
653         return rte_ring_sp_enqueue_bulk_elem(r, obj, esize, 1, NULL) ? 0 :
654                                                                 -ENOBUFS;
655 }
656
657 /**
658  * Enqueue one object on a ring.
659  *
660  * This function calls the multi-producer or the single-producer
661  * version, depending on the default behaviour that was specified at
662  * ring creation time (see flags).
663  *
664  * @param r
665  *   A pointer to the ring structure.
666  * @param obj
667  *   A pointer to the object to be added.
668  * @param esize
669  *   The size of ring element, in bytes. It must be a multiple of 4.
670  *   This must be the same value used while creating the ring. Otherwise
671  *   the results are undefined.
672  * @return
673  *   - 0: Success; objects enqueued.
674  *   - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued.
675  */
676 static __rte_always_inline int
677 rte_ring_enqueue_elem(struct rte_ring *r, void *obj, unsigned int esize)
678 {
679         return rte_ring_enqueue_bulk_elem(r, obj, esize, 1, NULL) ? 0 :
680                                                                 -ENOBUFS;
681 }
682
683 /**
684  * Dequeue several objects from a ring (multi-consumers safe).
685  *
686  * This function uses a "compare and set" instruction to move the
687  * consumer index atomically.
688  *
689  * @param r
690  *   A pointer to the ring structure.
691  * @param obj_table
692  *   A pointer to a table of objects that will be filled.
693  * @param esize
694  *   The size of ring element, in bytes. It must be a multiple of 4.
695  *   This must be the same value used while creating the ring. Otherwise
696  *   the results are undefined.
697  * @param n
698  *   The number of objects to dequeue from the ring to the obj_table.
699  * @param available
700  *   If non-NULL, returns the number of remaining ring entries after the
701  *   dequeue has finished.
702  * @return
703  *   The number of objects dequeued, either 0 or n
704  */
705 static __rte_always_inline unsigned int
706 rte_ring_mc_dequeue_bulk_elem(struct rte_ring *r, void *obj_table,
707                 unsigned int esize, unsigned int n, unsigned int *available)
708 {
709         return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
710                         RTE_RING_QUEUE_FIXED, RTE_RING_SYNC_MT, available);
711 }
712
713 /**
714  * Dequeue several objects from a ring (NOT multi-consumers safe).
715  *
716  * @param r
717  *   A pointer to the ring structure.
718  * @param obj_table
719  *   A pointer to a table of objects that will be filled.
720  * @param esize
721  *   The size of ring element, in bytes. It must be a multiple of 4.
722  *   This must be the same value used while creating the ring. Otherwise
723  *   the results are undefined.
724  * @param n
725  *   The number of objects to dequeue from the ring to the obj_table,
726  *   must be strictly positive.
727  * @param available
728  *   If non-NULL, returns the number of remaining ring entries after the
729  *   dequeue has finished.
730  * @return
731  *   The number of objects dequeued, either 0 or n
732  */
733 static __rte_always_inline unsigned int
734 rte_ring_sc_dequeue_bulk_elem(struct rte_ring *r, void *obj_table,
735                 unsigned int esize, unsigned int n, unsigned int *available)
736 {
737         return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
738                         RTE_RING_QUEUE_FIXED, RTE_RING_SYNC_ST, available);
739 }
740
741 /**
742  * Dequeue several objects from a ring.
743  *
744  * This function calls the multi-consumers or the single-consumer
745  * version, depending on the default behaviour that was specified at
746  * ring creation time (see flags).
747  *
748  * @param r
749  *   A pointer to the ring structure.
750  * @param obj_table
751  *   A pointer to a table of objects that will be filled.
752  * @param esize
753  *   The size of ring element, in bytes. It must be a multiple of 4.
754  *   This must be the same value used while creating the ring. Otherwise
755  *   the results are undefined.
756  * @param n
757  *   The number of objects to dequeue from the ring to the obj_table.
758  * @param available
759  *   If non-NULL, returns the number of remaining ring entries after the
760  *   dequeue has finished.
761  * @return
762  *   The number of objects dequeued, either 0 or n
763  */
764 static __rte_always_inline unsigned int
765 rte_ring_dequeue_bulk_elem(struct rte_ring *r, void *obj_table,
766                 unsigned int esize, unsigned int n, unsigned int *available)
767 {
768         switch (r->cons.sync_type) {
769         case RTE_RING_SYNC_MT:
770                 return rte_ring_mc_dequeue_bulk_elem(r, obj_table, esize, n,
771                         available);
772         case RTE_RING_SYNC_ST:
773                 return rte_ring_sc_dequeue_bulk_elem(r, obj_table, esize, n,
774                         available);
775 #ifdef ALLOW_EXPERIMENTAL_API
776         case RTE_RING_SYNC_MT_RTS:
777                 return rte_ring_mc_rts_dequeue_bulk_elem(r, obj_table, esize,
778                         n, available);
779         case RTE_RING_SYNC_MT_HTS:
780                 return rte_ring_mc_hts_dequeue_bulk_elem(r, obj_table, esize,
781                         n, available);
782 #endif
783         }
784
785         /* valid ring should never reach this point */
786         RTE_ASSERT(0);
787         if (available != NULL)
788                 *available = 0;
789         return 0;
790 }
791
792 /**
793  * Dequeue one object from a ring (multi-consumers safe).
794  *
795  * This function uses a "compare and set" instruction to move the
796  * consumer index atomically.
797  *
798  * @param r
799  *   A pointer to the ring structure.
800  * @param obj_p
801  *   A pointer to the object that will be filled.
802  * @param esize
803  *   The size of ring element, in bytes. It must be a multiple of 4.
804  *   This must be the same value used while creating the ring. Otherwise
805  *   the results are undefined.
806  * @return
807  *   - 0: Success; objects dequeued.
808  *   - -ENOENT: Not enough entries in the ring to dequeue; no object is
809  *     dequeued.
810  */
811 static __rte_always_inline int
812 rte_ring_mc_dequeue_elem(struct rte_ring *r, void *obj_p,
813                                 unsigned int esize)
814 {
815         return rte_ring_mc_dequeue_bulk_elem(r, obj_p, esize, 1, NULL)  ? 0 :
816                                                                 -ENOENT;
817 }
818
819 /**
820  * Dequeue one object from a ring (NOT multi-consumers safe).
821  *
822  * @param r
823  *   A pointer to the ring structure.
824  * @param obj_p
825  *   A pointer to the object that will be filled.
826  * @param esize
827  *   The size of ring element, in bytes. It must be a multiple of 4.
828  *   This must be the same value used while creating the ring. Otherwise
829  *   the results are undefined.
830  * @return
831  *   - 0: Success; objects dequeued.
832  *   - -ENOENT: Not enough entries in the ring to dequeue, no object is
833  *     dequeued.
834  */
835 static __rte_always_inline int
836 rte_ring_sc_dequeue_elem(struct rte_ring *r, void *obj_p,
837                                 unsigned int esize)
838 {
839         return rte_ring_sc_dequeue_bulk_elem(r, obj_p, esize, 1, NULL) ? 0 :
840                                                                 -ENOENT;
841 }
842
843 /**
844  * Dequeue one object from a ring.
845  *
846  * This function calls the multi-consumers or the single-consumer
847  * version depending on the default behaviour that was specified at
848  * ring creation time (see flags).
849  *
850  * @param r
851  *   A pointer to the ring structure.
852  * @param obj_p
853  *   A pointer to the object that will be filled.
854  * @param esize
855  *   The size of ring element, in bytes. It must be a multiple of 4.
856  *   This must be the same value used while creating the ring. Otherwise
857  *   the results are undefined.
858  * @return
859  *   - 0: Success, objects dequeued.
860  *   - -ENOENT: Not enough entries in the ring to dequeue, no object is
861  *     dequeued.
862  */
863 static __rte_always_inline int
864 rte_ring_dequeue_elem(struct rte_ring *r, void *obj_p, unsigned int esize)
865 {
866         return rte_ring_dequeue_bulk_elem(r, obj_p, esize, 1, NULL) ? 0 :
867                                                                 -ENOENT;
868 }
869
870 /**
871  * Enqueue several objects on the ring (multi-producers safe).
872  *
873  * This function uses a "compare and set" instruction to move the
874  * producer index atomically.
875  *
876  * @param r
877  *   A pointer to the ring structure.
878  * @param obj_table
879  *   A pointer to a table of objects.
880  * @param esize
881  *   The size of ring element, in bytes. It must be a multiple of 4.
882  *   This must be the same value used while creating the ring. Otherwise
883  *   the results are undefined.
884  * @param n
885  *   The number of objects to add in the ring from the obj_table.
886  * @param free_space
887  *   if non-NULL, returns the amount of space in the ring after the
888  *   enqueue operation has finished.
889  * @return
890  *   - n: Actual number of objects enqueued.
891  */
892 static __rte_always_inline unsigned
893 rte_ring_mp_enqueue_burst_elem(struct rte_ring *r, const void *obj_table,
894                 unsigned int esize, unsigned int n, unsigned int *free_space)
895 {
896         return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
897                         RTE_RING_QUEUE_VARIABLE, RTE_RING_SYNC_MT, free_space);
898 }
899
900 /**
901  * Enqueue several objects on a ring
902  *
903  * @warning This API is NOT multi-producers safe
904  *
905  * @param r
906  *   A pointer to the ring structure.
907  * @param obj_table
908  *   A pointer to a table of objects.
909  * @param esize
910  *   The size of ring element, in bytes. It must be a multiple of 4.
911  *   This must be the same value used while creating the ring. Otherwise
912  *   the results are undefined.
913  * @param n
914  *   The number of objects to add in the ring from the obj_table.
915  * @param free_space
916  *   if non-NULL, returns the amount of space in the ring after the
917  *   enqueue operation has finished.
918  * @return
919  *   - n: Actual number of objects enqueued.
920  */
921 static __rte_always_inline unsigned
922 rte_ring_sp_enqueue_burst_elem(struct rte_ring *r, const void *obj_table,
923                 unsigned int esize, unsigned int n, unsigned int *free_space)
924 {
925         return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
926                         RTE_RING_QUEUE_VARIABLE, RTE_RING_SYNC_ST, free_space);
927 }
928
929 /**
930  * Enqueue several objects on a ring.
931  *
932  * This function calls the multi-producer or the single-producer
933  * version depending on the default behavior that was specified at
934  * ring creation time (see flags).
935  *
936  * @param r
937  *   A pointer to the ring structure.
938  * @param obj_table
939  *   A pointer to a table of objects.
940  * @param esize
941  *   The size of ring element, in bytes. It must be a multiple of 4.
942  *   This must be the same value used while creating the ring. Otherwise
943  *   the results are undefined.
944  * @param n
945  *   The number of objects to add in the ring from the obj_table.
946  * @param free_space
947  *   if non-NULL, returns the amount of space in the ring after the
948  *   enqueue operation has finished.
949  * @return
950  *   - n: Actual number of objects enqueued.
951  */
952 static __rte_always_inline unsigned
953 rte_ring_enqueue_burst_elem(struct rte_ring *r, const void *obj_table,
954                 unsigned int esize, unsigned int n, unsigned int *free_space)
955 {
956         switch (r->prod.sync_type) {
957         case RTE_RING_SYNC_MT:
958                 return rte_ring_mp_enqueue_burst_elem(r, obj_table, esize, n,
959                         free_space);
960         case RTE_RING_SYNC_ST:
961                 return rte_ring_sp_enqueue_burst_elem(r, obj_table, esize, n,
962                         free_space);
963 #ifdef ALLOW_EXPERIMENTAL_API
964         case RTE_RING_SYNC_MT_RTS:
965                 return rte_ring_mp_rts_enqueue_burst_elem(r, obj_table, esize,
966                         n, free_space);
967         case RTE_RING_SYNC_MT_HTS:
968                 return rte_ring_mp_hts_enqueue_burst_elem(r, obj_table, esize,
969                         n, free_space);
970 #endif
971         }
972
973         /* valid ring should never reach this point */
974         RTE_ASSERT(0);
975         if (free_space != NULL)
976                 *free_space = 0;
977         return 0;
978 }
979
980 /**
981  * Dequeue several objects from a ring (multi-consumers safe). When the request
982  * objects are more than the available objects, only dequeue the actual number
983  * of objects
984  *
985  * This function uses a "compare and set" instruction to move the
986  * consumer index atomically.
987  *
988  * @param r
989  *   A pointer to the ring structure.
990  * @param obj_table
991  *   A pointer to a table of objects that will be filled.
992  * @param esize
993  *   The size of ring element, in bytes. It must be a multiple of 4.
994  *   This must be the same value used while creating the ring. Otherwise
995  *   the results are undefined.
996  * @param n
997  *   The number of objects to dequeue from the ring to the obj_table.
998  * @param available
999  *   If non-NULL, returns the number of remaining ring entries after the
1000  *   dequeue has finished.
1001  * @return
1002  *   - n: Actual number of objects dequeued, 0 if ring is empty
1003  */
1004 static __rte_always_inline unsigned
1005 rte_ring_mc_dequeue_burst_elem(struct rte_ring *r, void *obj_table,
1006                 unsigned int esize, unsigned int n, unsigned int *available)
1007 {
1008         return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
1009                         RTE_RING_QUEUE_VARIABLE, RTE_RING_SYNC_MT, available);
1010 }
1011
1012 /**
1013  * Dequeue several objects from a ring (NOT multi-consumers safe).When the
1014  * request objects are more than the available objects, only dequeue the
1015  * actual number of objects
1016  *
1017  * @param r
1018  *   A pointer to the ring structure.
1019  * @param obj_table
1020  *   A pointer to a table of objects that will be filled.
1021  * @param esize
1022  *   The size of ring element, in bytes. It must be a multiple of 4.
1023  *   This must be the same value used while creating the ring. Otherwise
1024  *   the results are undefined.
1025  * @param n
1026  *   The number of objects to dequeue from the ring to the obj_table.
1027  * @param available
1028  *   If non-NULL, returns the number of remaining ring entries after the
1029  *   dequeue has finished.
1030  * @return
1031  *   - n: Actual number of objects dequeued, 0 if ring is empty
1032  */
1033 static __rte_always_inline unsigned
1034 rte_ring_sc_dequeue_burst_elem(struct rte_ring *r, void *obj_table,
1035                 unsigned int esize, unsigned int n, unsigned int *available)
1036 {
1037         return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
1038                         RTE_RING_QUEUE_VARIABLE, RTE_RING_SYNC_ST, available);
1039 }
1040
1041 /**
1042  * Dequeue multiple objects from a ring up to a maximum number.
1043  *
1044  * This function calls the multi-consumers or the single-consumer
1045  * version, depending on the default behaviour that was specified at
1046  * ring creation time (see flags).
1047  *
1048  * @param r
1049  *   A pointer to the ring structure.
1050  * @param obj_table
1051  *   A pointer to a table of objects that will be filled.
1052  * @param esize
1053  *   The size of ring element, in bytes. It must be a multiple of 4.
1054  *   This must be the same value used while creating the ring. Otherwise
1055  *   the results are undefined.
1056  * @param n
1057  *   The number of objects to dequeue from the ring to the obj_table.
1058  * @param available
1059  *   If non-NULL, returns the number of remaining ring entries after the
1060  *   dequeue has finished.
1061  * @return
1062  *   - Number of objects dequeued
1063  */
1064 static __rte_always_inline unsigned int
1065 rte_ring_dequeue_burst_elem(struct rte_ring *r, void *obj_table,
1066                 unsigned int esize, unsigned int n, unsigned int *available)
1067 {
1068         switch (r->cons.sync_type) {
1069         case RTE_RING_SYNC_MT:
1070                 return rte_ring_mc_dequeue_burst_elem(r, obj_table, esize, n,
1071                         available);
1072         case RTE_RING_SYNC_ST:
1073                 return rte_ring_sc_dequeue_burst_elem(r, obj_table, esize, n,
1074                         available);
1075 #ifdef ALLOW_EXPERIMENTAL_API
1076         case RTE_RING_SYNC_MT_RTS:
1077                 return rte_ring_mc_rts_dequeue_burst_elem(r, obj_table, esize,
1078                         n, available);
1079         case RTE_RING_SYNC_MT_HTS:
1080                 return rte_ring_mc_hts_dequeue_burst_elem(r, obj_table, esize,
1081                         n, available);
1082 #endif
1083         }
1084
1085         /* valid ring should never reach this point */
1086         RTE_ASSERT(0);
1087         if (available != NULL)
1088                 *available = 0;
1089         return 0;
1090 }
1091
1092 #include <rte_ring.h>
1093
1094 #ifdef __cplusplus
1095 }
1096 #endif
1097
1098 #endif /* _RTE_RING_ELEM_H_ */