test: fix ring PMD initialisation
[dpdk.git] / lib / ring / rte_ring_peek_zc.h
1 /* SPDX-License-Identifier: BSD-3-Clause
2  *
3  * Copyright (c) 2020 Arm Limited
4  * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org
5  * All rights reserved.
6  * Derived from FreeBSD's bufring.h
7  * Used as BSD-3 Licensed with permission from Kip Macy.
8  */
9
10 #ifndef _RTE_RING_PEEK_ZC_H_
11 #define _RTE_RING_PEEK_ZC_H_
12
13 /**
14  * @file
15  * It is not recommended to include this file directly.
16  * Please include <rte_ring_elem.h> instead.
17  *
18  * Ring Peek Zero Copy APIs
19  * These APIs make it possible to split public enqueue/dequeue API
20  * into 3 parts:
21  * - enqueue/dequeue start
22  * - copy data to/from the ring
23  * - enqueue/dequeue finish
24  * Along with the advantages of the peek APIs, these APIs provide the ability
25  * to avoid copying of the data to temporary area (for ex: array of mbufs
26  * on the stack).
27  *
28  * Note that currently these APIs are available only for two sync modes:
29  * 1) Single Producer/Single Consumer (RTE_RING_SYNC_ST)
30  * 2) Serialized Producer/Serialized Consumer (RTE_RING_SYNC_MT_HTS).
31  * It is user's responsibility to create/init ring with appropriate sync
32  * modes selected.
33  *
34  * Following are some examples showing the API usage.
35  * 1)
36  * struct elem_obj {uint64_t a; uint32_t b, c;};
37  * struct elem_obj *obj;
38  *
39  * // Create ring with sync type RTE_RING_SYNC_ST or RTE_RING_SYNC_MT_HTS
40  * // Reserve space on the ring
41  * n = rte_ring_enqueue_zc_bulk_elem_start(r, sizeof(elem_obj), 1, &zcd, NULL);
42  *
43  * // Produce the data directly on the ring memory
44  * obj = (struct elem_obj *)zcd->ptr1;
45  * obj->a = rte_get_a();
46  * obj->b = rte_get_b();
47  * obj->c = rte_get_c();
48  * rte_ring_enqueue_zc_elem_finish(ring, n);
49  *
50  * 2)
51  * // Create ring with sync type RTE_RING_SYNC_ST or RTE_RING_SYNC_MT_HTS
52  * // Reserve space on the ring
53  * n = rte_ring_enqueue_zc_burst_start(r, 32, &zcd, NULL);
54  *
55  * // Pkt I/O core polls packets from the NIC
56  * if (n != 0) {
57  *      nb_rx = rte_eth_rx_burst(portid, queueid, zcd->ptr1, zcd->n1);
58  *      if (nb_rx == zcd->n1 && n != zcd->n1)
59  *              nb_rx = rte_eth_rx_burst(portid, queueid,
60  *                                              zcd->ptr2, n - zcd->n1);
61  *
62  *      // Provide packets to the packet processing cores
63  *      rte_ring_enqueue_zc_finish(r, nb_rx);
64  * }
65  *
66  * Note that between _start_ and _finish_ none other thread can proceed
67  * with enqueue/dequeue operation till _finish_ completes.
68  */
69
70 #ifdef __cplusplus
71 extern "C" {
72 #endif
73
74 #include <rte_ring_peek_elem_pvt.h>
75
76 /**
77  * Ring zero-copy information structure.
78  *
79  * This structure contains the pointers and length of the space
80  * reserved on the ring storage.
81  */
82 struct rte_ring_zc_data {
83         /* Pointer to the first space in the ring */
84         void *ptr1;
85         /* Pointer to the second space in the ring if there is wrap-around.
86          * It contains valid value only if wrap-around happens.
87          */
88         void *ptr2;
89         /* Number of elements in the first pointer. If this is equal to
90          * the number of elements requested, then ptr2 is NULL.
91          * Otherwise, subtracting n1 from number of elements requested
92          * will give the number of elements available at ptr2.
93          */
94         unsigned int n1;
95 } __rte_cache_aligned;
96
97 static __rte_always_inline void
98 __rte_ring_get_elem_addr(struct rte_ring *r, uint32_t head,
99         uint32_t esize, uint32_t num, void **dst1, uint32_t *n1, void **dst2)
100 {
101         uint32_t idx, scale, nr_idx;
102         uint32_t *ring = (uint32_t *)&r[1];
103
104         /* Normalize to uint32_t */
105         scale = esize / sizeof(uint32_t);
106         idx = head & r->mask;
107         nr_idx = idx * scale;
108
109         *dst1 = ring + nr_idx;
110         *n1 = num;
111
112         if (idx + num > r->size) {
113                 *n1 = r->size - idx;
114                 *dst2 = ring;
115         } else {
116                 *dst2 = NULL;
117         }
118 }
119
120 /**
121  * @internal This function moves prod head value.
122  */
123 static __rte_always_inline unsigned int
124 __rte_ring_do_enqueue_zc_elem_start(struct rte_ring *r, unsigned int esize,
125                 uint32_t n, enum rte_ring_queue_behavior behavior,
126                 struct rte_ring_zc_data *zcd, unsigned int *free_space)
127 {
128         uint32_t free, head, next;
129
130         switch (r->prod.sync_type) {
131         case RTE_RING_SYNC_ST:
132                 n = __rte_ring_move_prod_head(r, RTE_RING_SYNC_ST, n,
133                         behavior, &head, &next, &free);
134                 break;
135         case RTE_RING_SYNC_MT_HTS:
136                 n = __rte_ring_hts_move_prod_head(r, n, behavior, &head, &free);
137                 break;
138         case RTE_RING_SYNC_MT:
139         case RTE_RING_SYNC_MT_RTS:
140         default:
141                 /* unsupported mode, shouldn't be here */
142                 RTE_ASSERT(0);
143                 n = 0;
144                 free = 0;
145                 return n;
146         }
147
148         __rte_ring_get_elem_addr(r, head, esize, n, &zcd->ptr1,
149                 &zcd->n1, &zcd->ptr2);
150
151         if (free_space != NULL)
152                 *free_space = free - n;
153         return n;
154 }
155
156 /**
157  * Start to enqueue several objects on the ring.
158  * Note that no actual objects are put in the queue by this function,
159  * it just reserves space for the user on the ring.
160  * User has to copy objects into the queue using the returned pointers.
161  * User should call rte_ring_enqueue_zc_elem_finish to complete the
162  * enqueue operation.
163  *
164  * @param r
165  *   A pointer to the ring structure.
166  * @param esize
167  *   The size of ring element, in bytes. It must be a multiple of 4.
168  * @param n
169  *   The number of objects to add in the ring.
170  * @param zcd
171  *   Structure containing the pointers and length of the space
172  *   reserved on the ring storage.
173  * @param free_space
174  *   If non-NULL, returns the amount of space in the ring after the
175  *   reservation operation has finished.
176  * @return
177  *   The number of objects that can be enqueued, either 0 or n
178  */
179 static __rte_always_inline unsigned int
180 rte_ring_enqueue_zc_bulk_elem_start(struct rte_ring *r, unsigned int esize,
181         unsigned int n, struct rte_ring_zc_data *zcd, unsigned int *free_space)
182 {
183         return __rte_ring_do_enqueue_zc_elem_start(r, esize, n,
184                         RTE_RING_QUEUE_FIXED, zcd, free_space);
185 }
186
187 /**
188  * Start to enqueue several pointers to objects on the ring.
189  * Note that no actual pointers are put in the queue by this function,
190  * it just reserves space for the user on the ring.
191  * User has to copy pointers to objects into the queue using the
192  * returned pointers.
193  * User should call rte_ring_enqueue_zc_finish to complete the
194  * enqueue operation.
195  *
196  * @param r
197  *   A pointer to the ring structure.
198  * @param n
199  *   The number of objects to add in the ring.
200  * @param zcd
201  *   Structure containing the pointers and length of the space
202  *   reserved on the ring storage.
203  * @param free_space
204  *   If non-NULL, returns the amount of space in the ring after the
205  *   reservation operation has finished.
206  * @return
207  *   The number of objects that can be enqueued, either 0 or n
208  */
209 static __rte_always_inline unsigned int
210 rte_ring_enqueue_zc_bulk_start(struct rte_ring *r, unsigned int n,
211         struct rte_ring_zc_data *zcd, unsigned int *free_space)
212 {
213         return rte_ring_enqueue_zc_bulk_elem_start(r, sizeof(uintptr_t), n,
214                                                         zcd, free_space);
215 }
216
217 /**
218  * Start to enqueue several objects on the ring.
219  * Note that no actual objects are put in the queue by this function,
220  * it just reserves space for the user on the ring.
221  * User has to copy objects into the queue using the returned pointers.
222  * User should call rte_ring_enqueue_zc_elem_finish to complete the
223  * enqueue operation.
224  *
225  * @param r
226  *   A pointer to the ring structure.
227  * @param esize
228  *   The size of ring element, in bytes. It must be a multiple of 4.
229  * @param n
230  *   The number of objects to add in the ring.
231  * @param zcd
232  *   Structure containing the pointers and length of the space
233  *   reserved on the ring storage.
234  * @param free_space
235  *   If non-NULL, returns the amount of space in the ring after the
236  *   reservation operation has finished.
237  * @return
238  *   The number of objects that can be enqueued, either 0 or n
239  */
240 static __rte_always_inline unsigned int
241 rte_ring_enqueue_zc_burst_elem_start(struct rte_ring *r, unsigned int esize,
242         unsigned int n, struct rte_ring_zc_data *zcd, unsigned int *free_space)
243 {
244         return __rte_ring_do_enqueue_zc_elem_start(r, esize, n,
245                         RTE_RING_QUEUE_VARIABLE, zcd, free_space);
246 }
247
248 /**
249  * Start to enqueue several pointers to objects on the ring.
250  * Note that no actual pointers are put in the queue by this function,
251  * it just reserves space for the user on the ring.
252  * User has to copy pointers to objects into the queue using the
253  * returned pointers.
254  * User should call rte_ring_enqueue_zc_finish to complete the
255  * enqueue operation.
256  *
257  * @param r
258  *   A pointer to the ring structure.
259  * @param n
260  *   The number of objects to add in the ring.
261  * @param zcd
262  *   Structure containing the pointers and length of the space
263  *   reserved on the ring storage.
264  * @param free_space
265  *   If non-NULL, returns the amount of space in the ring after the
266  *   reservation operation has finished.
267  * @return
268  *   The number of objects that can be enqueued, either 0 or n.
269  */
270 static __rte_always_inline unsigned int
271 rte_ring_enqueue_zc_burst_start(struct rte_ring *r, unsigned int n,
272         struct rte_ring_zc_data *zcd, unsigned int *free_space)
273 {
274         return rte_ring_enqueue_zc_burst_elem_start(r, sizeof(uintptr_t), n,
275                                                         zcd, free_space);
276 }
277
278 /**
279  * Complete enqueuing several objects on the ring.
280  * Note that number of objects to enqueue should not exceed previous
281  * enqueue_start return value.
282  *
283  * @param r
284  *   A pointer to the ring structure.
285  * @param n
286  *   The number of objects to add to the ring.
287  */
288 static __rte_always_inline void
289 rte_ring_enqueue_zc_elem_finish(struct rte_ring *r, unsigned int n)
290 {
291         uint32_t tail;
292
293         switch (r->prod.sync_type) {
294         case RTE_RING_SYNC_ST:
295                 n = __rte_ring_st_get_tail(&r->prod, &tail, n);
296                 __rte_ring_st_set_head_tail(&r->prod, tail, n, 1);
297                 break;
298         case RTE_RING_SYNC_MT_HTS:
299                 n = __rte_ring_hts_get_tail(&r->hts_prod, &tail, n);
300                 __rte_ring_hts_set_head_tail(&r->hts_prod, tail, n, 1);
301                 break;
302         case RTE_RING_SYNC_MT:
303         case RTE_RING_SYNC_MT_RTS:
304         default:
305                 /* unsupported mode, shouldn't be here */
306                 RTE_ASSERT(0);
307         }
308 }
309
310 /**
311  * Complete enqueuing several pointers to objects on the ring.
312  * Note that number of objects to enqueue should not exceed previous
313  * enqueue_start return value.
314  *
315  * @param r
316  *   A pointer to the ring structure.
317  * @param n
318  *   The number of pointers to objects to add to the ring.
319  */
320 static __rte_always_inline void
321 rte_ring_enqueue_zc_finish(struct rte_ring *r, unsigned int n)
322 {
323         rte_ring_enqueue_zc_elem_finish(r, n);
324 }
325
326 /**
327  * @internal This function moves cons head value and copies up to *n*
328  * objects from the ring to the user provided obj_table.
329  */
330 static __rte_always_inline unsigned int
331 __rte_ring_do_dequeue_zc_elem_start(struct rte_ring *r,
332         uint32_t esize, uint32_t n, enum rte_ring_queue_behavior behavior,
333         struct rte_ring_zc_data *zcd, unsigned int *available)
334 {
335         uint32_t avail, head, next;
336
337         switch (r->cons.sync_type) {
338         case RTE_RING_SYNC_ST:
339                 n = __rte_ring_move_cons_head(r, RTE_RING_SYNC_ST, n,
340                         behavior, &head, &next, &avail);
341                 break;
342         case RTE_RING_SYNC_MT_HTS:
343                 n = __rte_ring_hts_move_cons_head(r, n, behavior,
344                         &head, &avail);
345                 break;
346         case RTE_RING_SYNC_MT:
347         case RTE_RING_SYNC_MT_RTS:
348         default:
349                 /* unsupported mode, shouldn't be here */
350                 RTE_ASSERT(0);
351                 n = 0;
352                 avail = 0;
353                 return n;
354         }
355
356         __rte_ring_get_elem_addr(r, head, esize, n, &zcd->ptr1,
357                 &zcd->n1, &zcd->ptr2);
358
359         if (available != NULL)
360                 *available = avail - n;
361         return n;
362 }
363
364 /**
365  * Start to dequeue several objects from the ring.
366  * Note that no actual objects are copied from the queue by this function.
367  * User has to copy objects from the queue using the returned pointers.
368  * User should call rte_ring_dequeue_zc_elem_finish to complete the
369  * dequeue operation.
370  *
371  * @param r
372  *   A pointer to the ring structure.
373  * @param esize
374  *   The size of ring element, in bytes. It must be a multiple of 4.
375  * @param n
376  *   The number of objects to remove from the ring.
377  * @param zcd
378  *   Structure containing the pointers and length of the space
379  *   reserved on the ring storage.
380  * @param available
381  *   If non-NULL, returns the number of remaining ring entries after the
382  *   dequeue has finished.
383  * @return
384  *   The number of objects that can be dequeued, either 0 or n.
385  */
386 static __rte_always_inline unsigned int
387 rte_ring_dequeue_zc_bulk_elem_start(struct rte_ring *r, unsigned int esize,
388         unsigned int n, struct rte_ring_zc_data *zcd, unsigned int *available)
389 {
390         return __rte_ring_do_dequeue_zc_elem_start(r, esize, n,
391                         RTE_RING_QUEUE_FIXED, zcd, available);
392 }
393
394 /**
395  * Start to dequeue several pointers to objects from the ring.
396  * Note that no actual pointers are removed from the queue by this function.
397  * User has to copy pointers to objects from the queue using the
398  * returned pointers.
399  * User should call rte_ring_dequeue_zc_finish to complete the
400  * dequeue operation.
401  *
402  * @param r
403  *   A pointer to the ring structure.
404  * @param n
405  *   The number of objects to remove from the ring.
406  * @param zcd
407  *   Structure containing the pointers and length of the space
408  *   reserved on the ring storage.
409  * @param available
410  *   If non-NULL, returns the number of remaining ring entries after the
411  *   dequeue has finished.
412  * @return
413  *   The number of objects that can be dequeued, either 0 or n.
414  */
415 static __rte_always_inline unsigned int
416 rte_ring_dequeue_zc_bulk_start(struct rte_ring *r, unsigned int n,
417         struct rte_ring_zc_data *zcd, unsigned int *available)
418 {
419         return rte_ring_dequeue_zc_bulk_elem_start(r, sizeof(uintptr_t),
420                 n, zcd, available);
421 }
422
423 /**
424  * Start to dequeue several objects from the ring.
425  * Note that no actual objects are copied from the queue by this function.
426  * User has to copy objects from the queue using the returned pointers.
427  * User should call rte_ring_dequeue_zc_elem_finish to complete the
428  * dequeue operation.
429  *
430  * @param r
431  *   A pointer to the ring structure.
432  * @param esize
433  *   The size of ring element, in bytes. It must be a multiple of 4.
434  *   This must be the same value used while creating the ring. Otherwise
435  *   the results are undefined.
436  * @param n
437  *   The number of objects to dequeue from the ring.
438  * @param zcd
439  *   Structure containing the pointers and length of the space
440  *   reserved on the ring storage.
441  * @param available
442  *   If non-NULL, returns the number of remaining ring entries after the
443  *   dequeue has finished.
444  * @return
445  *   The number of objects that can be dequeued, either 0 or n.
446  */
447 static __rte_always_inline unsigned int
448 rte_ring_dequeue_zc_burst_elem_start(struct rte_ring *r, unsigned int esize,
449         unsigned int n, struct rte_ring_zc_data *zcd, unsigned int *available)
450 {
451         return __rte_ring_do_dequeue_zc_elem_start(r, esize, n,
452                         RTE_RING_QUEUE_VARIABLE, zcd, available);
453 }
454
455 /**
456  * Start to dequeue several pointers to objects from the ring.
457  * Note that no actual pointers are removed from the queue by this function.
458  * User has to copy pointers to objects from the queue using the
459  * returned pointers.
460  * User should call rte_ring_dequeue_zc_finish to complete the
461  * dequeue operation.
462  *
463  * @param r
464  *   A pointer to the ring structure.
465  * @param n
466  *   The number of objects to remove from the ring.
467  * @param zcd
468  *   Structure containing the pointers and length of the space
469  *   reserved on the ring storage.
470  * @param available
471  *   If non-NULL, returns the number of remaining ring entries after the
472  *   dequeue has finished.
473  * @return
474  *   The number of objects that can be dequeued, either 0 or n.
475  */
476 static __rte_always_inline unsigned int
477 rte_ring_dequeue_zc_burst_start(struct rte_ring *r, unsigned int n,
478                 struct rte_ring_zc_data *zcd, unsigned int *available)
479 {
480         return rte_ring_dequeue_zc_burst_elem_start(r, sizeof(uintptr_t), n,
481                         zcd, available);
482 }
483
484 /**
485  * Complete dequeuing several objects from the ring.
486  * Note that number of objects to dequeued should not exceed previous
487  * dequeue_start return value.
488  *
489  * @param r
490  *   A pointer to the ring structure.
491  * @param n
492  *   The number of objects to remove from the ring.
493  */
494 static __rte_always_inline void
495 rte_ring_dequeue_zc_elem_finish(struct rte_ring *r, unsigned int n)
496 {
497         uint32_t tail;
498
499         switch (r->cons.sync_type) {
500         case RTE_RING_SYNC_ST:
501                 n = __rte_ring_st_get_tail(&r->cons, &tail, n);
502                 __rte_ring_st_set_head_tail(&r->cons, tail, n, 0);
503                 break;
504         case RTE_RING_SYNC_MT_HTS:
505                 n = __rte_ring_hts_get_tail(&r->hts_cons, &tail, n);
506                 __rte_ring_hts_set_head_tail(&r->hts_cons, tail, n, 0);
507                 break;
508         case RTE_RING_SYNC_MT:
509         case RTE_RING_SYNC_MT_RTS:
510         default:
511                 /* unsupported mode, shouldn't be here */
512                 RTE_ASSERT(0);
513         }
514 }
515
516 /**
517  * Complete dequeuing several objects from the ring.
518  * Note that number of objects to dequeued should not exceed previous
519  * dequeue_start return value.
520  *
521  * @param r
522  *   A pointer to the ring structure.
523  * @param n
524  *   The number of objects to remove from the ring.
525  */
526 static __rte_always_inline void
527 rte_ring_dequeue_zc_finish(struct rte_ring *r, unsigned int n)
528 {
529         rte_ring_dequeue_elem_finish(r, n);
530 }
531
532 #ifdef __cplusplus
533 }
534 #endif
535
536 #endif /* _RTE_RING_PEEK_ZC_H_ */