1 /* SPDX-License-Identifier: BSD-3-Clause
3 * Copyright (c) 2010-2020 Intel Corporation
4 * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org
6 * Derived from FreeBSD's bufring.h
7 * Used as BSD-3 Licensed with permission from Kip Macy.
17 * The Ring Manager is a fixed-size queue, implemented as a table of
18 * pointers. Head and tail pointers are modified atomically, allowing
19 * concurrent access to it. It has the following features:
21 * - FIFO (First In First Out)
22 * - Maximum size is fixed; the pointers are stored in a table.
23 * - Lockless implementation.
24 * - Multi- or single-consumer dequeue.
25 * - Multi- or single-producer enqueue.
28 * - Ability to select different sync modes for producer/consumer.
29 * - Dequeue start/finish (depending on consumer sync modes).
30 * - Enqueue start/finish (depending on producer sync mode).
32 * Note: the ring implementation is not preemptible. Refer to Programmer's
33 * guide/Environment Abstraction Layer/Multiple pthread/Known Issues/rte_ring
34 * for more information.
42 #include <rte_ring_core.h>
43 #include <rte_ring_elem.h>
46 * Calculate the memory size needed for a ring
48 * This function returns the number of bytes needed for a ring, given
49 * the number of elements in it. This value is the sum of the size of
50 * the structure rte_ring and the size of the memory needed by the
51 * objects pointers. The value is aligned to a cache line size.
54 * The number of elements in the ring (must be a power of 2).
56 * - The memory size needed for the ring on success.
57 * - -EINVAL if count is not a power of 2.
59 ssize_t rte_ring_get_memsize(unsigned int count);
62 * Initialize a ring structure.
64 * Initialize a ring structure in memory pointed by "r". The size of the
65 * memory area must be large enough to store the ring structure and the
66 * object table. It is advised to use rte_ring_get_memsize() to get the
69 * The ring size is set to *count*, which must be a power of two. Water
70 * marking is disabled by default. The real usable ring size is
71 * *count-1* instead of *count* to differentiate a full ring from an
74 * The ring is not added in RTE_TAILQ_RING global list. Indeed, the
75 * memory given by the caller may not be shareable among dpdk
79 * The pointer to the ring structure followed by the objects table.
81 * The name of the ring.
83 * The number of elements in the ring (must be a power of 2,
84 * unless RING_F_EXACT_SZ is set in flags).
86 * An OR of the following:
87 * - One of mutually exclusive flags that define producer behavior:
88 * - RING_F_SP_ENQ: If this flag is set, the default behavior when
89 * using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()``
90 * is "single-producer".
91 * - RING_F_MP_RTS_ENQ: If this flag is set, the default behavior when
92 * using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()``
93 * is "multi-producer RTS mode".
94 * - RING_F_MP_HTS_ENQ: If this flag is set, the default behavior when
95 * using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()``
96 * is "multi-producer HTS mode".
97 * If none of these flags is set, then default "multi-producer"
98 * behavior is selected.
99 * - One of mutually exclusive flags that define consumer behavior:
100 * - RING_F_SC_DEQ: If this flag is set, the default behavior when
101 * using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
102 * is "single-consumer". Otherwise, it is "multi-consumers".
103 * - RING_F_MC_RTS_DEQ: If this flag is set, the default behavior when
104 * using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
105 * is "multi-consumer RTS mode".
106 * - RING_F_MC_HTS_DEQ: If this flag is set, the default behavior when
107 * using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
108 * is "multi-consumer HTS mode".
109 * If none of these flags is set, then default "multi-consumer"
110 * behavior is selected.
111 * - RING_F_EXACT_SZ: If this flag is set, the ring will hold exactly the
112 * requested number of entries, and the requested size will be rounded up
113 * to the next power of two, but the usable space will be exactly that
114 * requested. Worst case, if a power-of-2 size is requested, half the
115 * ring space will be wasted.
116 * Without this flag set, the ring size requested must be a power of 2,
117 * and the usable space will be that size - 1.
119 * 0 on success, or a negative value on error.
121 int rte_ring_init(struct rte_ring *r, const char *name, unsigned int count,
125 * Create a new ring named *name* in memory.
127 * This function uses ``memzone_reserve()`` to allocate memory. Then it
128 * calls rte_ring_init() to initialize an empty ring.
130 * The new ring size is set to *count*, which must be a power of
131 * two. Water marking is disabled by default. The real usable ring size
132 * is *count-1* instead of *count* to differentiate a full ring from an
135 * The ring is added in RTE_TAILQ_RING list.
138 * The name of the ring.
140 * The size of the ring (must be a power of 2,
141 * unless RING_F_EXACT_SZ is set in flags).
143 * The *socket_id* argument is the socket identifier in case of
144 * NUMA. The value can be *SOCKET_ID_ANY* if there is no NUMA
145 * constraint for the reserved zone.
147 * An OR of the following:
148 * - One of mutually exclusive flags that define producer behavior:
149 * - RING_F_SP_ENQ: If this flag is set, the default behavior when
150 * using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()``
151 * is "single-producer".
152 * - RING_F_MP_RTS_ENQ: If this flag is set, the default behavior when
153 * using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()``
154 * is "multi-producer RTS mode".
155 * - RING_F_MP_HTS_ENQ: If this flag is set, the default behavior when
156 * using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()``
157 * is "multi-producer HTS mode".
158 * If none of these flags is set, then default "multi-producer"
159 * behavior is selected.
160 * - One of mutually exclusive flags that define consumer behavior:
161 * - RING_F_SC_DEQ: If this flag is set, the default behavior when
162 * using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
163 * is "single-consumer". Otherwise, it is "multi-consumers".
164 * - RING_F_MC_RTS_DEQ: If this flag is set, the default behavior when
165 * using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
166 * is "multi-consumer RTS mode".
167 * - RING_F_MC_HTS_DEQ: If this flag is set, the default behavior when
168 * using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
169 * is "multi-consumer HTS mode".
170 * If none of these flags is set, then default "multi-consumer"
171 * behavior is selected.
172 * - RING_F_EXACT_SZ: If this flag is set, the ring will hold exactly the
173 * requested number of entries, and the requested size will be rounded up
174 * to the next power of two, but the usable space will be exactly that
175 * requested. Worst case, if a power-of-2 size is requested, half the
176 * ring space will be wasted.
177 * Without this flag set, the ring size requested must be a power of 2,
178 * and the usable space will be that size - 1.
180 * On success, the pointer to the new allocated ring. NULL on error with
181 * rte_errno set appropriately. Possible errno values include:
182 * - E_RTE_NO_CONFIG - function could not get pointer to rte_config structure
183 * - E_RTE_SECONDARY - function was called from a secondary process instance
184 * - EINVAL - count provided is not a power of 2
185 * - ENOSPC - the maximum number of memzones has already been allocated
186 * - EEXIST - a memzone with the same name already exists
187 * - ENOMEM - no appropriate memory area found in which to create memzone
189 struct rte_ring *rte_ring_create(const char *name, unsigned int count,
190 int socket_id, unsigned int flags);
193 * De-allocate all memory used by the ring.
198 void rte_ring_free(struct rte_ring *r);
201 * Dump the status of the ring to a file.
204 * A pointer to a file for output
206 * A pointer to the ring structure.
208 void rte_ring_dump(FILE *f, const struct rte_ring *r);
211 * Enqueue several objects on the ring (multi-producers safe).
213 * This function uses a "compare and set" instruction to move the
214 * producer index atomically.
217 * A pointer to the ring structure.
219 * A pointer to a table of void * pointers (objects).
221 * The number of objects to add in the ring from the obj_table.
223 * if non-NULL, returns the amount of space in the ring after the
224 * enqueue operation has finished.
226 * The number of objects enqueued, either 0 or n
228 static __rte_always_inline unsigned int
229 rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
230 unsigned int n, unsigned int *free_space)
232 return rte_ring_mp_enqueue_bulk_elem(r, obj_table, sizeof(void *),
237 * Enqueue several objects on a ring (NOT multi-producers safe).
240 * A pointer to the ring structure.
242 * A pointer to a table of void * pointers (objects).
244 * The number of objects to add in the ring from the obj_table.
246 * if non-NULL, returns the amount of space in the ring after the
247 * enqueue operation has finished.
249 * The number of objects enqueued, either 0 or n
251 static __rte_always_inline unsigned int
252 rte_ring_sp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
253 unsigned int n, unsigned int *free_space)
255 return rte_ring_sp_enqueue_bulk_elem(r, obj_table, sizeof(void *),
260 * Enqueue several objects on a ring.
262 * This function calls the multi-producer or the single-producer
263 * version depending on the default behavior that was specified at
264 * ring creation time (see flags).
267 * A pointer to the ring structure.
269 * A pointer to a table of void * pointers (objects).
271 * The number of objects to add in the ring from the obj_table.
273 * if non-NULL, returns the amount of space in the ring after the
274 * enqueue operation has finished.
276 * The number of objects enqueued, either 0 or n
278 static __rte_always_inline unsigned int
279 rte_ring_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
280 unsigned int n, unsigned int *free_space)
282 return rte_ring_enqueue_bulk_elem(r, obj_table, sizeof(void *),
287 * Enqueue one object on a ring (multi-producers safe).
289 * This function uses a "compare and set" instruction to move the
290 * producer index atomically.
293 * A pointer to the ring structure.
295 * A pointer to the object to be added.
297 * - 0: Success; objects enqueued.
298 * - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued.
300 static __rte_always_inline int
301 rte_ring_mp_enqueue(struct rte_ring *r, void *obj)
303 return rte_ring_mp_enqueue_elem(r, &obj, sizeof(void *));
307 * Enqueue one object on a ring (NOT multi-producers safe).
310 * A pointer to the ring structure.
312 * A pointer to the object to be added.
314 * - 0: Success; objects enqueued.
315 * - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued.
317 static __rte_always_inline int
318 rte_ring_sp_enqueue(struct rte_ring *r, void *obj)
320 return rte_ring_sp_enqueue_elem(r, &obj, sizeof(void *));
324 * Enqueue one object on a ring.
326 * This function calls the multi-producer or the single-producer
327 * version, depending on the default behaviour that was specified at
328 * ring creation time (see flags).
331 * A pointer to the ring structure.
333 * A pointer to the object to be added.
335 * - 0: Success; objects enqueued.
336 * - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued.
338 static __rte_always_inline int
339 rte_ring_enqueue(struct rte_ring *r, void *obj)
341 return rte_ring_enqueue_elem(r, &obj, sizeof(void *));
345 * Dequeue several objects from a ring (multi-consumers safe).
347 * This function uses a "compare and set" instruction to move the
348 * consumer index atomically.
351 * A pointer to the ring structure.
353 * A pointer to a table of void * pointers (objects) that will be filled.
355 * The number of objects to dequeue from the ring to the obj_table.
357 * If non-NULL, returns the number of remaining ring entries after the
358 * dequeue has finished.
360 * The number of objects dequeued, either 0 or n
362 static __rte_always_inline unsigned int
363 rte_ring_mc_dequeue_bulk(struct rte_ring *r, void **obj_table,
364 unsigned int n, unsigned int *available)
366 return rte_ring_mc_dequeue_bulk_elem(r, obj_table, sizeof(void *),
371 * Dequeue several objects from a ring (NOT multi-consumers safe).
374 * A pointer to the ring structure.
376 * A pointer to a table of void * pointers (objects) that will be filled.
378 * The number of objects to dequeue from the ring to the obj_table,
379 * must be strictly positive.
381 * If non-NULL, returns the number of remaining ring entries after the
382 * dequeue has finished.
384 * The number of objects dequeued, either 0 or n
386 static __rte_always_inline unsigned int
387 rte_ring_sc_dequeue_bulk(struct rte_ring *r, void **obj_table,
388 unsigned int n, unsigned int *available)
390 return rte_ring_sc_dequeue_bulk_elem(r, obj_table, sizeof(void *),
395 * Dequeue several objects from a ring.
397 * This function calls the multi-consumers or the single-consumer
398 * version, depending on the default behaviour that was specified at
399 * ring creation time (see flags).
402 * A pointer to the ring structure.
404 * A pointer to a table of void * pointers (objects) that will be filled.
406 * The number of objects to dequeue from the ring to the obj_table.
408 * If non-NULL, returns the number of remaining ring entries after the
409 * dequeue has finished.
411 * The number of objects dequeued, either 0 or n
413 static __rte_always_inline unsigned int
414 rte_ring_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned int n,
415 unsigned int *available)
417 return rte_ring_dequeue_bulk_elem(r, obj_table, sizeof(void *),
422 * Dequeue one object from a ring (multi-consumers safe).
424 * This function uses a "compare and set" instruction to move the
425 * consumer index atomically.
428 * A pointer to the ring structure.
430 * A pointer to a void * pointer (object) that will be filled.
432 * - 0: Success; objects dequeued.
433 * - -ENOENT: Not enough entries in the ring to dequeue; no object is
436 static __rte_always_inline int
437 rte_ring_mc_dequeue(struct rte_ring *r, void **obj_p)
439 return rte_ring_mc_dequeue_elem(r, obj_p, sizeof(void *));
443 * Dequeue one object from a ring (NOT multi-consumers safe).
446 * A pointer to the ring structure.
448 * A pointer to a void * pointer (object) that will be filled.
450 * - 0: Success; objects dequeued.
451 * - -ENOENT: Not enough entries in the ring to dequeue, no object is
454 static __rte_always_inline int
455 rte_ring_sc_dequeue(struct rte_ring *r, void **obj_p)
457 return rte_ring_sc_dequeue_elem(r, obj_p, sizeof(void *));
461 * Dequeue one object from a ring.
463 * This function calls the multi-consumers or the single-consumer
464 * version depending on the default behaviour that was specified at
465 * ring creation time (see flags).
468 * A pointer to the ring structure.
470 * A pointer to a void * pointer (object) that will be filled.
472 * - 0: Success, objects dequeued.
473 * - -ENOENT: Not enough entries in the ring to dequeue, no object is
476 static __rte_always_inline int
477 rte_ring_dequeue(struct rte_ring *r, void **obj_p)
479 return rte_ring_dequeue_elem(r, obj_p, sizeof(void *));
485 * This function flush all the elements in a ring
488 * Make sure the ring is not in use while calling this function.
491 * A pointer to the ring structure.
494 rte_ring_reset(struct rte_ring *r);
497 * Return the number of entries in a ring.
500 * A pointer to the ring structure.
502 * The number of entries in the ring.
504 static inline unsigned int
505 rte_ring_count(const struct rte_ring *r)
507 uint32_t prod_tail = r->prod.tail;
508 uint32_t cons_tail = r->cons.tail;
509 uint32_t count = (prod_tail - cons_tail) & r->mask;
510 return (count > r->capacity) ? r->capacity : count;
514 * Return the number of free entries in a ring.
517 * A pointer to the ring structure.
519 * The number of free entries in the ring.
521 static inline unsigned int
522 rte_ring_free_count(const struct rte_ring *r)
524 return r->capacity - rte_ring_count(r);
528 * Test if a ring is full.
531 * A pointer to the ring structure.
533 * - 1: The ring is full.
534 * - 0: The ring is not full.
537 rte_ring_full(const struct rte_ring *r)
539 return rte_ring_free_count(r) == 0;
543 * Test if a ring is empty.
546 * A pointer to the ring structure.
548 * - 1: The ring is empty.
549 * - 0: The ring is not empty.
552 rte_ring_empty(const struct rte_ring *r)
554 uint32_t prod_tail = r->prod.tail;
555 uint32_t cons_tail = r->cons.tail;
556 return cons_tail == prod_tail;
560 * Return the size of the ring.
563 * A pointer to the ring structure.
565 * The size of the data store used by the ring.
566 * NOTE: this is not the same as the usable space in the ring. To query that
567 * use ``rte_ring_get_capacity()``.
569 static inline unsigned int
570 rte_ring_get_size(const struct rte_ring *r)
576 * Return the number of elements which can be stored in the ring.
579 * A pointer to the ring structure.
581 * The usable size of the ring.
583 static inline unsigned int
584 rte_ring_get_capacity(const struct rte_ring *r)
590 * Return sync type used by producer in the ring.
593 * A pointer to the ring structure.
595 * Producer sync type value.
597 static inline enum rte_ring_sync_type
598 rte_ring_get_prod_sync_type(const struct rte_ring *r)
600 return r->prod.sync_type;
604 * Check is the ring for single producer.
607 * A pointer to the ring structure.
609 * true if ring is SP, zero otherwise.
612 rte_ring_is_prod_single(const struct rte_ring *r)
614 return (rte_ring_get_prod_sync_type(r) == RTE_RING_SYNC_ST);
618 * Return sync type used by consumer in the ring.
621 * A pointer to the ring structure.
623 * Consumer sync type value.
625 static inline enum rte_ring_sync_type
626 rte_ring_get_cons_sync_type(const struct rte_ring *r)
628 return r->cons.sync_type;
632 * Check is the ring for single consumer.
635 * A pointer to the ring structure.
637 * true if ring is SC, zero otherwise.
640 rte_ring_is_cons_single(const struct rte_ring *r)
642 return (rte_ring_get_cons_sync_type(r) == RTE_RING_SYNC_ST);
646 * Dump the status of all rings on the console
649 * A pointer to a file for output
651 void rte_ring_list_dump(FILE *f);
654 * Search a ring from its name
657 * The name of the ring.
659 * The pointer to the ring matching the name, or NULL if not found,
660 * with rte_errno set appropriately. Possible rte_errno values include:
661 * - ENOENT - required entry not available to return.
663 struct rte_ring *rte_ring_lookup(const char *name);
666 * Enqueue several objects on the ring (multi-producers safe).
668 * This function uses a "compare and set" instruction to move the
669 * producer index atomically.
672 * A pointer to the ring structure.
674 * A pointer to a table of void * pointers (objects).
676 * The number of objects to add in the ring from the obj_table.
678 * if non-NULL, returns the amount of space in the ring after the
679 * enqueue operation has finished.
681 * - n: Actual number of objects enqueued.
683 static __rte_always_inline unsigned int
684 rte_ring_mp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
685 unsigned int n, unsigned int *free_space)
687 return rte_ring_mp_enqueue_burst_elem(r, obj_table, sizeof(void *),
692 * Enqueue several objects on a ring (NOT multi-producers safe).
695 * A pointer to the ring structure.
697 * A pointer to a table of void * pointers (objects).
699 * The number of objects to add in the ring from the obj_table.
701 * if non-NULL, returns the amount of space in the ring after the
702 * enqueue operation has finished.
704 * - n: Actual number of objects enqueued.
706 static __rte_always_inline unsigned int
707 rte_ring_sp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
708 unsigned int n, unsigned int *free_space)
710 return rte_ring_sp_enqueue_burst_elem(r, obj_table, sizeof(void *),
715 * Enqueue several objects on a ring.
717 * This function calls the multi-producer or the single-producer
718 * version depending on the default behavior that was specified at
719 * ring creation time (see flags).
722 * A pointer to the ring structure.
724 * A pointer to a table of void * pointers (objects).
726 * The number of objects to add in the ring from the obj_table.
728 * if non-NULL, returns the amount of space in the ring after the
729 * enqueue operation has finished.
731 * - n: Actual number of objects enqueued.
733 static __rte_always_inline unsigned int
734 rte_ring_enqueue_burst(struct rte_ring *r, void * const *obj_table,
735 unsigned int n, unsigned int *free_space)
737 return rte_ring_enqueue_burst_elem(r, obj_table, sizeof(void *),
742 * Dequeue several objects from a ring (multi-consumers safe). When the request
743 * objects are more than the available objects, only dequeue the actual number
746 * This function uses a "compare and set" instruction to move the
747 * consumer index atomically.
750 * A pointer to the ring structure.
752 * A pointer to a table of void * pointers (objects) that will be filled.
754 * The number of objects to dequeue from the ring to the obj_table.
756 * If non-NULL, returns the number of remaining ring entries after the
757 * dequeue has finished.
759 * - n: Actual number of objects dequeued, 0 if ring is empty
761 static __rte_always_inline unsigned int
762 rte_ring_mc_dequeue_burst(struct rte_ring *r, void **obj_table,
763 unsigned int n, unsigned int *available)
765 return rte_ring_mc_dequeue_burst_elem(r, obj_table, sizeof(void *),
770 * Dequeue several objects from a ring (NOT multi-consumers safe).When the
771 * request objects are more than the available objects, only dequeue the
772 * actual number of objects
775 * A pointer to the ring structure.
777 * A pointer to a table of void * pointers (objects) that will be filled.
779 * The number of objects to dequeue from the ring to the obj_table.
781 * If non-NULL, returns the number of remaining ring entries after the
782 * dequeue has finished.
784 * - n: Actual number of objects dequeued, 0 if ring is empty
786 static __rte_always_inline unsigned int
787 rte_ring_sc_dequeue_burst(struct rte_ring *r, void **obj_table,
788 unsigned int n, unsigned int *available)
790 return rte_ring_sc_dequeue_burst_elem(r, obj_table, sizeof(void *),
795 * Dequeue multiple objects from a ring up to a maximum number.
797 * This function calls the multi-consumers or the single-consumer
798 * version, depending on the default behaviour that was specified at
799 * ring creation time (see flags).
802 * A pointer to the ring structure.
804 * A pointer to a table of void * pointers (objects) that will be filled.
806 * The number of objects to dequeue from the ring to the obj_table.
808 * If non-NULL, returns the number of remaining ring entries after the
809 * dequeue has finished.
811 * - Number of objects dequeued
813 static __rte_always_inline unsigned int
814 rte_ring_dequeue_burst(struct rte_ring *r, void **obj_table,
815 unsigned int n, unsigned int *available)
817 return rte_ring_dequeue_burst_elem(r, obj_table, sizeof(void *),
825 #endif /* _RTE_RING_H_ */