1 /* SPDX-License-Identifier: BSD-3-Clause
3 * Copyright (c) 2010-2020 Intel Corporation
4 * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org
6 * Derived from FreeBSD's bufring.h
7 * Used as BSD-3 Licensed with permission from Kip Macy.
17 * The Ring Manager is a fixed-size queue, implemented as a table of
18 * pointers. Head and tail pointers are modified atomically, allowing
19 * concurrent access to it. It has the following features:
21 * - FIFO (First In First Out)
22 * - Maximum size is fixed; the pointers are stored in a table.
23 * - Lockless implementation.
24 * - Multi- or single-consumer dequeue.
25 * - Multi- or single-producer enqueue.
28 * - Ability to select different sync modes for producer/consumer.
29 * - Dequeue start/finish (depending on consumer sync modes).
30 * - Enqueue start/finish (depending on producer sync mode).
32 * Note: the ring implementation is not preemptible. Refer to Programmer's
33 * guide/Environment Abstraction Layer/Multiple pthread/Known Issues/rte_ring
34 * for more information.
42 #include <rte_ring_core.h>
43 #include <rte_ring_elem.h>
46 * Calculate the memory size needed for a ring
48 * This function returns the number of bytes needed for a ring, given
49 * the number of elements in it. This value is the sum of the size of
50 * the structure rte_ring and the size of the memory needed by the
51 * object pointers. The value is aligned to the cache line size.
54 * The number of elements in the ring (must be a power of 2).
56 * - The memory size needed for the ring on success.
57 * - -EINVAL if count is not a power of 2.
59 ssize_t rte_ring_get_memsize(unsigned int count);
62 * Initialize a ring structure.
64 * Initialize a ring structure in memory pointed to by "r". The size of the
65 * memory area must be large enough to store the ring structure and the
66 * object table. It is advised to use rte_ring_get_memsize() to get the
69 * The ring size is set to *count*, which must be a power of two. Water
70 * marking is disabled by default. The real usable ring size is
71 * *count-1* instead of *count* to differentiate a full ring from an
74 * The ring is not added to the RTE_TAILQ_RING global list. Indeed, the
75 * memory given by the caller may not be shareable among dpdk
79 * The pointer to the ring structure followed by the objects table.
81 * The name of the ring.
83 * The number of elements in the ring (must be a power of 2).
85 * An OR of the following:
86 * - One of mutually exclusive flags that define producer behavior:
87 * - RING_F_SP_ENQ: If this flag is set, the default behavior when
88 * using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()``
89 * is "single-producer".
90 * - RING_F_MP_RTS_ENQ: If this flag is set, the default behavior when
91 * using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()``
92 * is "multi-producer RTS mode".
93 * - RING_F_MP_HTS_ENQ: If this flag is set, the default behavior when
94 * using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()``
95 * is "multi-producer HTS mode".
96 * If none of these flags is set, then default "multi-producer"
97 * behavior is selected.
98 * - One of mutually exclusive flags that define consumer behavior:
99 * - RING_F_SC_DEQ: If this flag is set, the default behavior when
100 * using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
101 * is "single-consumer".
102 * - RING_F_MC_RTS_DEQ: If this flag is set, the default behavior when
103 * using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
104 * is "multi-consumer RTS mode".
105 * - RING_F_MC_HTS_DEQ: If this flag is set, the default behavior when
106 * using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
107 * is "multi-consumer HTS mode".
108 * If none of these flags is set, then default "multi-consumer"
109 * behavior is selected.
111 * 0 on success, or a negative value on error.
113 int rte_ring_init(struct rte_ring *r, const char *name, unsigned int count,
117 * Create a new ring named *name* in memory.
119 * This function uses ``memzone_reserve()`` to allocate memory. Then it
120 * calls rte_ring_init() to initialize an empty ring.
122 * The new ring size is set to *count*, which must be a power of
123 * two. Water marking is disabled by default. The real usable ring size
124 * is *count-1* instead of *count* to differentiate a full ring from an
127 * The ring is added to the RTE_TAILQ_RING list.
130 * The name of the ring.
132 * The size of the ring (must be a power of 2).
134 * The *socket_id* argument is the socket identifier in case of
135 * NUMA. The value can be *SOCKET_ID_ANY* if there is no NUMA
136 * constraint for the reserved zone.
138 * An OR of the following:
139 * - One of mutually exclusive flags that define producer behavior:
140 * - RING_F_SP_ENQ: If this flag is set, the default behavior when
141 * using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()``
142 * is "single-producer".
143 * - RING_F_MP_RTS_ENQ: If this flag is set, the default behavior when
144 * using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()``
145 * is "multi-producer RTS mode".
146 * - RING_F_MP_HTS_ENQ: If this flag is set, the default behavior when
147 * using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()``
148 * is "multi-producer HTS mode".
149 * If none of these flags is set, then default "multi-producer"
150 * behavior is selected.
151 * - One of mutually exclusive flags that define consumer behavior:
152 * - RING_F_SC_DEQ: If this flag is set, the default behavior when
153 * using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
154 * is "single-consumer".
155 * - RING_F_MC_RTS_DEQ: If this flag is set, the default behavior when
156 * using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
157 * is "multi-consumer RTS mode".
158 * - RING_F_MC_HTS_DEQ: If this flag is set, the default behavior when
159 * using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
160 * is "multi-consumer HTS mode".
161 * If none of these flags is set, then default "multi-consumer"
162 * behavior is selected.
164 * On success, the pointer to the newly allocated ring. NULL on error with
165 * rte_errno set appropriately. Possible errno values include:
166 * - E_RTE_NO_CONFIG - function could not get pointer to rte_config structure
167 * - E_RTE_SECONDARY - function was called from a secondary process instance
168 * - EINVAL - count provided is not a power of 2
169 * - ENOSPC - the maximum number of memzones has already been allocated
170 * - EEXIST - a memzone with the same name already exists
171 * - ENOMEM - no appropriate memory area found in which to create memzone
173 struct rte_ring *rte_ring_create(const char *name, unsigned int count,
174 int socket_id, unsigned int flags);
177 * De-allocate all memory used by the ring.
182 void rte_ring_free(struct rte_ring *r);
185 * Dump the status of the ring to a file.
188 * A pointer to a file for output
190 * A pointer to the ring structure.
192 void rte_ring_dump(FILE *f, const struct rte_ring *r);
195 * Enqueue several objects on the ring (multi-producers safe).
197 * This function uses a "compare and set" instruction to move the
198 * producer index atomically.
201 * A pointer to the ring structure.
203 * A pointer to a table of void * pointers (objects).
205 * The number of objects to add in the ring from the obj_table.
207 * If non-NULL, returns the amount of space in the ring after the
208 * enqueue operation has finished.
210 * The number of objects enqueued, either 0 or n
212 static __rte_always_inline unsigned int
213 rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
214 unsigned int n, unsigned int *free_space)
216 return rte_ring_mp_enqueue_bulk_elem(r, obj_table, sizeof(void *),
221 * Enqueue several objects on a ring (NOT multi-producers safe).
224 * A pointer to the ring structure.
226 * A pointer to a table of void * pointers (objects).
228 * The number of objects to add in the ring from the obj_table.
230 * If non-NULL, returns the amount of space in the ring after the
231 * enqueue operation has finished.
233 * The number of objects enqueued, either 0 or n
235 static __rte_always_inline unsigned int
236 rte_ring_sp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
237 unsigned int n, unsigned int *free_space)
239 return rte_ring_sp_enqueue_bulk_elem(r, obj_table, sizeof(void *),
244 * Enqueue several objects on a ring.
246 * This function calls the multi-producer or the single-producer
247 * version depending on the default behavior that was specified at
248 * ring creation time (see flags).
251 * A pointer to the ring structure.
253 * A pointer to a table of void * pointers (objects).
255 * The number of objects to add in the ring from the obj_table.
257 * If non-NULL, returns the amount of space in the ring after the
258 * enqueue operation has finished.
260 * The number of objects enqueued, either 0 or n
262 static __rte_always_inline unsigned int
263 rte_ring_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
264 unsigned int n, unsigned int *free_space)
266 return rte_ring_enqueue_bulk_elem(r, obj_table, sizeof(void *),
271 * Enqueue one object on a ring (multi-producers safe).
273 * This function uses a "compare and set" instruction to move the
274 * producer index atomically.
277 * A pointer to the ring structure.
279 * A pointer to the object to be added.
281 * - 0: Success; object enqueued.
282 * - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued.
284 static __rte_always_inline int
285 rte_ring_mp_enqueue(struct rte_ring *r, void *obj)
/*
 * The element handed over is the pointer value itself: the address of the
 * local copy of obj is passed with an element size of sizeof(void *).
 * The status of rte_ring_mp_enqueue_elem() is returned unchanged.
 */
287 return rte_ring_mp_enqueue_elem(r, &obj, sizeof(void *));
291 * Enqueue one object on a ring (NOT multi-producers safe).
294 * A pointer to the ring structure.
296 * A pointer to the object to be added.
298 * - 0: Success; object enqueued.
299 * - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued.
301 static __rte_always_inline int
302 rte_ring_sp_enqueue(struct rte_ring *r, void *obj)
/*
 * The element handed over is the pointer value itself: the address of the
 * local copy of obj is passed with an element size of sizeof(void *).
 * The status of rte_ring_sp_enqueue_elem() is returned unchanged.
 */
304 return rte_ring_sp_enqueue_elem(r, &obj, sizeof(void *));
308 * Enqueue one object on a ring.
310 * This function calls the multi-producer or the single-producer
311 * version, depending on the default behaviour that was specified at
312 * ring creation time (see flags).
315 * A pointer to the ring structure.
317 * A pointer to the object to be added.
319 * - 0: Success; object enqueued.
320 * - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued.
322 static __rte_always_inline int
323 rte_ring_enqueue(struct rte_ring *r, void *obj)
/*
 * The element handed over is the pointer value itself: the address of the
 * local copy of obj is passed with an element size of sizeof(void *).
 * The status of rte_ring_enqueue_elem() is returned unchanged.
 */
325 return rte_ring_enqueue_elem(r, &obj, sizeof(void *));
329 * Dequeue several objects from a ring (multi-consumers safe).
331 * This function uses a "compare and set" instruction to move the
332 * consumer index atomically.
335 * A pointer to the ring structure.
337 * A pointer to a table of void * pointers (objects) that will be filled.
339 * The number of objects to dequeue from the ring to the obj_table.
341 * If non-NULL, returns the number of remaining ring entries after the
342 * dequeue has finished.
344 * The number of objects dequeued, either 0 or n
346 static __rte_always_inline unsigned int
347 rte_ring_mc_dequeue_bulk(struct rte_ring *r, void **obj_table,
348 unsigned int n, unsigned int *available)
350 return rte_ring_mc_dequeue_bulk_elem(r, obj_table, sizeof(void *),
355 * Dequeue several objects from a ring (NOT multi-consumers safe).
358 * A pointer to the ring structure.
360 * A pointer to a table of void * pointers (objects) that will be filled.
362 * The number of objects to dequeue from the ring to the obj_table,
363 * must be strictly positive.
365 * If non-NULL, returns the number of remaining ring entries after the
366 * dequeue has finished.
368 * The number of objects dequeued, either 0 or n
370 static __rte_always_inline unsigned int
371 rte_ring_sc_dequeue_bulk(struct rte_ring *r, void **obj_table,
372 unsigned int n, unsigned int *available)
374 return rte_ring_sc_dequeue_bulk_elem(r, obj_table, sizeof(void *),
379 * Dequeue several objects from a ring.
381 * This function calls the multi-consumers or the single-consumer
382 * version, depending on the default behaviour that was specified at
383 * ring creation time (see flags).
386 * A pointer to the ring structure.
388 * A pointer to a table of void * pointers (objects) that will be filled.
390 * The number of objects to dequeue from the ring to the obj_table.
392 * If non-NULL, returns the number of remaining ring entries after the
393 * dequeue has finished.
395 * The number of objects dequeued, either 0 or n
397 static __rte_always_inline unsigned int
398 rte_ring_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned int n,
399 unsigned int *available)
401 return rte_ring_dequeue_bulk_elem(r, obj_table, sizeof(void *),
406 * Dequeue one object from a ring (multi-consumers safe).
408 * This function uses a "compare and set" instruction to move the
409 * consumer index atomically.
412 * A pointer to the ring structure.
414 * A pointer to a void * pointer (object) that will be filled.
416 * - 0: Success; object dequeued.
417 * - -ENOENT: Not enough entries in the ring to dequeue; no object is
420 static __rte_always_inline int
421 rte_ring_mc_dequeue(struct rte_ring *r, void **obj_p)
/*
 * obj_p is forwarded as-is with an element size of sizeof(void *), so the
 * slot it points to is what gets filled with one pointer-sized element.
 * The status of rte_ring_mc_dequeue_elem() is returned unchanged.
 */
423 return rte_ring_mc_dequeue_elem(r, obj_p, sizeof(void *));
427 * Dequeue one object from a ring (NOT multi-consumers safe).
430 * A pointer to the ring structure.
432 * A pointer to a void * pointer (object) that will be filled.
434 * - 0: Success; object dequeued.
435 * - -ENOENT: Not enough entries in the ring to dequeue, no object is
438 static __rte_always_inline int
439 rte_ring_sc_dequeue(struct rte_ring *r, void **obj_p)
/*
 * obj_p is forwarded as-is with an element size of sizeof(void *), so the
 * slot it points to is what gets filled with one pointer-sized element.
 * The status of rte_ring_sc_dequeue_elem() is returned unchanged.
 */
441 return rte_ring_sc_dequeue_elem(r, obj_p, sizeof(void *));
445 * Dequeue one object from a ring.
447 * This function calls the multi-consumers or the single-consumer
448 * version depending on the default behaviour that was specified at
449 * ring creation time (see flags).
452 * A pointer to the ring structure.
454 * A pointer to a void * pointer (object) that will be filled.
456 * - 0: Success; object dequeued.
457 * - -ENOENT: Not enough entries in the ring to dequeue, no object is
460 static __rte_always_inline int
461 rte_ring_dequeue(struct rte_ring *r, void **obj_p)
/*
 * obj_p is forwarded as-is with an element size of sizeof(void *), so the
 * slot it points to is what gets filled with one pointer-sized element.
 * The status of rte_ring_dequeue_elem() is returned unchanged.
 */
463 return rte_ring_dequeue_elem(r, obj_p, sizeof(void *));
469 * Flush all the elements in a ring.
472 * Make sure the ring is not in use while calling this function.
475 * A pointer to the ring structure.
478 rte_ring_reset(struct rte_ring *r);
481 * Return the number of entries in a ring.
484 * A pointer to the ring structure.
486 * The number of entries in the ring.
488 static inline unsigned int
489 rte_ring_count(const struct rte_ring *r)
/* Snapshot each tail index once; the two reads are separate, not one combined read. */
491 uint32_t prod_tail = r->prod.tail;
492 uint32_t cons_tail = r->cons.tail;
/* Unsigned subtraction wraps, and the mask folds the difference into [0, r->mask]. */
493 uint32_t count = (prod_tail - cons_tail) & r->mask;
/*
 * Never report more than the usable capacity.
 * NOTE(review): presumably this guards against the two tails being sampled
 * at different moments while other threads use the ring -- confirm.
 */
494 return (count > r->capacity) ? r->capacity : count;
498 * Return the number of free entries in a ring.
501 * A pointer to the ring structure.
503 * The number of free entries in the ring.
505 static inline unsigned int
506 rte_ring_free_count(const struct rte_ring *r)
/*
 * Free entries = usable capacity minus the current entry count.
 * rte_ring_count() clamps its result to r->capacity, so the subtraction
 * does not wrap below zero.
 */
508 return r->capacity - rte_ring_count(r);
512 * Test if a ring is full.
515 * A pointer to the ring structure.
517 * - 1: The ring is full.
518 * - 0: The ring is not full.
521 rte_ring_full(const struct rte_ring *r)
/* Full means no free entry is left; the comparison yields 1 or 0. */
523 return rte_ring_free_count(r) == 0;
527 * Test if a ring is empty.
530 * A pointer to the ring structure.
532 * - 1: The ring is empty.
533 * - 0: The ring is not empty.
536 rte_ring_empty(const struct rte_ring *r)
/* Snapshot each tail index once before comparing. */
538 uint32_t prod_tail = r->prod.tail;
539 uint32_t cons_tail = r->cons.tail;
/* Empty when both tails are equal; the comparison yields 1 or 0. */
540 return cons_tail == prod_tail;
544 * Return the size of the ring.
547 * A pointer to the ring structure.
549 * The size of the data store used by the ring.
550 * NOTE: this is not the same as the usable space in the ring. To query that
551 * use ``rte_ring_get_capacity()``.
553 static inline unsigned int
554 rte_ring_get_size(const struct rte_ring *r)
560 * Return the number of elements which can be stored in the ring.
563 * A pointer to the ring structure.
565 * The usable size of the ring.
567 static inline unsigned int
568 rte_ring_get_capacity(const struct rte_ring *r)
574 * Return sync type used by producer in the ring.
577 * A pointer to the ring structure.
579 * Producer sync type value.
581 static inline enum rte_ring_sync_type
582 rte_ring_get_prod_sync_type(const struct rte_ring *r)
/* Plain read of the producer-side sync_type field; nothing is modified. */
584 return r->prod.sync_type;
588 * Check if the ring is for a single producer.
591 * A pointer to the ring structure.
593 * Non-zero if the ring is SP, zero otherwise.
596 rte_ring_is_prod_single(const struct rte_ring *r)
/* Single-producer means the producer sync type is RTE_RING_SYNC_ST; the comparison yields 1 or 0. */
598 return (rte_ring_get_prod_sync_type(r) == RTE_RING_SYNC_ST);
602 * Return sync type used by consumer in the ring.
605 * A pointer to the ring structure.
607 * Consumer sync type value.
609 static inline enum rte_ring_sync_type
610 rte_ring_get_cons_sync_type(const struct rte_ring *r)
/* Plain read of the consumer-side sync_type field; nothing is modified. */
612 return r->cons.sync_type;
616 * Check if the ring is for a single consumer.
619 * A pointer to the ring structure.
621 * Non-zero if the ring is SC, zero otherwise.
624 rte_ring_is_cons_single(const struct rte_ring *r)
/* Single-consumer means the consumer sync type is RTE_RING_SYNC_ST; the comparison yields 1 or 0. */
626 return (rte_ring_get_cons_sync_type(r) == RTE_RING_SYNC_ST);
630 * Dump the status of all rings to a file.
633 * A pointer to a file for output
635 void rte_ring_list_dump(FILE *f);
638 * Search for a ring by its name.
641 * The name of the ring.
643 * The pointer to the ring matching the name, or NULL if not found,
644 * with rte_errno set appropriately. Possible rte_errno values include:
645 * - ENOENT - required entry not available to return.
647 struct rte_ring *rte_ring_lookup(const char *name);
650 * Enqueue several objects on the ring (multi-producers safe).
652 * This function uses a "compare and set" instruction to move the
653 * producer index atomically.
656 * A pointer to the ring structure.
658 * A pointer to a table of void * pointers (objects).
660 * The number of objects to add in the ring from the obj_table.
662 * If non-NULL, returns the amount of space in the ring after the
663 * enqueue operation has finished.
665 * - n: Actual number of objects enqueued.
667 static __rte_always_inline unsigned int
668 rte_ring_mp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
669 unsigned int n, unsigned int *free_space)
671 return rte_ring_mp_enqueue_burst_elem(r, obj_table, sizeof(void *),
676 * Enqueue several objects on a ring (NOT multi-producers safe).
679 * A pointer to the ring structure.
681 * A pointer to a table of void * pointers (objects).
683 * The number of objects to add in the ring from the obj_table.
685 * If non-NULL, returns the amount of space in the ring after the
686 * enqueue operation has finished.
688 * - n: Actual number of objects enqueued.
690 static __rte_always_inline unsigned int
691 rte_ring_sp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
692 unsigned int n, unsigned int *free_space)
694 return rte_ring_sp_enqueue_burst_elem(r, obj_table, sizeof(void *),
699 * Enqueue several objects on a ring.
701 * This function calls the multi-producer or the single-producer
702 * version depending on the default behavior that was specified at
703 * ring creation time (see flags).
706 * A pointer to the ring structure.
708 * A pointer to a table of void * pointers (objects).
710 * The number of objects to add in the ring from the obj_table.
712 * If non-NULL, returns the amount of space in the ring after the
713 * enqueue operation has finished.
715 * - n: Actual number of objects enqueued.
717 static __rte_always_inline unsigned int
718 rte_ring_enqueue_burst(struct rte_ring *r, void * const *obj_table,
719 unsigned int n, unsigned int *free_space)
721 return rte_ring_enqueue_burst_elem(r, obj_table, sizeof(void *),
726 * Dequeue several objects from a ring (multi-consumers safe). When more objects
727 * are requested than are available, only dequeue the actual number
730 * This function uses a "compare and set" instruction to move the
731 * consumer index atomically.
734 * A pointer to the ring structure.
736 * A pointer to a table of void * pointers (objects) that will be filled.
738 * The number of objects to dequeue from the ring to the obj_table.
740 * If non-NULL, returns the number of remaining ring entries after the
741 * dequeue has finished.
743 * - n: Actual number of objects dequeued, 0 if ring is empty
745 static __rte_always_inline unsigned int
746 rte_ring_mc_dequeue_burst(struct rte_ring *r, void **obj_table,
747 unsigned int n, unsigned int *available)
749 return rte_ring_mc_dequeue_burst_elem(r, obj_table, sizeof(void *),
754 * Dequeue several objects from a ring (NOT multi-consumers safe). When more
755 * objects are requested than are available, only dequeue the
756 * actual number of objects.
759 * A pointer to the ring structure.
761 * A pointer to a table of void * pointers (objects) that will be filled.
763 * The number of objects to dequeue from the ring to the obj_table.
765 * If non-NULL, returns the number of remaining ring entries after the
766 * dequeue has finished.
768 * - n: Actual number of objects dequeued, 0 if ring is empty
770 static __rte_always_inline unsigned int
771 rte_ring_sc_dequeue_burst(struct rte_ring *r, void **obj_table,
772 unsigned int n, unsigned int *available)
774 return rte_ring_sc_dequeue_burst_elem(r, obj_table, sizeof(void *),
779 * Dequeue multiple objects from a ring up to a maximum number.
781 * This function calls the multi-consumers or the single-consumer
782 * version, depending on the default behaviour that was specified at
783 * ring creation time (see flags).
786 * A pointer to the ring structure.
788 * A pointer to a table of void * pointers (objects) that will be filled.
790 * The number of objects to dequeue from the ring to the obj_table.
792 * If non-NULL, returns the number of remaining ring entries after the
793 * dequeue has finished.
795 * - Number of objects dequeued
797 static __rte_always_inline unsigned int
798 rte_ring_dequeue_burst(struct rte_ring *r, void **obj_table,
799 unsigned int n, unsigned int *available)
801 return rte_ring_dequeue_burst_elem(r, obj_table, sizeof(void *),
809 #endif /* _RTE_RING_H_ */