ring: introduce RTS ring mode
[dpdk.git] / lib / librte_ring / rte_ring_rts_c11_mem.h
1 /* SPDX-License-Identifier: BSD-3-Clause
2  *
3  * Copyright (c) 2010-2020 Intel Corporation
4  * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org
5  * All rights reserved.
6  * Derived from FreeBSD's bufring.h
7  * Used as BSD-3 Licensed with permission from Kip Macy.
8  */
9
10 #ifndef _RTE_RING_RTS_C11_MEM_H_
11 #define _RTE_RING_RTS_C11_MEM_H_
12
13 /**
14  * @file rte_ring_rts_c11_mem.h
15  * It is not recommended to include this file directly,
16  * include <rte_ring.h> instead.
17  * Contains internal helper functions for Relaxed Tail Sync (RTS) ring mode.
18  * For more information please refer to <rte_ring_rts.h>.
19  */
20
21 /**
22  * @internal This function updates tail values.
23  */
24 static __rte_always_inline void
25 __rte_ring_rts_update_tail(struct rte_ring_rts_headtail *ht)
26 {
27         union __rte_ring_rts_poscnt h, ot, nt;
28
29         /*
30          * If there are other enqueues/dequeues in progress that
31          * might preceded us, then don't update tail with new value.
32          */
33
34         ot.raw = __atomic_load_n(&ht->tail.raw, __ATOMIC_ACQUIRE);
35
36         do {
37                 /* on 32-bit systems we have to do atomic read here */
38                 h.raw = __atomic_load_n(&ht->head.raw, __ATOMIC_RELAXED);
39
40                 nt.raw = ot.raw;
41                 if (++nt.val.cnt == h.val.cnt)
42                         nt.val.pos = h.val.pos;
43
44         } while (__atomic_compare_exchange_n(&ht->tail.raw, &ot.raw, nt.raw,
45                         0, __ATOMIC_RELEASE, __ATOMIC_ACQUIRE) == 0);
46 }
47
48 /**
49  * @internal This function waits till head/tail distance wouldn't
50  * exceed pre-defined max value.
51  */
52 static __rte_always_inline void
53 __rte_ring_rts_head_wait(const struct rte_ring_rts_headtail *ht,
54         union __rte_ring_rts_poscnt *h)
55 {
56         uint32_t max;
57
58         max = ht->htd_max;
59
60         while (h->val.pos - ht->tail.val.pos > max) {
61                 rte_pause();
62                 h->raw = __atomic_load_n(&ht->head.raw, __ATOMIC_ACQUIRE);
63         }
64 }
65
66 /**
67  * @internal This function updates the producer head for enqueue.
68  */
69 static __rte_always_inline uint32_t
70 __rte_ring_rts_move_prod_head(struct rte_ring *r, uint32_t num,
71         enum rte_ring_queue_behavior behavior, uint32_t *old_head,
72         uint32_t *free_entries)
73 {
74         uint32_t n;
75         union __rte_ring_rts_poscnt nh, oh;
76
77         const uint32_t capacity = r->capacity;
78
79         oh.raw = __atomic_load_n(&r->rts_prod.head.raw, __ATOMIC_ACQUIRE);
80
81         do {
82                 /* Reset n to the initial burst count */
83                 n = num;
84
85                 /*
86                  * wait for prod head/tail distance,
87                  * make sure that we read prod head *before*
88                  * reading cons tail.
89                  */
90                 __rte_ring_rts_head_wait(&r->rts_prod, &oh);
91
92                 /*
93                  *  The subtraction is done between two unsigned 32bits value
94                  * (the result is always modulo 32 bits even if we have
95                  * *old_head > cons_tail). So 'free_entries' is always between 0
96                  * and capacity (which is < size).
97                  */
98                 *free_entries = capacity + r->cons.tail - oh.val.pos;
99
100                 /* check that we have enough room in ring */
101                 if (unlikely(n > *free_entries))
102                         n = (behavior == RTE_RING_QUEUE_FIXED) ?
103                                         0 : *free_entries;
104
105                 if (n == 0)
106                         break;
107
108                 nh.val.pos = oh.val.pos + n;
109                 nh.val.cnt = oh.val.cnt + 1;
110
111         /*
112          * this CAS(ACQUIRE, ACQUIRE) serves as a hoist barrier to prevent:
113          *  - OOO reads of cons tail value
114          *  - OOO copy of elems to the ring
115          */
116         } while (__atomic_compare_exchange_n(&r->rts_prod.head.raw,
117                         &oh.raw, nh.raw,
118                         0, __ATOMIC_ACQUIRE, __ATOMIC_ACQUIRE) == 0);
119
120         *old_head = oh.val.pos;
121         return n;
122 }
123
124 /**
125  * @internal This function updates the consumer head for dequeue
126  */
127 static __rte_always_inline unsigned int
128 __rte_ring_rts_move_cons_head(struct rte_ring *r, uint32_t num,
129         enum rte_ring_queue_behavior behavior, uint32_t *old_head,
130         uint32_t *entries)
131 {
132         uint32_t n;
133         union __rte_ring_rts_poscnt nh, oh;
134
135         oh.raw = __atomic_load_n(&r->rts_cons.head.raw, __ATOMIC_ACQUIRE);
136
137         /* move cons.head atomically */
138         do {
139                 /* Restore n as it may change every loop */
140                 n = num;
141
142                 /*
143                  * wait for cons head/tail distance,
144                  * make sure that we read cons head *before*
145                  * reading prod tail.
146                  */
147                 __rte_ring_rts_head_wait(&r->rts_cons, &oh);
148
149                 /* The subtraction is done between two unsigned 32bits value
150                  * (the result is always modulo 32 bits even if we have
151                  * cons_head > prod_tail). So 'entries' is always between 0
152                  * and size(ring)-1.
153                  */
154                 *entries = r->prod.tail - oh.val.pos;
155
156                 /* Set the actual entries for dequeue */
157                 if (n > *entries)
158                         n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries;
159
160                 if (unlikely(n == 0))
161                         break;
162
163                 nh.val.pos = oh.val.pos + n;
164                 nh.val.cnt = oh.val.cnt + 1;
165
166         /*
167          * this CAS(ACQUIRE, ACQUIRE) serves as a hoist barrier to prevent:
168          *  - OOO reads of prod tail value
169          *  - OOO copy of elems from the ring
170          */
171         } while (__atomic_compare_exchange_n(&r->rts_cons.head.raw,
172                         &oh.raw, nh.raw,
173                         0, __ATOMIC_ACQUIRE, __ATOMIC_ACQUIRE) == 0);
174
175         *old_head = oh.val.pos;
176         return n;
177 }
178
179 #endif /* _RTE_RING_RTS_C11_MEM_H_ */