1 /* SPDX-License-Identifier: BSD-3-Clause
3 * Copyright (c) 2020 Arm Limited
4 * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org
6 * Derived from FreeBSD's bufring.h
7 * Used as BSD-3 Licensed with permission from Kip Macy.
10 #ifndef _RTE_RING_PEEK_ZC_H_
11 #define _RTE_RING_PEEK_ZC_H_
15 * @b EXPERIMENTAL: this API may change without prior notice
16 * It is not recommended to include this file directly.
17 * Please include <rte_ring_elem.h> instead.
19 * Ring Peek Zero Copy APIs
20 * These APIs make it possible to split public enqueue/dequeue API
22 * - enqueue/dequeue start
23 * - copy data to/from the ring
24 * - enqueue/dequeue finish
25 * Along with the advantages of the peek APIs, these APIs provide the ability
26 * to avoid copying of the data to temporary area (for ex: array of mbufs
29 * Note that currently these APIs are available only for two sync modes:
30 * 1) Single Producer/Single Consumer (RTE_RING_SYNC_ST)
31 * 2) Serialized Producer/Serialized Consumer (RTE_RING_SYNC_MT_HTS).
32 * It is user's responsibility to create/init ring with appropriate sync
35 * Following are some examples showing the API usage.
37 * struct elem_obj {uint64_t a; uint32_t b, c;};
38 * struct elem_obj *obj;
40 * // Create ring with sync type RTE_RING_SYNC_ST or RTE_RING_SYNC_MT_HTS
41 * // Reserve space on the ring
42 * n = rte_ring_enqueue_zc_bulk_elem_start(r, sizeof(elem_obj), 1, &zcd, NULL);
44 * // Produce the data directly on the ring memory
45 * obj = (struct elem_obj *)zcd->ptr1;
46 * obj->a = rte_get_a();
47 * obj->b = rte_get_b();
48 * obj->c = rte_get_c();
 * rte_ring_enqueue_zc_elem_finish(r, n);
52 * // Create ring with sync type RTE_RING_SYNC_ST or RTE_RING_SYNC_MT_HTS
53 * // Reserve space on the ring
54 * n = rte_ring_enqueue_zc_burst_start(r, 32, &zcd, NULL);
56 * // Pkt I/O core polls packets from the NIC
58 * nb_rx = rte_eth_rx_burst(portid, queueid, zcd->ptr1, zcd->n1);
59 * if (nb_rx == zcd->n1 && n != zcd->n1)
60 * nb_rx = rte_eth_rx_burst(portid, queueid,
61 * zcd->ptr2, n - zcd->n1);
63 * // Provide packets to the packet processing cores
64 * rte_ring_enqueue_zc_finish(r, nb_rx);
 * Note that between _start_ and _finish_ no other thread can proceed
 * with enqueue/dequeue operation till _finish_ completes.
75 #include <rte_ring_peek_elem_pvt.h>
/**
 * Ring zero-copy information structure.
 *
 * This structure contains the pointers and length of the space
 * reserved on the ring storage.
 */
struct rte_ring_zc_data {
	/* Pointer to the first space in the ring */
	void *ptr1;
	/* Pointer to the second space in the ring if there is wrap-around.
	 * It contains valid value only if wrap-around happens.
	 */
	void *ptr2;
	/* Number of elements in the first pointer. If this is equal to
	 * the number of elements requested, then ptr2 is NULL.
	 * Otherwise, subtracting n1 from number of elements requested
	 * will give the number of elements available at ptr2.
	 */
	unsigned int n1;
} __rte_cache_aligned;
98 static __rte_always_inline void
99 __rte_ring_get_elem_addr(struct rte_ring *r, uint32_t head,
100 uint32_t esize, uint32_t num, void **dst1, uint32_t *n1, void **dst2)
102 uint32_t idx, scale, nr_idx;
103 uint32_t *ring = (uint32_t *)&r[1];
105 /* Normalize to uint32_t */
106 scale = esize / sizeof(uint32_t);
107 idx = head & r->mask;
108 nr_idx = idx * scale;
110 *dst1 = ring + nr_idx;
113 if (idx + num > r->size) {
122 * @internal This function moves prod head value.
124 static __rte_always_inline unsigned int
125 __rte_ring_do_enqueue_zc_elem_start(struct rte_ring *r, unsigned int esize,
126 uint32_t n, enum rte_ring_queue_behavior behavior,
127 struct rte_ring_zc_data *zcd, unsigned int *free_space)
129 uint32_t free, head, next;
131 switch (r->prod.sync_type) {
132 case RTE_RING_SYNC_ST:
133 n = __rte_ring_move_prod_head(r, RTE_RING_SYNC_ST, n,
134 behavior, &head, &next, &free);
136 case RTE_RING_SYNC_MT_HTS:
137 n = __rte_ring_hts_move_prod_head(r, n, behavior, &head, &free);
139 case RTE_RING_SYNC_MT:
140 case RTE_RING_SYNC_MT_RTS:
142 /* unsupported mode, shouldn't be here */
149 __rte_ring_get_elem_addr(r, head, esize, n, &zcd->ptr1,
150 &zcd->n1, &zcd->ptr2);
152 if (free_space != NULL)
153 *free_space = free - n;
158 * Start to enqueue several objects on the ring.
159 * Note that no actual objects are put in the queue by this function,
160 * it just reserves space for the user on the ring.
161 * User has to copy objects into the queue using the returned pointers.
162 * User should call rte_ring_enqueue_zc_elem_finish to complete the
166 * A pointer to the ring structure.
168 * The size of ring element, in bytes. It must be a multiple of 4.
170 * The number of objects to add in the ring.
172 * Structure containing the pointers and length of the space
173 * reserved on the ring storage.
175 * If non-NULL, returns the amount of space in the ring after the
176 * reservation operation has finished.
178 * The number of objects that can be enqueued, either 0 or n
181 static __rte_always_inline unsigned int
182 rte_ring_enqueue_zc_bulk_elem_start(struct rte_ring *r, unsigned int esize,
183 unsigned int n, struct rte_ring_zc_data *zcd, unsigned int *free_space)
185 return __rte_ring_do_enqueue_zc_elem_start(r, esize, n,
186 RTE_RING_QUEUE_FIXED, zcd, free_space);
190 * Start to enqueue several pointers to objects on the ring.
191 * Note that no actual pointers are put in the queue by this function,
192 * it just reserves space for the user on the ring.
193 * User has to copy pointers to objects into the queue using the
195 * User should call rte_ring_enqueue_zc_finish to complete the
199 * A pointer to the ring structure.
201 * The number of objects to add in the ring.
203 * Structure containing the pointers and length of the space
204 * reserved on the ring storage.
206 * If non-NULL, returns the amount of space in the ring after the
207 * reservation operation has finished.
209 * The number of objects that can be enqueued, either 0 or n
212 static __rte_always_inline unsigned int
213 rte_ring_enqueue_zc_bulk_start(struct rte_ring *r, unsigned int n,
214 struct rte_ring_zc_data *zcd, unsigned int *free_space)
216 return rte_ring_enqueue_zc_bulk_elem_start(r, sizeof(uintptr_t), n,
221 * Start to enqueue several objects on the ring.
222 * Note that no actual objects are put in the queue by this function,
223 * it just reserves space for the user on the ring.
224 * User has to copy objects into the queue using the returned pointers.
225 * User should call rte_ring_enqueue_zc_elem_finish to complete the
229 * A pointer to the ring structure.
231 * The size of ring element, in bytes. It must be a multiple of 4.
233 * The number of objects to add in the ring.
235 * Structure containing the pointers and length of the space
236 * reserved on the ring storage.
238 * If non-NULL, returns the amount of space in the ring after the
239 * reservation operation has finished.
241 * The number of objects that can be enqueued, either 0 or n
244 static __rte_always_inline unsigned int
245 rte_ring_enqueue_zc_burst_elem_start(struct rte_ring *r, unsigned int esize,
246 unsigned int n, struct rte_ring_zc_data *zcd, unsigned int *free_space)
248 return __rte_ring_do_enqueue_zc_elem_start(r, esize, n,
249 RTE_RING_QUEUE_VARIABLE, zcd, free_space);
253 * Start to enqueue several pointers to objects on the ring.
254 * Note that no actual pointers are put in the queue by this function,
255 * it just reserves space for the user on the ring.
256 * User has to copy pointers to objects into the queue using the
258 * User should call rte_ring_enqueue_zc_finish to complete the
262 * A pointer to the ring structure.
264 * The number of objects to add in the ring.
266 * Structure containing the pointers and length of the space
267 * reserved on the ring storage.
269 * If non-NULL, returns the amount of space in the ring after the
270 * reservation operation has finished.
272 * The number of objects that can be enqueued, either 0 or n.
275 static __rte_always_inline unsigned int
276 rte_ring_enqueue_zc_burst_start(struct rte_ring *r, unsigned int n,
277 struct rte_ring_zc_data *zcd, unsigned int *free_space)
279 return rte_ring_enqueue_zc_burst_elem_start(r, sizeof(uintptr_t), n,
284 * Complete enqueuing several objects on the ring.
285 * Note that number of objects to enqueue should not exceed previous
286 * enqueue_start return value.
289 * A pointer to the ring structure.
291 * The number of objects to add to the ring.
294 static __rte_always_inline void
295 rte_ring_enqueue_zc_elem_finish(struct rte_ring *r, unsigned int n)
299 switch (r->prod.sync_type) {
300 case RTE_RING_SYNC_ST:
301 n = __rte_ring_st_get_tail(&r->prod, &tail, n);
302 __rte_ring_st_set_head_tail(&r->prod, tail, n, 1);
304 case RTE_RING_SYNC_MT_HTS:
305 n = __rte_ring_hts_get_tail(&r->hts_prod, &tail, n);
306 __rte_ring_hts_set_head_tail(&r->hts_prod, tail, n, 1);
308 case RTE_RING_SYNC_MT:
309 case RTE_RING_SYNC_MT_RTS:
311 /* unsupported mode, shouldn't be here */
317 * Complete enqueuing several pointers to objects on the ring.
318 * Note that number of objects to enqueue should not exceed previous
319 * enqueue_start return value.
322 * A pointer to the ring structure.
324 * The number of pointers to objects to add to the ring.
327 static __rte_always_inline void
328 rte_ring_enqueue_zc_finish(struct rte_ring *r, unsigned int n)
330 rte_ring_enqueue_zc_elem_finish(r, n);
334 * @internal This function moves cons head value and copies up to *n*
335 * objects from the ring to the user provided obj_table.
337 static __rte_always_inline unsigned int
338 __rte_ring_do_dequeue_zc_elem_start(struct rte_ring *r,
339 uint32_t esize, uint32_t n, enum rte_ring_queue_behavior behavior,
340 struct rte_ring_zc_data *zcd, unsigned int *available)
342 uint32_t avail, head, next;
344 switch (r->cons.sync_type) {
345 case RTE_RING_SYNC_ST:
346 n = __rte_ring_move_cons_head(r, RTE_RING_SYNC_ST, n,
347 behavior, &head, &next, &avail);
349 case RTE_RING_SYNC_MT_HTS:
350 n = __rte_ring_hts_move_cons_head(r, n, behavior,
353 case RTE_RING_SYNC_MT:
354 case RTE_RING_SYNC_MT_RTS:
356 /* unsupported mode, shouldn't be here */
363 __rte_ring_get_elem_addr(r, head, esize, n, &zcd->ptr1,
364 &zcd->n1, &zcd->ptr2);
366 if (available != NULL)
367 *available = avail - n;
372 * Start to dequeue several objects from the ring.
373 * Note that no actual objects are copied from the queue by this function.
374 * User has to copy objects from the queue using the returned pointers.
375 * User should call rte_ring_dequeue_zc_elem_finish to complete the
379 * A pointer to the ring structure.
381 * The size of ring element, in bytes. It must be a multiple of 4.
383 * The number of objects to remove from the ring.
385 * Structure containing the pointers and length of the space
386 * reserved on the ring storage.
388 * If non-NULL, returns the number of remaining ring entries after the
389 * dequeue has finished.
391 * The number of objects that can be dequeued, either 0 or n.
394 static __rte_always_inline unsigned int
395 rte_ring_dequeue_zc_bulk_elem_start(struct rte_ring *r, unsigned int esize,
396 unsigned int n, struct rte_ring_zc_data *zcd, unsigned int *available)
398 return __rte_ring_do_dequeue_zc_elem_start(r, esize, n,
399 RTE_RING_QUEUE_FIXED, zcd, available);
403 * Start to dequeue several pointers to objects from the ring.
404 * Note that no actual pointers are removed from the queue by this function.
405 * User has to copy pointers to objects from the queue using the
407 * User should call rte_ring_dequeue_zc_finish to complete the
411 * A pointer to the ring structure.
413 * The number of objects to remove from the ring.
415 * Structure containing the pointers and length of the space
416 * reserved on the ring storage.
418 * If non-NULL, returns the number of remaining ring entries after the
419 * dequeue has finished.
421 * The number of objects that can be dequeued, either 0 or n.
424 static __rte_always_inline unsigned int
425 rte_ring_dequeue_zc_bulk_start(struct rte_ring *r, unsigned int n,
426 struct rte_ring_zc_data *zcd, unsigned int *available)
428 return rte_ring_dequeue_zc_bulk_elem_start(r, sizeof(uintptr_t),
433 * Start to dequeue several objects from the ring.
434 * Note that no actual objects are copied from the queue by this function.
435 * User has to copy objects from the queue using the returned pointers.
436 * User should call rte_ring_dequeue_zc_elem_finish to complete the
440 * A pointer to the ring structure.
442 * The size of ring element, in bytes. It must be a multiple of 4.
443 * This must be the same value used while creating the ring. Otherwise
444 * the results are undefined.
446 * The number of objects to dequeue from the ring.
448 * Structure containing the pointers and length of the space
449 * reserved on the ring storage.
451 * If non-NULL, returns the number of remaining ring entries after the
452 * dequeue has finished.
454 * The number of objects that can be dequeued, either 0 or n.
457 static __rte_always_inline unsigned int
458 rte_ring_dequeue_zc_burst_elem_start(struct rte_ring *r, unsigned int esize,
459 unsigned int n, struct rte_ring_zc_data *zcd, unsigned int *available)
461 return __rte_ring_do_dequeue_zc_elem_start(r, esize, n,
462 RTE_RING_QUEUE_VARIABLE, zcd, available);
466 * Start to dequeue several pointers to objects from the ring.
467 * Note that no actual pointers are removed from the queue by this function.
468 * User has to copy pointers to objects from the queue using the
470 * User should call rte_ring_dequeue_zc_finish to complete the
474 * A pointer to the ring structure.
476 * The number of objects to remove from the ring.
478 * Structure containing the pointers and length of the space
479 * reserved on the ring storage.
481 * If non-NULL, returns the number of remaining ring entries after the
482 * dequeue has finished.
484 * The number of objects that can be dequeued, either 0 or n.
487 static __rte_always_inline unsigned int
488 rte_ring_dequeue_zc_burst_start(struct rte_ring *r, unsigned int n,
489 struct rte_ring_zc_data *zcd, unsigned int *available)
491 return rte_ring_dequeue_zc_burst_elem_start(r, sizeof(uintptr_t), n,
496 * Complete dequeuing several objects from the ring.
497 * Note that number of objects to dequeued should not exceed previous
498 * dequeue_start return value.
501 * A pointer to the ring structure.
503 * The number of objects to remove from the ring.
506 static __rte_always_inline void
507 rte_ring_dequeue_zc_elem_finish(struct rte_ring *r, unsigned int n)
511 switch (r->cons.sync_type) {
512 case RTE_RING_SYNC_ST:
513 n = __rte_ring_st_get_tail(&r->cons, &tail, n);
514 __rte_ring_st_set_head_tail(&r->cons, tail, n, 0);
516 case RTE_RING_SYNC_MT_HTS:
517 n = __rte_ring_hts_get_tail(&r->hts_cons, &tail, n);
518 __rte_ring_hts_set_head_tail(&r->hts_cons, tail, n, 0);
520 case RTE_RING_SYNC_MT:
521 case RTE_RING_SYNC_MT_RTS:
523 /* unsupported mode, shouldn't be here */
529 * Complete dequeuing several objects from the ring.
530 * Note that number of objects to dequeued should not exceed previous
531 * dequeue_start return value.
534 * A pointer to the ring structure.
536 * The number of objects to remove from the ring.
539 static __rte_always_inline void
540 rte_ring_dequeue_zc_finish(struct rte_ring *r, unsigned int n)
542 rte_ring_dequeue_elem_finish(r, n);
549 #endif /* _RTE_RING_PEEK_ZC_H_ */