1 /* SPDX-License-Identifier: BSD-3-Clause
3 * Copyright (c) 2010-2020 Intel Corporation
4 * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org
6 * Derived from FreeBSD's bufring.h
7 * Used as BSD-3 Licensed with permission from Kip Macy.
17 * The Ring Manager is a fixed-size queue, implemented as a table of
18 * pointers. Head and tail pointers are modified atomically, allowing
19 * concurrent access to it. It has the following features:
21 * - FIFO (First In First Out)
22 * - Maximum size is fixed; the pointers are stored in a table.
23 * - Lockless implementation.
24 * - Multi- or single-consumer dequeue.
25 * - Multi- or single-producer enqueue.
28 * - Ability to select different sync modes for producer/consumer.
29 * - Dequeue start/finish (depending on consumer sync modes).
30 * - Enqueue start/finish (depending on producer sync mode).
32 * Note: the ring implementation is not preemptible. Refer to Programmer's
33 * guide/Environment Abstraction Layer/Multiple pthread/Known Issues/rte_ring
34 * for more information.
42 #include <rte_ring_core.h>
43 #include <rte_ring_elem.h>
46 * Calculate the memory size needed for a ring
48 * This function returns the number of bytes needed for a ring, given
49 * the number of elements in it. This value is the sum of the size of
50 * the structure rte_ring and the size of the memory needed by the
51 * object pointers. The value is aligned to a cache line size.
54 * The number of elements in the ring (must be a power of 2).
56 * - The memory size needed for the ring on success.
57 * - -EINVAL if count is not a power of 2.
59 ssize_t rte_ring_get_memsize(unsigned int count);
62 * Initialize a ring structure.
64 * Initialize a ring structure in memory pointed to by "r". The size of the
65 * memory area must be large enough to store the ring structure and the
66 * object table. It is advised to use rte_ring_get_memsize() to get the
69 * The ring size is set to *count*, which must be a power of two. Water
70 * marking is disabled by default. The real usable ring size is
71 * *count-1* instead of *count* to differentiate a full ring from an
74 * The ring is not added in RTE_TAILQ_RING global list. Indeed, the
75 * memory given by the caller may not be shareable among dpdk
79 * The pointer to the ring structure followed by the objects table.
81 * The name of the ring.
83 * The number of elements in the ring (must be a power of 2,
84 * unless RING_F_EXACT_SZ is set in flags).
86 * An OR of the following:
87 * - One of mutually exclusive flags that define producer behavior:
88 * - RING_F_SP_ENQ: If this flag is set, the default behavior when
89 * using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()``
90 * is "single-producer".
91 * - RING_F_MP_RTS_ENQ: If this flag is set, the default behavior when
92 * using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()``
93 * is "multi-producer RTS mode".
94 * - RING_F_MP_HTS_ENQ: If this flag is set, the default behavior when
95 * using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()``
96 * is "multi-producer HTS mode".
97 * If none of these flags is set, then default "multi-producer"
98 * behavior is selected.
99 * - One of mutually exclusive flags that define consumer behavior:
100 * - RING_F_SC_DEQ: If this flag is set, the default behavior when
101 * using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
102 * is "single-consumer". Otherwise, it is "multi-consumers".
103 * - RING_F_MC_RTS_DEQ: If this flag is set, the default behavior when
104 * using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
105 * is "multi-consumer RTS mode".
106 * - RING_F_MC_HTS_DEQ: If this flag is set, the default behavior when
107 * using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
108 * is "multi-consumer HTS mode".
109 * If none of these flags is set, then default "multi-consumer"
110 * behavior is selected.
111 * - RING_F_EXACT_SZ: If this flag is set, the ring will hold exactly the
112 * requested number of entries, and the requested size will be rounded up
113 * to the next power of two, but the usable space will be exactly that
114 * requested. Worst case, if a power-of-2 size is requested, half the
115 * ring space will be wasted.
116 * Without this flag set, the ring size requested must be a power of 2,
117 * and the usable space will be that size - 1.
119 * 0 on success, or a negative value on error.
121 int rte_ring_init(struct rte_ring *r, const char *name, unsigned int count,
125 * Create a new ring named *name* in memory.
127 * This function uses ``memzone_reserve()`` to allocate memory. Then it
128 * calls rte_ring_init() to initialize an empty ring.
130 * The new ring size is set to *count*, which must be a power of
131 * two. Water marking is disabled by default. The real usable ring size
132 * is *count-1* instead of *count* to differentiate a full ring from an
135 * The ring is added in RTE_TAILQ_RING list.
138 * The name of the ring.
140 * The size of the ring (must be a power of 2,
141 * unless RING_F_EXACT_SZ is set in flags).
143 * The *socket_id* argument is the socket identifier in case of
144 * NUMA. The value can be *SOCKET_ID_ANY* if there is no NUMA
145 * constraint for the reserved zone.
147 * An OR of the following:
148 * - One of mutually exclusive flags that define producer behavior:
149 * - RING_F_SP_ENQ: If this flag is set, the default behavior when
150 * using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()``
151 * is "single-producer".
152 * - RING_F_MP_RTS_ENQ: If this flag is set, the default behavior when
153 * using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()``
154 * is "multi-producer RTS mode".
155 * - RING_F_MP_HTS_ENQ: If this flag is set, the default behavior when
156 * using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()``
157 * is "multi-producer HTS mode".
158 * If none of these flags is set, then default "multi-producer"
159 * behavior is selected.
160 * - One of mutually exclusive flags that define consumer behavior:
161 * - RING_F_SC_DEQ: If this flag is set, the default behavior when
162 * using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
163 * is "single-consumer". Otherwise, it is "multi-consumers".
164 * - RING_F_MC_RTS_DEQ: If this flag is set, the default behavior when
165 * using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
166 * is "multi-consumer RTS mode".
167 * - RING_F_MC_HTS_DEQ: If this flag is set, the default behavior when
168 * using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
169 * is "multi-consumer HTS mode".
170 * If none of these flags is set, then default "multi-consumer"
171 * behavior is selected.
172 * - RING_F_EXACT_SZ: If this flag is set, the ring will hold exactly the
173 * requested number of entries, and the requested size will be rounded up
174 * to the next power of two, but the usable space will be exactly that
175 * requested. Worst case, if a power-of-2 size is requested, half the
176 * ring space will be wasted.
177 * Without this flag set, the ring size requested must be a power of 2,
178 * and the usable space will be that size - 1.
180 * On success, the pointer to the newly allocated ring. NULL on error with
181 * rte_errno set appropriately. Possible errno values include:
182 * - E_RTE_NO_CONFIG - function could not get pointer to rte_config structure
183 * - E_RTE_SECONDARY - function was called from a secondary process instance
184 * - EINVAL - count provided is not a power of 2
185 * - ENOSPC - the maximum number of memzones has already been allocated
186 * - EEXIST - a memzone with the same name already exists
187 * - ENOMEM - no appropriate memory area found in which to create memzone
189 struct rte_ring *rte_ring_create(const char *name, unsigned int count,
190 int socket_id, unsigned int flags);
193 * De-allocate all memory used by the ring.
197 * If NULL, the function does nothing.
199 void rte_ring_free(struct rte_ring *r);
202 * Dump the status of the ring to a file.
205 * A pointer to a file for output
207 * A pointer to the ring structure.
209 void rte_ring_dump(FILE *f, const struct rte_ring *r);
212 * Enqueue several objects on the ring (multi-producers safe).
214 * This function uses a "compare and set" instruction to move the
215 * producer index atomically.
218 * A pointer to the ring structure.
220 * A pointer to a table of void * pointers (objects).
222 * The number of objects to add in the ring from the obj_table.
224 * if non-NULL, returns the amount of space in the ring after the
225 * enqueue operation has finished.
227 * The number of objects enqueued, either 0 or n
229 static __rte_always_inline unsigned int
230 rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
231 unsigned int n, unsigned int *free_space)
233 return rte_ring_mp_enqueue_bulk_elem(r, obj_table, sizeof(void *),
238 * Enqueue several objects on a ring (NOT multi-producers safe).
241 * A pointer to the ring structure.
243 * A pointer to a table of void * pointers (objects).
245 * The number of objects to add in the ring from the obj_table.
247 * if non-NULL, returns the amount of space in the ring after the
248 * enqueue operation has finished.
250 * The number of objects enqueued, either 0 or n
252 static __rte_always_inline unsigned int
253 rte_ring_sp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
254 unsigned int n, unsigned int *free_space)
256 return rte_ring_sp_enqueue_bulk_elem(r, obj_table, sizeof(void *),
261 * Enqueue several objects on a ring.
263 * This function calls the multi-producer or the single-producer
264 * version depending on the default behavior that was specified at
265 * ring creation time (see flags).
268 * A pointer to the ring structure.
270 * A pointer to a table of void * pointers (objects).
272 * The number of objects to add in the ring from the obj_table.
274 * if non-NULL, returns the amount of space in the ring after the
275 * enqueue operation has finished.
277 * The number of objects enqueued, either 0 or n
279 static __rte_always_inline unsigned int
280 rte_ring_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
281 unsigned int n, unsigned int *free_space)
283 return rte_ring_enqueue_bulk_elem(r, obj_table, sizeof(void *),
288 * Enqueue one object on a ring (multi-producers safe).
290 * This function uses a "compare and set" instruction to move the
291 * producer index atomically.
294 * A pointer to the ring structure.
296 * A pointer to the object to be added.
298 * - 0: Success; objects enqueued.
299 * - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued.
301 static __rte_always_inline int
302 rte_ring_mp_enqueue(struct rte_ring *r, void *obj)
304 return rte_ring_mp_enqueue_elem(r, &obj, sizeof(void *));
308 * Enqueue one object on a ring (NOT multi-producers safe).
311 * A pointer to the ring structure.
313 * A pointer to the object to be added.
315 * - 0: Success; objects enqueued.
316 * - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued.
318 static __rte_always_inline int
319 rte_ring_sp_enqueue(struct rte_ring *r, void *obj)
321 return rte_ring_sp_enqueue_elem(r, &obj, sizeof(void *));
325 * Enqueue one object on a ring.
327 * This function calls the multi-producer or the single-producer
328 * version, depending on the default behaviour that was specified at
329 * ring creation time (see flags).
332 * A pointer to the ring structure.
334 * A pointer to the object to be added.
336 * - 0: Success; objects enqueued.
337 * - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued.
339 static __rte_always_inline int
340 rte_ring_enqueue(struct rte_ring *r, void *obj)
342 return rte_ring_enqueue_elem(r, &obj, sizeof(void *));
346 * Dequeue several objects from a ring (multi-consumers safe).
348 * This function uses a "compare and set" instruction to move the
349 * consumer index atomically.
352 * A pointer to the ring structure.
354 * A pointer to a table of void * pointers (objects) that will be filled.
356 * The number of objects to dequeue from the ring to the obj_table.
358 * If non-NULL, returns the number of remaining ring entries after the
359 * dequeue has finished.
361 * The number of objects dequeued, either 0 or n
363 static __rte_always_inline unsigned int
364 rte_ring_mc_dequeue_bulk(struct rte_ring *r, void **obj_table,
365 unsigned int n, unsigned int *available)
367 return rte_ring_mc_dequeue_bulk_elem(r, obj_table, sizeof(void *),
372 * Dequeue several objects from a ring (NOT multi-consumers safe).
375 * A pointer to the ring structure.
377 * A pointer to a table of void * pointers (objects) that will be filled.
379 * The number of objects to dequeue from the ring to the obj_table,
380 * must be strictly positive.
382 * If non-NULL, returns the number of remaining ring entries after the
383 * dequeue has finished.
385 * The number of objects dequeued, either 0 or n
387 static __rte_always_inline unsigned int
388 rte_ring_sc_dequeue_bulk(struct rte_ring *r, void **obj_table,
389 unsigned int n, unsigned int *available)
391 return rte_ring_sc_dequeue_bulk_elem(r, obj_table, sizeof(void *),
396 * Dequeue several objects from a ring.
398 * This function calls the multi-consumers or the single-consumer
399 * version, depending on the default behaviour that was specified at
400 * ring creation time (see flags).
403 * A pointer to the ring structure.
405 * A pointer to a table of void * pointers (objects) that will be filled.
407 * The number of objects to dequeue from the ring to the obj_table.
409 * If non-NULL, returns the number of remaining ring entries after the
410 * dequeue has finished.
412 * The number of objects dequeued, either 0 or n
414 static __rte_always_inline unsigned int
415 rte_ring_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned int n,
416 unsigned int *available)
418 return rte_ring_dequeue_bulk_elem(r, obj_table, sizeof(void *),
423 * Dequeue one object from a ring (multi-consumers safe).
425 * This function uses a "compare and set" instruction to move the
426 * consumer index atomically.
429 * A pointer to the ring structure.
431 * A pointer to a void * pointer (object) that will be filled.
433 * - 0: Success; objects dequeued.
434 * - -ENOENT: Not enough entries in the ring to dequeue; no object is
437 static __rte_always_inline int
438 rte_ring_mc_dequeue(struct rte_ring *r, void **obj_p)
440 return rte_ring_mc_dequeue_elem(r, obj_p, sizeof(void *));
444 * Dequeue one object from a ring (NOT multi-consumers safe).
447 * A pointer to the ring structure.
449 * A pointer to a void * pointer (object) that will be filled.
451 * - 0: Success; objects dequeued.
452 * - -ENOENT: Not enough entries in the ring to dequeue, no object is
455 static __rte_always_inline int
456 rte_ring_sc_dequeue(struct rte_ring *r, void **obj_p)
458 return rte_ring_sc_dequeue_elem(r, obj_p, sizeof(void *));
462 * Dequeue one object from a ring.
464 * This function calls the multi-consumers or the single-consumer
465 * version depending on the default behaviour that was specified at
466 * ring creation time (see flags).
469 * A pointer to the ring structure.
471 * A pointer to a void * pointer (object) that will be filled.
473 * - 0: Success, objects dequeued.
474 * - -ENOENT: Not enough entries in the ring to dequeue, no object is
477 static __rte_always_inline int
478 rte_ring_dequeue(struct rte_ring *r, void **obj_p)
480 return rte_ring_dequeue_elem(r, obj_p, sizeof(void *));
486 * Flush all the elements in a ring.
489 * Make sure the ring is not in use while calling this function.
492 * A pointer to the ring structure.
495 rte_ring_reset(struct rte_ring *r);
498 * Return the number of entries in a ring.
501 * A pointer to the ring structure.
503 * The number of entries in the ring.
505 static inline unsigned int
506 rte_ring_count(const struct rte_ring *r)
508 uint32_t prod_tail = r->prod.tail;
509 uint32_t cons_tail = r->cons.tail;
510 uint32_t count = (prod_tail - cons_tail) & r->mask;
511 return (count > r->capacity) ? r->capacity : count;
515 * Return the number of free entries in a ring.
518 * A pointer to the ring structure.
520 * The number of free entries in the ring.
522 static inline unsigned int
523 rte_ring_free_count(const struct rte_ring *r)
525 return r->capacity - rte_ring_count(r);
529 * Test if a ring is full.
532 * A pointer to the ring structure.
534 * - 1: The ring is full.
535 * - 0: The ring is not full.
538 rte_ring_full(const struct rte_ring *r)
540 return rte_ring_free_count(r) == 0;
544 * Test if a ring is empty.
547 * A pointer to the ring structure.
549 * - 1: The ring is empty.
550 * - 0: The ring is not empty.
553 rte_ring_empty(const struct rte_ring *r)
555 uint32_t prod_tail = r->prod.tail;
556 uint32_t cons_tail = r->cons.tail;
557 return cons_tail == prod_tail;
561 * Return the size of the ring.
564 * A pointer to the ring structure.
566 * The size of the data store used by the ring.
567 * NOTE: this is not the same as the usable space in the ring. To query that
568 * use ``rte_ring_get_capacity()``.
570 static inline unsigned int
571 rte_ring_get_size(const struct rte_ring *r)
577 * Return the number of elements which can be stored in the ring.
580 * A pointer to the ring structure.
582 * The usable size of the ring.
584 static inline unsigned int
585 rte_ring_get_capacity(const struct rte_ring *r)
591 * Return sync type used by producer in the ring.
594 * A pointer to the ring structure.
596 * Producer sync type value.
598 static inline enum rte_ring_sync_type
599 rte_ring_get_prod_sync_type(const struct rte_ring *r)
601 return r->prod.sync_type;
605 * Check whether the ring is single-producer.
608 * A pointer to the ring structure.
610 * Non-zero if the ring is single-producer (SP), zero otherwise.
613 rte_ring_is_prod_single(const struct rte_ring *r)
615 return (rte_ring_get_prod_sync_type(r) == RTE_RING_SYNC_ST);
619 * Return sync type used by consumer in the ring.
622 * A pointer to the ring structure.
624 * Consumer sync type value.
626 static inline enum rte_ring_sync_type
627 rte_ring_get_cons_sync_type(const struct rte_ring *r)
629 return r->cons.sync_type;
633 * Check whether the ring is single-consumer.
636 * A pointer to the ring structure.
638 * Non-zero if the ring is single-consumer (SC), zero otherwise.
641 rte_ring_is_cons_single(const struct rte_ring *r)
643 return (rte_ring_get_cons_sync_type(r) == RTE_RING_SYNC_ST);
647 * Dump the status of all rings to a file.
650 * A pointer to a file for output
652 void rte_ring_list_dump(FILE *f);
655 * Search for a ring by its name.
658 * The name of the ring.
660 * The pointer to the ring matching the name, or NULL if not found,
661 * with rte_errno set appropriately. Possible rte_errno values include:
662 * - ENOENT - required entry not available to return.
664 struct rte_ring *rte_ring_lookup(const char *name);
667 * Enqueue several objects on the ring (multi-producers safe).
669 * This function uses a "compare and set" instruction to move the
670 * producer index atomically.
673 * A pointer to the ring structure.
675 * A pointer to a table of void * pointers (objects).
677 * The number of objects to add in the ring from the obj_table.
679 * if non-NULL, returns the amount of space in the ring after the
680 * enqueue operation has finished.
682 * - n: Actual number of objects enqueued.
684 static __rte_always_inline unsigned int
685 rte_ring_mp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
686 unsigned int n, unsigned int *free_space)
688 return rte_ring_mp_enqueue_burst_elem(r, obj_table, sizeof(void *),
693 * Enqueue several objects on a ring (NOT multi-producers safe).
696 * A pointer to the ring structure.
698 * A pointer to a table of void * pointers (objects).
700 * The number of objects to add in the ring from the obj_table.
702 * if non-NULL, returns the amount of space in the ring after the
703 * enqueue operation has finished.
705 * - n: Actual number of objects enqueued.
707 static __rte_always_inline unsigned int
708 rte_ring_sp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
709 unsigned int n, unsigned int *free_space)
711 return rte_ring_sp_enqueue_burst_elem(r, obj_table, sizeof(void *),
716 * Enqueue several objects on a ring.
718 * This function calls the multi-producer or the single-producer
719 * version depending on the default behavior that was specified at
720 * ring creation time (see flags).
723 * A pointer to the ring structure.
725 * A pointer to a table of void * pointers (objects).
727 * The number of objects to add in the ring from the obj_table.
729 * if non-NULL, returns the amount of space in the ring after the
730 * enqueue operation has finished.
732 * - n: Actual number of objects enqueued.
734 static __rte_always_inline unsigned int
735 rte_ring_enqueue_burst(struct rte_ring *r, void * const *obj_table,
736 unsigned int n, unsigned int *free_space)
738 return rte_ring_enqueue_burst_elem(r, obj_table, sizeof(void *),
743 * Dequeue several objects from a ring (multi-consumers safe). When more objects
744 * are requested than are available, only the available objects are dequeued.
747 * This function uses a "compare and set" instruction to move the
748 * consumer index atomically.
751 * A pointer to the ring structure.
753 * A pointer to a table of void * pointers (objects) that will be filled.
755 * The number of objects to dequeue from the ring to the obj_table.
757 * If non-NULL, returns the number of remaining ring entries after the
758 * dequeue has finished.
760 * - n: Actual number of objects dequeued, 0 if ring is empty
762 static __rte_always_inline unsigned int
763 rte_ring_mc_dequeue_burst(struct rte_ring *r, void **obj_table,
764 unsigned int n, unsigned int *available)
766 return rte_ring_mc_dequeue_burst_elem(r, obj_table, sizeof(void *),
771 * Dequeue several objects from a ring (NOT multi-consumers safe). When more
772 * objects are requested than are available, only the available objects are
773 * dequeued.
776 * A pointer to the ring structure.
778 * A pointer to a table of void * pointers (objects) that will be filled.
780 * The number of objects to dequeue from the ring to the obj_table.
782 * If non-NULL, returns the number of remaining ring entries after the
783 * dequeue has finished.
785 * - n: Actual number of objects dequeued, 0 if ring is empty
787 static __rte_always_inline unsigned int
788 rte_ring_sc_dequeue_burst(struct rte_ring *r, void **obj_table,
789 unsigned int n, unsigned int *available)
791 return rte_ring_sc_dequeue_burst_elem(r, obj_table, sizeof(void *),
796 * Dequeue multiple objects from a ring up to a maximum number.
798 * This function calls the multi-consumers or the single-consumer
799 * version, depending on the default behaviour that was specified at
800 * ring creation time (see flags).
803 * A pointer to the ring structure.
805 * A pointer to a table of void * pointers (objects) that will be filled.
807 * The number of objects to dequeue from the ring to the obj_table.
809 * If non-NULL, returns the number of remaining ring entries after the
810 * dequeue has finished.
812 * - Number of objects dequeued
814 static __rte_always_inline unsigned int
815 rte_ring_dequeue_burst(struct rte_ring *r, void **obj_table,
816 unsigned int n, unsigned int *available)
818 return rte_ring_dequeue_burst_elem(r, obj_table, sizeof(void *),
826 #endif /* _RTE_RING_H_ */