eventdev: make trace API internal
[dpdk.git] / lib / ring / rte_ring_c11_pvt.h
1 /* SPDX-License-Identifier: BSD-3-Clause
2  *
3  * Copyright (c) 2017,2018 HXT-semitech Corporation.
4  * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org
5  * Copyright (c) 2021 Arm Limited
6  * All rights reserved.
7  * Derived from FreeBSD's bufring.h
8  * Used as BSD-3 Licensed with permission from Kip Macy.
9  */
10
11 #ifndef _RTE_RING_C11_PVT_H_
12 #define _RTE_RING_C11_PVT_H_
13
14 static __rte_always_inline void
15 __rte_ring_update_tail(struct rte_ring_headtail *ht, uint32_t old_val,
16                 uint32_t new_val, uint32_t single, uint32_t enqueue)
17 {
18         RTE_SET_USED(enqueue);
19
20         /*
21          * If there are other enqueues/dequeues in progress that preceded us,
22          * we need to wait for them to complete
23          */
24         if (!single)
25                 rte_wait_until_equal_32(&ht->tail, old_val, __ATOMIC_RELAXED);
26
27         __atomic_store_n(&ht->tail, new_val, __ATOMIC_RELEASE);
28 }
29
30 /**
31  * @internal This function updates the producer head for enqueue
32  *
33  * @param r
34  *   A pointer to the ring structure
35  * @param is_sp
36  *   Indicates whether multi-producer path is needed or not
37  * @param n
38  *   The number of elements we will want to enqueue, i.e. how far should the
39  *   head be moved
40  * @param behavior
41  *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items from a ring
42  *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring
43  * @param old_head
44  *   Returns head value as it was before the move, i.e. where enqueue starts
45  * @param new_head
46  *   Returns the current/new head value i.e. where enqueue finishes
47  * @param free_entries
48  *   Returns the amount of free space in the ring BEFORE head was moved
49  * @return
50  *   Actual number of objects enqueued.
51  *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
52  */
53 static __rte_always_inline unsigned int
54 __rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp,
55                 unsigned int n, enum rte_ring_queue_behavior behavior,
56                 uint32_t *old_head, uint32_t *new_head,
57                 uint32_t *free_entries)
58 {
59         const uint32_t capacity = r->capacity;
60         uint32_t cons_tail;
61         unsigned int max = n;
62         int success;
63
64         *old_head = __atomic_load_n(&r->prod.head, __ATOMIC_RELAXED);
65         do {
66                 /* Reset n to the initial burst count */
67                 n = max;
68
69                 /* Ensure the head is read before tail */
70                 __atomic_thread_fence(__ATOMIC_ACQUIRE);
71
72                 /* load-acquire synchronize with store-release of ht->tail
73                  * in update_tail.
74                  */
75                 cons_tail = __atomic_load_n(&r->cons.tail,
76                                         __ATOMIC_ACQUIRE);
77
78                 /* The subtraction is done between two unsigned 32bits value
79                  * (the result is always modulo 32 bits even if we have
80                  * *old_head > cons_tail). So 'free_entries' is always between 0
81                  * and capacity (which is < size).
82                  */
83                 *free_entries = (capacity + cons_tail - *old_head);
84
85                 /* check that we have enough room in ring */
86                 if (unlikely(n > *free_entries))
87                         n = (behavior == RTE_RING_QUEUE_FIXED) ?
88                                         0 : *free_entries;
89
90                 if (n == 0)
91                         return 0;
92
93                 *new_head = *old_head + n;
94                 if (is_sp)
95                         r->prod.head = *new_head, success = 1;
96                 else
97                         /* on failure, *old_head is updated */
98                         success = __atomic_compare_exchange_n(&r->prod.head,
99                                         old_head, *new_head,
100                                         0, __ATOMIC_RELAXED,
101                                         __ATOMIC_RELAXED);
102         } while (unlikely(success == 0));
103         return n;
104 }
105
106 /**
107  * @internal This function updates the consumer head for dequeue
108  *
109  * @param r
110  *   A pointer to the ring structure
111  * @param is_sc
112  *   Indicates whether multi-consumer path is needed or not
113  * @param n
114  *   The number of elements we will want to dequeue, i.e. how far should the
115  *   head be moved
116  * @param behavior
117  *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from a ring
118  *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring
119  * @param old_head
120  *   Returns head value as it was before the move, i.e. where dequeue starts
121  * @param new_head
122  *   Returns the current/new head value i.e. where dequeue finishes
123  * @param entries
124  *   Returns the number of entries in the ring BEFORE head was moved
125  * @return
126  *   - Actual number of objects dequeued.
127  *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
128  */
129 static __rte_always_inline unsigned int
130 __rte_ring_move_cons_head(struct rte_ring *r, int is_sc,
131                 unsigned int n, enum rte_ring_queue_behavior behavior,
132                 uint32_t *old_head, uint32_t *new_head,
133                 uint32_t *entries)
134 {
135         unsigned int max = n;
136         uint32_t prod_tail;
137         int success;
138
139         /* move cons.head atomically */
140         *old_head = __atomic_load_n(&r->cons.head, __ATOMIC_RELAXED);
141         do {
142                 /* Restore n as it may change every loop */
143                 n = max;
144
145                 /* Ensure the head is read before tail */
146                 __atomic_thread_fence(__ATOMIC_ACQUIRE);
147
148                 /* this load-acquire synchronize with store-release of ht->tail
149                  * in update_tail.
150                  */
151                 prod_tail = __atomic_load_n(&r->prod.tail,
152                                         __ATOMIC_ACQUIRE);
153
154                 /* The subtraction is done between two unsigned 32bits value
155                  * (the result is always modulo 32 bits even if we have
156                  * cons_head > prod_tail). So 'entries' is always between 0
157                  * and size(ring)-1.
158                  */
159                 *entries = (prod_tail - *old_head);
160
161                 /* Set the actual entries for dequeue */
162                 if (n > *entries)
163                         n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries;
164
165                 if (unlikely(n == 0))
166                         return 0;
167
168                 *new_head = *old_head + n;
169                 if (is_sc)
170                         r->cons.head = *new_head, success = 1;
171                 else
172                         /* on failure, *old_head will be updated */
173                         success = __atomic_compare_exchange_n(&r->cons.head,
174                                                         old_head, *new_head,
175                                                         0, __ATOMIC_RELAXED,
176                                                         __ATOMIC_RELAXED);
177         } while (unlikely(success == 0));
178         return n;
179 }
180
181 #endif /* _RTE_RING_C11_PVT_H_ */