event/opdl: fix dereference before null check
[dpdk.git] / drivers / event / opdl / opdl_ring.c
1 /*-
2  * SPDX-License-Identifier: BSD-3-Clause
3  * Copyright(c) 2010-2014 Intel Corporation
4  */
5
6 #include <stdbool.h>
7 #include <stddef.h>
8 #include <stdint.h>
9 #include <stdio.h>
10
11 #include <rte_branch_prediction.h>
12 #include <rte_debug.h>
13 #include <rte_lcore.h>
14 #include <rte_log.h>
15 #include <rte_malloc.h>
16 #include <rte_memcpy.h>
17 #include <rte_memory.h>
18 #include <rte_memzone.h>
19 #include <rte_eal_memconfig.h>
20
21 #include "opdl_ring.h"
22 #include "opdl_log.h"
23
24 #define LIB_NAME "opdl_ring"
25
26 #define OPDL_NAME_SIZE 64
27
28
29 #define OPDL_EVENT_MASK  (0xFFFF0000000FFFFFULL)
30
31 int opdl_logtype_driver;
32
33 /* Types of dependency between stages */
34 enum dep_type {
35         DEP_NONE = 0,  /* no dependency */
36         DEP_DIRECT,  /* stage has direct dependency */
37         DEP_INDIRECT,  /* in-direct dependency through other stage(s) */
38         DEP_SELF,  /* stage dependency on itself, used to detect loops */
39 };
40
41 /* Shared section of stage state.
42  * Care is needed when accessing and the layout is important, especially to
43  * limit the adjacent cache-line HW prefetcher from impacting performance.
44  */
45 struct shared_state {
46         /* Last known minimum sequence number of dependencies, used for multi
47          * thread operation
48          */
49         uint32_t available_seq;
50         char _pad1[RTE_CACHE_LINE_SIZE * 3];
51         uint32_t head;  /* Head sequence number (for multi thread operation) */
52         char _pad2[RTE_CACHE_LINE_SIZE * 3];
53         struct opdl_stage *stage;  /* back pointer */
54         uint32_t tail;  /* Tail sequence number */
55         char _pad3[RTE_CACHE_LINE_SIZE * 2];
56 } __rte_cache_aligned;
57
58 /* A structure to keep track of "unfinished" claims. This is only used for
59  * stages that are threadsafe. Each lcore accesses its own instance of this
60  * structure to record the entries it has claimed. This allows one lcore to make
61  * multiple claims without being blocked by another. When disclaiming it moves
62  * forward the shared tail when the shared tail matches the tail value recorded
63  * here.
64  */
65 struct claim_manager {
66         uint32_t num_to_disclaim;
67         uint32_t num_claimed;
68         uint32_t mgr_head;
69         uint32_t mgr_tail;
70         struct {
71                 uint32_t head;
72                 uint32_t tail;
73         } claims[OPDL_DISCLAIMS_PER_LCORE];
74 } __rte_cache_aligned;
75
76 /* Context for each stage of opdl_ring.
77  * Calculations on sequence numbers need to be done with other uint32_t values
78  * so that results are modulus 2^32, and not undefined.
79  */
80 struct opdl_stage {
81         struct opdl_ring *t;  /* back pointer, set at init */
82         uint32_t num_slots;  /* Number of slots for entries, set at init */
83         uint32_t index;  /* ID for this stage, set at init */
84         bool threadsafe;  /* Set to 1 if this stage supports threadsafe use */
85         /* Last known min seq number of dependencies for used for single thread
86          * operation
87          */
88         uint32_t available_seq;
89         uint32_t head;  /* Current head for single-thread operation */
90         uint32_t shadow_head;  /* Shadow head for single-thread operation */
91         uint32_t nb_instance;  /* Number of instances */
92         uint32_t instance_id;  /* ID of this stage instance */
93         uint16_t num_claimed;  /* Number of slots claimed */
94         uint16_t num_event;             /* Number of events */
95         uint32_t seq;                   /* sequence number  */
96         uint32_t num_deps;  /* Number of direct dependencies */
97         /* Keep track of all dependencies, used during init only */
98         enum dep_type *dep_tracking;
99         /* Direct dependencies of this stage */
100         struct shared_state **deps;
101         /* Other stages read this! */
102         struct shared_state shared __rte_cache_aligned;
103         /* For managing disclaims in multi-threaded processing stages */
104         struct claim_manager pending_disclaims[RTE_MAX_LCORE]
105                                                __rte_cache_aligned;
106 } __rte_cache_aligned;
107
108 /* Context for opdl_ring */
109 struct opdl_ring {
110         char name[OPDL_NAME_SIZE];  /* OPDL queue instance name */
111         int socket;  /* NUMA socket that memory is allocated on */
112         uint32_t num_slots;  /* Number of slots for entries */
113         uint32_t mask;  /* Mask for sequence numbers (num_slots - 1) */
114         uint32_t slot_size;  /* Size of each slot in bytes */
115         uint32_t num_stages;  /* Number of stages that have been added */
116         uint32_t max_num_stages;  /* Max number of stages */
117         /* Stages indexed by ID */
118         struct opdl_stage *stages;
119         /* Memory for storing slot data */
120         uint8_t slots[0] __rte_cache_aligned;
121 };
122
123
124 /* Return input stage of a opdl_ring */
125 static __rte_always_inline struct opdl_stage *
126 input_stage(const struct opdl_ring *t)
127 {
128         return &t->stages[0];
129 }
130
131 /* Check if a stage is the input stage */
132 static __rte_always_inline bool
133 is_input_stage(const struct opdl_stage *s)
134 {
135         return s->index == 0;
136 }
137
138 /* Get slot pointer from sequence number */
139 static __rte_always_inline void *
140 get_slot(const struct opdl_ring *t, uint32_t n)
141 {
142         return (void *)(uintptr_t)&t->slots[(n & t->mask) * t->slot_size];
143 }
144
145 /* Find how many entries are available for processing */
146 static __rte_always_inline uint32_t
147 available(const struct opdl_stage *s)
148 {
149         if (s->threadsafe == true) {
150                 uint32_t n = __atomic_load_n(&s->shared.available_seq,
151                                 __ATOMIC_ACQUIRE) -
152                                 __atomic_load_n(&s->shared.head,
153                                 __ATOMIC_ACQUIRE);
154
155                 /* Return 0 if available_seq needs to be updated */
156                 return (n <= s->num_slots) ? n : 0;
157         }
158
159         /* Single threaded */
160         return s->available_seq - s->head;
161 }
162
163 /* Read sequence number of dependencies and find minimum */
164 static __rte_always_inline void
165 update_available_seq(struct opdl_stage *s)
166 {
167         uint32_t i;
168         uint32_t this_tail = s->shared.tail;
169         uint32_t min_seq = __atomic_load_n(&s->deps[0]->tail, __ATOMIC_ACQUIRE);
170         /* Input stage sequence numbers are greater than the sequence numbers of
171          * its dependencies so an offset of t->num_slots is needed when
172          * calculating available slots and also the condition which is used to
173          * determine the dependencies minimum sequence number must be reverted.
174          */
175         uint32_t wrap;
176
177         if (is_input_stage(s)) {
178                 wrap = s->num_slots;
179                 for (i = 1; i < s->num_deps; i++) {
180                         uint32_t seq = __atomic_load_n(&s->deps[i]->tail,
181                                         __ATOMIC_ACQUIRE);
182                         if ((this_tail - seq) > (this_tail - min_seq))
183                                 min_seq = seq;
184                 }
185         } else {
186                 wrap = 0;
187                 for (i = 1; i < s->num_deps; i++) {
188                         uint32_t seq = __atomic_load_n(&s->deps[i]->tail,
189                                         __ATOMIC_ACQUIRE);
190                         if ((seq - this_tail) < (min_seq - this_tail))
191                                 min_seq = seq;
192                 }
193         }
194
195         if (s->threadsafe == false)
196                 s->available_seq = min_seq + wrap;
197         else
198                 __atomic_store_n(&s->shared.available_seq, min_seq + wrap,
199                                 __ATOMIC_RELEASE);
200 }
201
202 /* Wait until the number of available slots reaches number requested */
203 static __rte_always_inline void
204 wait_for_available(struct opdl_stage *s, uint32_t n)
205 {
206         while (available(s) < n) {
207                 rte_pause();
208                 update_available_seq(s);
209         }
210 }
211
212 /* Return number of slots to process based on number requested and mode */
213 static __rte_always_inline uint32_t
214 num_to_process(struct opdl_stage *s, uint32_t n, bool block)
215 {
216         /* Don't read tail sequences of dependencies if not needed */
217         if (available(s) >= n)
218                 return n;
219
220         update_available_seq(s);
221
222         if (block == false) {
223                 uint32_t avail = available(s);
224
225                 if (avail == 0) {
226                         rte_pause();
227                         return 0;
228                 }
229                 return (avail <= n) ? avail : n;
230         }
231
232         if (unlikely(n > s->num_slots)) {
233                 PMD_DRV_LOG(ERR, "%u entries is more than max (%u)",
234                                 n, s->num_slots);
235                 return 0;  /* Avoid infinite loop */
236         }
237         /* blocking */
238         wait_for_available(s, n);
239         return n;
240 }
241
242 /* Copy entries in to slots with wrap-around */
243 static __rte_always_inline void
244 copy_entries_in(struct opdl_ring *t, uint32_t start, const void *entries,
245                 uint32_t num_entries)
246 {
247         uint32_t slot_size = t->slot_size;
248         uint32_t slot_index = start & t->mask;
249
250         if (slot_index + num_entries <= t->num_slots) {
251                 rte_memcpy(get_slot(t, start), entries,
252                                 num_entries * slot_size);
253         } else {
254                 uint32_t split = t->num_slots - slot_index;
255
256                 rte_memcpy(get_slot(t, start), entries, split * slot_size);
257                 rte_memcpy(get_slot(t, 0),
258                                 RTE_PTR_ADD(entries, split * slot_size),
259                                 (num_entries - split) * slot_size);
260         }
261 }
262
263 /* Copy entries out from slots with wrap-around */
264 static __rte_always_inline void
265 copy_entries_out(struct opdl_ring *t, uint32_t start, void *entries,
266                 uint32_t num_entries)
267 {
268         uint32_t slot_size = t->slot_size;
269         uint32_t slot_index = start & t->mask;
270
271         if (slot_index + num_entries <= t->num_slots) {
272                 rte_memcpy(entries, get_slot(t, start),
273                                 num_entries * slot_size);
274         } else {
275                 uint32_t split = t->num_slots - slot_index;
276
277                 rte_memcpy(entries, get_slot(t, start), split * slot_size);
278                 rte_memcpy(RTE_PTR_ADD(entries, split * slot_size),
279                                 get_slot(t, 0),
280                                 (num_entries - split) * slot_size);
281         }
282 }
283
284 /* Input function optimised for single thread */
285 static __rte_always_inline uint32_t
286 opdl_ring_input_singlethread(struct opdl_ring *t, const void *entries,
287                 uint32_t num_entries, bool block)
288 {
289         struct opdl_stage *s = input_stage(t);
290         uint32_t head = s->head;
291
292         num_entries = num_to_process(s, num_entries, block);
293         if (num_entries == 0)
294                 return 0;
295
296         copy_entries_in(t, head, entries, num_entries);
297
298         s->head += num_entries;
299         __atomic_store_n(&s->shared.tail, s->head, __ATOMIC_RELEASE);
300
301         return num_entries;
302 }
303
304 /* Convert head and tail of claim_manager into valid index */
305 static __rte_always_inline uint32_t
306 claim_mgr_index(uint32_t n)
307 {
308         return n & (OPDL_DISCLAIMS_PER_LCORE - 1);
309 }
310
311 /* Check if there are available slots in claim_manager */
312 static __rte_always_inline bool
313 claim_mgr_available(struct claim_manager *mgr)
314 {
315         return (mgr->mgr_head < (mgr->mgr_tail + OPDL_DISCLAIMS_PER_LCORE)) ?
316                         true : false;
317 }
318
319 /* Record a new claim. Only use after first checking an entry is available */
320 static __rte_always_inline void
321 claim_mgr_add(struct claim_manager *mgr, uint32_t tail, uint32_t head)
322 {
323         if ((mgr->mgr_head != mgr->mgr_tail) &&
324                         (mgr->claims[claim_mgr_index(mgr->mgr_head - 1)].head ==
325                         tail)) {
326                 /* Combine with previous claim */
327                 mgr->claims[claim_mgr_index(mgr->mgr_head - 1)].head = head;
328         } else {
329                 mgr->claims[claim_mgr_index(mgr->mgr_head)].head = head;
330                 mgr->claims[claim_mgr_index(mgr->mgr_head)].tail = tail;
331                 mgr->mgr_head++;
332         }
333
334         mgr->num_claimed += (head - tail);
335 }
336
337 /* Read the oldest recorded claim */
338 static __rte_always_inline bool
339 claim_mgr_read(struct claim_manager *mgr, uint32_t *tail, uint32_t *head)
340 {
341         if (mgr->mgr_head == mgr->mgr_tail)
342                 return false;
343
344         *head = mgr->claims[claim_mgr_index(mgr->mgr_tail)].head;
345         *tail = mgr->claims[claim_mgr_index(mgr->mgr_tail)].tail;
346         return true;
347 }
348
349 /* Remove the oldest recorded claim. Only use after first reading the entry */
350 static __rte_always_inline void
351 claim_mgr_remove(struct claim_manager *mgr)
352 {
353         mgr->num_claimed -= (mgr->claims[claim_mgr_index(mgr->mgr_tail)].head -
354                         mgr->claims[claim_mgr_index(mgr->mgr_tail)].tail);
355         mgr->mgr_tail++;
356 }
357
358 /* Update tail in the oldest claim. Only use after first reading the entry */
359 static __rte_always_inline void
360 claim_mgr_move_tail(struct claim_manager *mgr, uint32_t num_entries)
361 {
362         mgr->num_claimed -= num_entries;
363         mgr->claims[claim_mgr_index(mgr->mgr_tail)].tail += num_entries;
364 }
365
366 static __rte_always_inline void
367 opdl_stage_disclaim_multithread_n(struct opdl_stage *s,
368                 uint32_t num_entries, bool block)
369 {
370         struct claim_manager *disclaims = &s->pending_disclaims[rte_lcore_id()];
371         uint32_t head;
372         uint32_t tail;
373
374         while (num_entries) {
375                 bool ret = claim_mgr_read(disclaims, &tail, &head);
376
377                 if (ret == false)
378                         break;  /* nothing is claimed */
379                 /* There should be no race condition here. If shared.tail
380                  * matches, no other core can update it until this one does.
381                  */
382                 if (__atomic_load_n(&s->shared.tail, __ATOMIC_ACQUIRE) ==
383                                 tail) {
384                         if (num_entries >= (head - tail)) {
385                                 claim_mgr_remove(disclaims);
386                                 __atomic_store_n(&s->shared.tail, head,
387                                                 __ATOMIC_RELEASE);
388                                 num_entries -= (head - tail);
389                         } else {
390                                 claim_mgr_move_tail(disclaims, num_entries);
391                                 __atomic_store_n(&s->shared.tail,
392                                                 num_entries + tail,
393                                                 __ATOMIC_RELEASE);
394                                 num_entries = 0;
395                         }
396                 } else if (block == false)
397                         break;  /* blocked by other thread */
398                 /* Keep going until num_entries are disclaimed. */
399                 rte_pause();
400         }
401
402         disclaims->num_to_disclaim = num_entries;
403 }
404
405 /* Move head atomically, returning number of entries available to process and
406  * the original value of head. For non-input stages, the claim is recorded
407  * so that the tail can be updated later by opdl_stage_disclaim().
408  */
409 static __rte_always_inline void
410 move_head_atomically(struct opdl_stage *s, uint32_t *num_entries,
411                 uint32_t *old_head, bool block, bool claim_func)
412 {
413         uint32_t orig_num_entries = *num_entries;
414         uint32_t ret;
415         struct claim_manager *disclaims = &s->pending_disclaims[rte_lcore_id()];
416
417         /* Attempt to disclaim any outstanding claims */
418         opdl_stage_disclaim_multithread_n(s, disclaims->num_to_disclaim,
419                         false);
420
421         *old_head = __atomic_load_n(&s->shared.head, __ATOMIC_ACQUIRE);
422         while (true) {
423                 bool success;
424                 /* If called by opdl_ring_input(), claim does not need to be
425                  * recorded, as there will be no disclaim.
426                  */
427                 if (claim_func) {
428                         /* Check that the claim can be recorded */
429                         ret = claim_mgr_available(disclaims);
430                         if (ret == false) {
431                                 /* exit out if claim can't be recorded */
432                                 *num_entries = 0;
433                                 return;
434                         }
435                 }
436
437                 *num_entries = num_to_process(s, orig_num_entries, block);
438                 if (*num_entries == 0)
439                         return;
440
441                 success = __atomic_compare_exchange_n(&s->shared.head, old_head,
442                                 *old_head + *num_entries,
443                                 true,  /* may fail spuriously */
444                                 __ATOMIC_RELEASE,  /* memory order on success */
445                                 __ATOMIC_ACQUIRE);  /* memory order on fail */
446                 if (likely(success))
447                         break;
448                 rte_pause();
449         }
450
451         if (claim_func)
452                 /* Store the claim record */
453                 claim_mgr_add(disclaims, *old_head, *old_head + *num_entries);
454 }
455
456 /* Input function that supports multiple threads */
457 static __rte_always_inline uint32_t
458 opdl_ring_input_multithread(struct opdl_ring *t, const void *entries,
459                 uint32_t num_entries, bool block)
460 {
461         struct opdl_stage *s = input_stage(t);
462         uint32_t old_head;
463
464         move_head_atomically(s, &num_entries, &old_head, block, false);
465         if (num_entries == 0)
466                 return 0;
467
468         copy_entries_in(t, old_head, entries, num_entries);
469
470         /* If another thread started inputting before this one, but hasn't
471          * finished, we need to wait for it to complete to update the tail.
472          */
473         while (unlikely(__atomic_load_n(&s->shared.tail, __ATOMIC_ACQUIRE) !=
474                         old_head))
475                 rte_pause();
476
477         __atomic_store_n(&s->shared.tail, old_head + num_entries,
478                         __ATOMIC_RELEASE);
479
480         return num_entries;
481 }
482
483 static __rte_always_inline uint32_t
484 opdl_first_entry_id(uint32_t start_seq, uint8_t nb_p_lcores,
485                 uint8_t this_lcore)
486 {
487         return ((nb_p_lcores <= 1) ? 0 :
488                         (nb_p_lcores - (start_seq % nb_p_lcores) + this_lcore) %
489                         nb_p_lcores);
490 }
491
492 /* Claim slots to process, optimised for single-thread operation */
493 static __rte_always_inline uint32_t
494 opdl_stage_claim_singlethread(struct opdl_stage *s, void *entries,
495                 uint32_t num_entries, uint32_t *seq, bool block, bool atomic)
496 {
497         uint32_t i = 0, j = 0,  offset;
498         void *get_slots;
499         struct rte_event *ev;
500         RTE_SET_USED(seq);
501         struct opdl_ring *t = s->t;
502         uint8_t *entries_offset = (uint8_t *)entries;
503
504         if (!atomic) {
505
506                 offset = opdl_first_entry_id(s->seq, s->nb_instance,
507                                 s->instance_id);
508
509                 num_entries = s->nb_instance * num_entries;
510
511                 num_entries = num_to_process(s, num_entries, block);
512
513                 for (; offset < num_entries; offset += s->nb_instance) {
514                         get_slots = get_slot(t, s->head + offset);
515                         memcpy(entries_offset, get_slots, t->slot_size);
516                         entries_offset += t->slot_size;
517                         i++;
518                 }
519         } else {
520                 num_entries = num_to_process(s, num_entries, block);
521
522                 for (j = 0; j < num_entries; j++) {
523                         ev = (struct rte_event *)get_slot(t, s->head+j);
524                         if ((ev->flow_id%s->nb_instance) == s->instance_id) {
525                                 memcpy(entries_offset, ev, t->slot_size);
526                                 entries_offset += t->slot_size;
527                                 i++;
528                         }
529                 }
530         }
531         s->shadow_head = s->head;
532         s->head += num_entries;
533         s->num_claimed = num_entries;
534         s->num_event = i;
535
536         /* automatically disclaim entries if number of rte_events is zero */
537         if (unlikely(i == 0))
538                 opdl_stage_disclaim(s, 0, false);
539
540         return i;
541 }
542
543 /* Thread-safe version of function to claim slots for processing */
544 static __rte_always_inline uint32_t
545 opdl_stage_claim_multithread(struct opdl_stage *s, void *entries,
546                 uint32_t num_entries, uint32_t *seq, bool block)
547 {
548         uint32_t old_head;
549         struct opdl_ring *t = s->t;
550         uint32_t i = 0, offset;
551         uint8_t *entries_offset = (uint8_t *)entries;
552
553         if (seq == NULL) {
554                 PMD_DRV_LOG(ERR, "Invalid seq PTR");
555                 return 0;
556         }
557         offset = opdl_first_entry_id(*seq, s->nb_instance, s->instance_id);
558         num_entries = offset + (s->nb_instance * num_entries);
559
560         move_head_atomically(s, &num_entries, &old_head, block, true);
561
562         for (; offset < num_entries; offset += s->nb_instance) {
563                 memcpy(entries_offset, get_slot(t, s->head + offset),
564                         t->slot_size);
565                 entries_offset += t->slot_size;
566                 i++;
567         }
568
569         *seq = old_head;
570
571         return i;
572 }
573
574 /* Claim and copy slot pointers, optimised for single-thread operation */
575 static __rte_always_inline uint32_t
576 opdl_stage_claim_copy_singlethread(struct opdl_stage *s, void *entries,
577                 uint32_t num_entries, uint32_t *seq, bool block)
578 {
579         num_entries = num_to_process(s, num_entries, block);
580         if (num_entries == 0)
581                 return 0;
582         copy_entries_out(s->t, s->head, entries, num_entries);
583         if (seq != NULL)
584                 *seq = s->head;
585         s->head += num_entries;
586         return num_entries;
587 }
588
589 /* Thread-safe version of function to claim and copy pointers to slots */
590 static __rte_always_inline uint32_t
591 opdl_stage_claim_copy_multithread(struct opdl_stage *s, void *entries,
592                 uint32_t num_entries, uint32_t *seq, bool block)
593 {
594         uint32_t old_head;
595
596         move_head_atomically(s, &num_entries, &old_head, block, true);
597         if (num_entries == 0)
598                 return 0;
599         copy_entries_out(s->t, old_head, entries, num_entries);
600         if (seq != NULL)
601                 *seq = old_head;
602         return num_entries;
603 }
604
605 static __rte_always_inline void
606 opdl_stage_disclaim_singlethread_n(struct opdl_stage *s,
607                 uint32_t num_entries)
608 {
609         uint32_t old_tail = s->shared.tail;
610
611         if (unlikely(num_entries > (s->head - old_tail))) {
612                 PMD_DRV_LOG(WARNING, "Attempt to disclaim (%u) more than claimed (%u)",
613                                 num_entries, s->head - old_tail);
614                 num_entries = s->head - old_tail;
615         }
616         __atomic_store_n(&s->shared.tail, num_entries + old_tail,
617                         __ATOMIC_RELEASE);
618 }
619
620 uint32_t
621 opdl_ring_input(struct opdl_ring *t, const void *entries, uint32_t num_entries,
622                 bool block)
623 {
624         if (input_stage(t)->threadsafe == false)
625                 return opdl_ring_input_singlethread(t, entries, num_entries,
626                                 block);
627         else
628                 return opdl_ring_input_multithread(t, entries, num_entries,
629                                 block);
630 }
631
632 uint32_t
633 opdl_ring_copy_from_burst(struct opdl_ring *t, struct opdl_stage *s,
634                 const void *entries, uint32_t num_entries, bool block)
635 {
636         uint32_t head = s->head;
637
638         num_entries = num_to_process(s, num_entries, block);
639
640         if (num_entries == 0)
641                 return 0;
642
643         copy_entries_in(t, head, entries, num_entries);
644
645         s->head += num_entries;
646         __atomic_store_n(&s->shared.tail, s->head, __ATOMIC_RELEASE);
647
648         return num_entries;
649
650 }
651
652 uint32_t
653 opdl_ring_copy_to_burst(struct opdl_ring *t, struct opdl_stage *s,
654                 void *entries, uint32_t num_entries, bool block)
655 {
656         uint32_t head = s->head;
657
658         num_entries = num_to_process(s, num_entries, block);
659         if (num_entries == 0)
660                 return 0;
661
662         copy_entries_out(t, head, entries, num_entries);
663
664         s->head += num_entries;
665         __atomic_store_n(&s->shared.tail, s->head, __ATOMIC_RELEASE);
666
667         return num_entries;
668 }
669
670 uint32_t
671 opdl_stage_find_num_available(struct opdl_stage *s, uint32_t num_entries)
672 {
673         /* return (num_to_process(s, num_entries, false)); */
674
675         if (available(s) >= num_entries)
676                 return num_entries;
677
678         update_available_seq(s);
679
680         uint32_t avail = available(s);
681
682         if (avail == 0) {
683                 rte_pause();
684                 return 0;
685         }
686         return (avail <= num_entries) ? avail : num_entries;
687 }
688
689 uint32_t
690 opdl_stage_claim(struct opdl_stage *s, void *entries,
691                 uint32_t num_entries, uint32_t *seq, bool block, bool atomic)
692 {
693         if (s->threadsafe == false)
694                 return opdl_stage_claim_singlethread(s, entries, num_entries,
695                                 seq, block, atomic);
696         else
697                 return opdl_stage_claim_multithread(s, entries, num_entries,
698                                 seq, block);
699 }
700
701 uint32_t
702 opdl_stage_claim_copy(struct opdl_stage *s, void *entries,
703                 uint32_t num_entries, uint32_t *seq, bool block)
704 {
705         if (s->threadsafe == false)
706                 return opdl_stage_claim_copy_singlethread(s, entries,
707                                 num_entries, seq, block);
708         else
709                 return opdl_stage_claim_copy_multithread(s, entries,
710                                 num_entries, seq, block);
711 }
712
713 void
714 opdl_stage_disclaim_n(struct opdl_stage *s, uint32_t num_entries,
715                 bool block)
716 {
717
718         if (s->threadsafe == false) {
719                 opdl_stage_disclaim_singlethread_n(s, s->num_claimed);
720         } else {
721                 struct claim_manager *disclaims =
722                         &s->pending_disclaims[rte_lcore_id()];
723
724                 if (unlikely(num_entries > s->num_slots)) {
725                         PMD_DRV_LOG(WARNING, "Attempt to disclaim (%u) more than claimed (%u)",
726                                         num_entries, disclaims->num_claimed);
727                         num_entries = disclaims->num_claimed;
728                 }
729
730                 num_entries = RTE_MIN(num_entries + disclaims->num_to_disclaim,
731                                 disclaims->num_claimed);
732                 opdl_stage_disclaim_multithread_n(s, num_entries, block);
733         }
734 }
735
736 int
737 opdl_stage_disclaim(struct opdl_stage *s, uint32_t num_entries, bool block)
738 {
739         if (num_entries != s->num_event) {
740                 rte_errno = -EINVAL;
741                 return 0;
742         }
743         if (s->threadsafe == false) {
744                 __atomic_store_n(&s->shared.tail, s->head, __ATOMIC_RELEASE);
745                 s->seq += s->num_claimed;
746                 s->shadow_head = s->head;
747                 s->num_claimed = 0;
748         } else {
749                 struct claim_manager *disclaims =
750                                 &s->pending_disclaims[rte_lcore_id()];
751                 opdl_stage_disclaim_multithread_n(s, disclaims->num_claimed,
752                                 block);
753         }
754         return num_entries;
755 }
756
757 uint32_t
758 opdl_ring_available(struct opdl_ring *t)
759 {
760         return opdl_stage_available(&t->stages[0]);
761 }
762
763 uint32_t
764 opdl_stage_available(struct opdl_stage *s)
765 {
766         update_available_seq(s);
767         return available(s);
768 }
769
770 void
771 opdl_ring_flush(struct opdl_ring *t)
772 {
773         struct opdl_stage *s = input_stage(t);
774
775         wait_for_available(s, s->num_slots);
776 }
777
778 /******************** Non performance sensitive functions ********************/
779
780 /* Initial setup of a new stage's context */
781 static int
782 init_stage(struct opdl_ring *t, struct opdl_stage *s, bool threadsafe,
783                 bool is_input)
784 {
785         uint32_t available = (is_input) ? t->num_slots : 0;
786
787         s->t = t;
788         s->num_slots = t->num_slots;
789         s->index = t->num_stages;
790         s->threadsafe = threadsafe;
791         s->shared.stage = s;
792
793         /* Alloc memory for deps */
794         s->dep_tracking = rte_zmalloc_socket(LIB_NAME,
795                         t->max_num_stages * sizeof(enum dep_type),
796                         0, t->socket);
797         if (s->dep_tracking == NULL)
798                 return -ENOMEM;
799
800         s->deps = rte_zmalloc_socket(LIB_NAME,
801                         t->max_num_stages * sizeof(struct shared_state *),
802                         0, t->socket);
803         if (s->deps == NULL) {
804                 rte_free(s->dep_tracking);
805                 return -ENOMEM;
806         }
807
808         s->dep_tracking[s->index] = DEP_SELF;
809
810         if (threadsafe == true)
811                 s->shared.available_seq = available;
812         else
813                 s->available_seq = available;
814
815         return 0;
816 }
817
818 /* Add direct or indirect dependencies between stages */
819 static int
820 add_dep(struct opdl_stage *dependent, const struct opdl_stage *dependency,
821                 enum dep_type type)
822 {
823         struct opdl_ring *t = dependent->t;
824         uint32_t i;
825
826         /* Add new direct dependency */
827         if ((type == DEP_DIRECT) &&
828                         (dependent->dep_tracking[dependency->index] ==
829                                         DEP_NONE)) {
830                 PMD_DRV_LOG(DEBUG, "%s:%u direct dependency on %u",
831                                 t->name, dependent->index, dependency->index);
832                 dependent->dep_tracking[dependency->index] = DEP_DIRECT;
833         }
834
835         /* Add new indirect dependency or change direct to indirect */
836         if ((type == DEP_INDIRECT) &&
837                         ((dependent->dep_tracking[dependency->index] ==
838                         DEP_NONE) ||
839                         (dependent->dep_tracking[dependency->index] ==
840                         DEP_DIRECT))) {
841                 PMD_DRV_LOG(DEBUG, "%s:%u indirect dependency on %u",
842                                 t->name, dependent->index, dependency->index);
843                 dependent->dep_tracking[dependency->index] = DEP_INDIRECT;
844         }
845
846         /* Shouldn't happen... */
847         if ((dependent->dep_tracking[dependency->index] == DEP_SELF) &&
848                         (dependent != input_stage(t))) {
849                 PMD_DRV_LOG(ERR, "Loop in dependency graph %s:%u",
850                                 t->name, dependent->index);
851                 return -EINVAL;
852         }
853
854         /* Keep going to dependencies of the dependency, until input stage */
855         if (dependency != input_stage(t))
856                 for (i = 0; i < dependency->num_deps; i++) {
857                         int ret = add_dep(dependent, dependency->deps[i]->stage,
858                                         DEP_INDIRECT);
859
860                         if (ret < 0)
861                                 return ret;
862                 }
863
864         /* Make list of sequence numbers for direct dependencies only */
865         if (type == DEP_DIRECT)
866                 for (i = 0, dependent->num_deps = 0; i < t->num_stages; i++)
867                         if (dependent->dep_tracking[i] == DEP_DIRECT) {
868                                 if ((i == 0) && (dependent->num_deps > 1))
869                                         rte_panic("%s:%u depends on > input",
870                                                         t->name,
871                                                         dependent->index);
872                                 dependent->deps[dependent->num_deps++] =
873                                                 &t->stages[i].shared;
874                         }
875
876         return 0;
877 }
878
879 struct opdl_ring *
880 opdl_ring_create(const char *name, uint32_t num_slots, uint32_t slot_size,
881                 uint32_t max_num_stages, int socket)
882 {
883         struct opdl_ring *t;
884         char mz_name[RTE_MEMZONE_NAMESIZE];
885         int mz_flags = 0;
886         struct opdl_stage *st = NULL;
887         const struct rte_memzone *mz = NULL;
888         size_t alloc_size = RTE_CACHE_LINE_ROUNDUP(sizeof(*t) +
889                         (num_slots * slot_size));
890
891         /* Compile time checking */
892         RTE_BUILD_BUG_ON((sizeof(struct shared_state) & RTE_CACHE_LINE_MASK) !=
893                         0);
894         RTE_BUILD_BUG_ON((offsetof(struct opdl_stage, shared) &
895                         RTE_CACHE_LINE_MASK) != 0);
896         RTE_BUILD_BUG_ON((offsetof(struct opdl_ring, slots) &
897                         RTE_CACHE_LINE_MASK) != 0);
898         RTE_BUILD_BUG_ON(!rte_is_power_of_2(OPDL_DISCLAIMS_PER_LCORE));
899
900         /* Parameter checking */
901         if (name == NULL) {
902                 PMD_DRV_LOG(ERR, "name param is NULL");
903                 return NULL;
904         }
905         if (!rte_is_power_of_2(num_slots)) {
906                 PMD_DRV_LOG(ERR, "num_slots (%u) for %s is not power of 2",
907                                 num_slots, name);
908                 return NULL;
909         }
910
911         /* Alloc memory for stages */
912         st = rte_zmalloc_socket(LIB_NAME,
913                 max_num_stages * sizeof(struct opdl_stage),
914                 RTE_CACHE_LINE_SIZE, socket);
915         if (st == NULL)
916                 goto exit_fail;
917
918         snprintf(mz_name, sizeof(mz_name), "%s%s", LIB_NAME, name);
919
920         /* Alloc memory for memzone */
921         mz = rte_memzone_reserve(mz_name, alloc_size, socket, mz_flags);
922         if (mz == NULL)
923                 goto exit_fail;
924
925         t = mz->addr;
926
927         /* Initialise opdl_ring queue */
928         memset(t, 0, sizeof(*t));
929         snprintf(t->name, sizeof(t->name), "%s", name);
930         t->socket = socket;
931         t->num_slots = num_slots;
932         t->mask = num_slots - 1;
933         t->slot_size = slot_size;
934         t->max_num_stages = max_num_stages;
935         t->stages = st;
936
937         PMD_DRV_LOG(DEBUG, "Created %s at %p (num_slots=%u,socket=%i,slot_size=%u)",
938                         t->name, t, num_slots, socket, slot_size);
939
940         return t;
941
942 exit_fail:
943         PMD_DRV_LOG(ERR, "Cannot reserve memory");
944         rte_free(st);
945         rte_memzone_free(mz);
946
947         return NULL;
948 }
949
950 void *
951 opdl_ring_get_slot(const struct opdl_ring *t, uint32_t index)
952 {
953         return get_slot(t, index);
954 }
955
956 bool
957 opdl_ring_cas_slot(const struct opdl_stage *s, const struct rte_event *ev,
958                 uint32_t index, bool atomic)
959 {
960         uint32_t i = 0, j = 0, offset;
961         struct opdl_ring *t = s->t;
962         struct rte_event *ev_orig = NULL;
963         bool ev_updated = false;
964         uint64_t  ev_temp = 0;
965
966         if (index > s->num_event) {
967                 PMD_DRV_LOG(ERR, "index is overflow");
968                 return ev_updated;
969         }
970
971         ev_temp = ev->event&OPDL_EVENT_MASK;
972
973         if (!atomic) {
974                 offset = opdl_first_entry_id(s->seq, s->nb_instance,
975                                 s->instance_id);
976                 offset += index*s->nb_instance;
977                 ev_orig = get_slot(t, s->shadow_head+offset);
978                 if ((ev_orig->event&OPDL_EVENT_MASK) != ev_temp) {
979                         ev_orig->event = ev->event;
980                         ev_updated = true;
981                 }
982                 if (ev_orig->u64 != ev->u64) {
983                         ev_orig->u64 = ev->u64;
984                         ev_updated = true;
985                 }
986
987         } else {
988                 for (i = 0; i < s->num_claimed; i++) {
989                         ev_orig = (struct rte_event *)
990                                 get_slot(t, s->shadow_head+i);
991
992                         if ((ev_orig->flow_id%s->nb_instance) ==
993                                         s->instance_id) {
994
995                                 if (j == index) {
996                                         if ((ev_orig->event&OPDL_EVENT_MASK) !=
997                                                         ev_temp) {
998                                                 ev_orig->event = ev->event;
999                                                 ev_updated = true;
1000                                         }
1001                                         if (ev_orig->u64 != ev->u64) {
1002                                                 ev_orig->u64 = ev->u64;
1003                                                 ev_updated = true;
1004                                         }
1005
1006                                         break;
1007                                 }
1008                                 j++;
1009                         }
1010                 }
1011
1012         }
1013
1014         return ev_updated;
1015 }
1016
1017 int
1018 opdl_ring_get_socket(const struct opdl_ring *t)
1019 {
1020         return t->socket;
1021 }
1022
1023 uint32_t
1024 opdl_ring_get_num_slots(const struct opdl_ring *t)
1025 {
1026         return t->num_slots;
1027 }
1028
1029 const char *
1030 opdl_ring_get_name(const struct opdl_ring *t)
1031 {
1032         return t->name;
1033 }
1034
1035 /* Check dependency list is valid for a given opdl_ring */
1036 static int
1037 check_deps(struct opdl_ring *t, struct opdl_stage *deps[],
1038                 uint32_t num_deps)
1039 {
1040         unsigned int i;
1041
1042         for (i = 0; i < num_deps; ++i) {
1043                 if (!deps[i]) {
1044                         PMD_DRV_LOG(ERR, "deps[%u] is NULL", i);
1045                         return -EINVAL;
1046                 }
1047                 if (t != deps[i]->t) {
1048                         PMD_DRV_LOG(ERR, "deps[%u] is in opdl_ring %s, not %s",
1049                                         i, deps[i]->t->name, t->name);
1050                         return -EINVAL;
1051                 }
1052         }
1053         if (num_deps > t->num_stages) {
1054                 PMD_DRV_LOG(ERR, "num_deps (%u) > number stages (%u)",
1055                                 num_deps, t->num_stages);
1056                 return -EINVAL;
1057         }
1058         return 0;
1059 }
1060
1061 struct opdl_stage *
1062 opdl_stage_add(struct opdl_ring *t, bool threadsafe, bool is_input)
1063 {
1064         struct opdl_stage *s;
1065
1066         /* Parameter checking */
1067         if (!t) {
1068                 PMD_DRV_LOG(ERR, "opdl_ring is NULL");
1069                 return NULL;
1070         }
1071         if (t->num_stages == t->max_num_stages) {
1072                 PMD_DRV_LOG(ERR, "%s has max number of stages (%u)",
1073                                 t->name, t->max_num_stages);
1074                 return NULL;
1075         }
1076
1077         s = &t->stages[t->num_stages];
1078
1079         if (((uintptr_t)&s->shared & RTE_CACHE_LINE_MASK) != 0)
1080                 PMD_DRV_LOG(WARNING, "Tail seq num (%p) of %s stage not cache aligned",
1081                                 &s->shared, t->name);
1082
1083         if (init_stage(t, s, threadsafe, is_input) < 0) {
1084                 PMD_DRV_LOG(ERR, "Cannot reserve memory");
1085                 return NULL;
1086         }
1087         t->num_stages++;
1088
1089         return s;
1090 }
1091
1092 uint32_t
1093 opdl_stage_deps_add(struct opdl_ring *t, struct opdl_stage *s,
1094                 uint32_t nb_instance, uint32_t instance_id,
1095                 struct opdl_stage *deps[],
1096                 uint32_t num_deps)
1097 {
1098         uint32_t i;
1099         int ret = 0;
1100
1101         if ((num_deps > 0) && (!deps)) {
1102                 PMD_DRV_LOG(ERR, "%s stage has NULL dependencies", t->name);
1103                 return -1;
1104         }
1105         ret = check_deps(t, deps, num_deps);
1106         if (ret < 0)
1107                 return ret;
1108
1109         for (i = 0; i < num_deps; i++) {
1110                 ret = add_dep(s, deps[i], DEP_DIRECT);
1111                 if (ret < 0)
1112                         return ret;
1113         }
1114
1115         s->nb_instance = nb_instance;
1116         s->instance_id = instance_id;
1117
1118         return ret;
1119 }
1120
1121 struct opdl_stage *
1122 opdl_ring_get_input_stage(const struct opdl_ring *t)
1123 {
1124         return input_stage(t);
1125 }
1126
1127 int
1128 opdl_stage_set_deps(struct opdl_stage *s, struct opdl_stage *deps[],
1129                 uint32_t num_deps)
1130 {
1131         unsigned int i;
1132         int ret;
1133
1134         if ((num_deps == 0) || (!deps)) {
1135                 PMD_DRV_LOG(ERR, "cannot set NULL dependencies");
1136                 return -EINVAL;
1137         }
1138
1139         ret = check_deps(s->t, deps, num_deps);
1140         if (ret < 0)
1141                 return ret;
1142
1143         /* Update deps */
1144         for (i = 0; i < num_deps; i++)
1145                 s->deps[i] = &deps[i]->shared;
1146         s->num_deps = num_deps;
1147
1148         return 0;
1149 }
1150
1151 struct opdl_ring *
1152 opdl_stage_get_opdl_ring(const struct opdl_stage *s)
1153 {
1154         return s->t;
1155 }
1156
1157 void
1158 opdl_ring_dump(const struct opdl_ring *t, FILE *f)
1159 {
1160         uint32_t i;
1161
1162         if (t == NULL) {
1163                 fprintf(f, "NULL OPDL!\n");
1164                 return;
1165         }
1166         fprintf(f, "OPDL \"%s\": num_slots=%u; mask=%#x; slot_size=%u; num_stages=%u; socket=%i\n",
1167                         t->name, t->num_slots, t->mask, t->slot_size,
1168                         t->num_stages, t->socket);
1169         for (i = 0; i < t->num_stages; i++) {
1170                 uint32_t j;
1171                 const struct opdl_stage *s = &t->stages[i];
1172
1173                 fprintf(f, "  %s[%u]: threadsafe=%s; head=%u; available_seq=%u; tail=%u; deps=%u",
1174                                 t->name, i, (s->threadsafe) ? "true" : "false",
1175                                 (s->threadsafe) ? s->shared.head : s->head,
1176                                 (s->threadsafe) ? s->shared.available_seq :
1177                                 s->available_seq,
1178                                 s->shared.tail, (s->num_deps > 0) ?
1179                                 s->deps[0]->stage->index : 0);
1180                 for (j = 1; j < s->num_deps; j++)
1181                         fprintf(f, ",%u", s->deps[j]->stage->index);
1182                 fprintf(f, "\n");
1183         }
1184         fflush(f);
1185 }
1186
1187 void
1188 opdl_ring_free(struct opdl_ring *t)
1189 {
1190         uint32_t i;
1191         const struct rte_memzone *mz;
1192         char mz_name[RTE_MEMZONE_NAMESIZE];
1193
1194         if (t == NULL) {
1195                 PMD_DRV_LOG(DEBUG, "Freeing NULL OPDL Ring!");
1196                 return;
1197         }
1198
1199         PMD_DRV_LOG(DEBUG, "Freeing %s opdl_ring at %p", t->name, t);
1200
1201         for (i = 0; i < t->num_stages; ++i) {
1202                 rte_free(t->stages[i].deps);
1203                 rte_free(t->stages[i].dep_tracking);
1204         }
1205
1206         rte_free(t->stages);
1207
1208         snprintf(mz_name, sizeof(mz_name), "%s%s", LIB_NAME, t->name);
1209         mz = rte_memzone_lookup(mz_name);
1210         if (rte_memzone_free(mz) != 0)
1211                 PMD_DRV_LOG(ERR, "Cannot free memzone for %s", t->name);
1212 }
1213
1214 /* search a opdl_ring from its name */
1215 struct opdl_ring *
1216 opdl_ring_lookup(const char *name)
1217 {
1218         const struct rte_memzone *mz;
1219         char mz_name[RTE_MEMZONE_NAMESIZE];
1220
1221         snprintf(mz_name, sizeof(mz_name), "%s%s", LIB_NAME, name);
1222
1223         mz = rte_memzone_lookup(mz_name);
1224         if (mz == NULL)
1225                 return NULL;
1226
1227         return mz->addr;
1228 }
1229
1230 void
1231 opdl_ring_set_stage_threadsafe(struct opdl_stage *s, bool threadsafe)
1232 {
1233         s->threadsafe = threadsafe;
1234 }