event/opdl: use new API to save cycles on aarch64
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2017 Intel Corporation
3  */
4
5 #include <stdbool.h>
6 #include <stddef.h>
7 #include <stdint.h>
8 #include <stdio.h>
9
10 #include <rte_string_fns.h>
11 #include <rte_branch_prediction.h>
12 #include <rte_debug.h>
13 #include <rte_lcore.h>
14 #include <rte_log.h>
15 #include <rte_malloc.h>
16 #include <rte_memcpy.h>
17 #include <rte_memory.h>
18 #include <rte_memzone.h>
19 #include <rte_atomic.h>
20
21 #include "opdl_ring.h"
22 #include "opdl_log.h"
23
24 #define LIB_NAME "opdl_ring"
25
26 #define OPDL_NAME_SIZE 64
27
28
29 #define OPDL_EVENT_MASK  (0x00000000000FFFFFULL)
30 #define OPDL_FLOWID_MASK (0xFFFFF)
31 #define OPDL_OPA_MASK    (0xFF)
32 #define OPDL_OPA_OFFSET  (0x38)
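/* These masks decode fields straight out of the 64-bit rte_event word:
 * OPDL_FLOWID_MASK covers the low 20 bits (the flow_id field), while
 * OPDL_OPA_OFFSET/OPDL_OPA_MASK select the top byte (bits 56-63), which this
 * PMD uses as an opaque marker for the queue that last updated the event
 * (see opdl_ring_cas_slot() below). Decode sketch mirroring the code that
 * follows:
 *
 *     uint64_t event = __atomic_load_n(&ev->event, __ATOMIC_ACQUIRE);
 *     uint32_t flow_id = OPDL_FLOWID_MASK & event;
 *     uint32_t opa_id = OPDL_OPA_MASK & (event >> OPDL_OPA_OFFSET);
 */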
33
34 int opdl_logtype_driver;
35
36 /* Types of dependency between stages */
37 enum dep_type {
38         DEP_NONE = 0,  /* no dependency */
39         DEP_DIRECT,  /* stage has direct dependency */
40         DEP_INDIRECT,  /* indirect dependency through other stage(s) */
41         DEP_SELF,  /* stage dependency on itself, used to detect loops */
42 };
43
44 /* Shared section of stage state.
45  * Care is needed when accessing this structure, and the layout is important,
46  * especially to limit the impact of the adjacent cache-line HW prefetcher.
47  */
48 struct shared_state {
49         /* Last known minimum sequence number of dependencies, used for multi
50          * thread operation
51          */
52         uint32_t available_seq;
53         char _pad1[RTE_CACHE_LINE_SIZE * 3];
54         uint32_t head;  /* Head sequence number (for multi thread operation) */
55         char _pad2[RTE_CACHE_LINE_SIZE * 3];
56         struct opdl_stage *stage;  /* back pointer */
57         uint32_t tail;  /* Tail sequence number */
58         char _pad3[RTE_CACHE_LINE_SIZE * 2];
59 } __rte_cache_aligned;
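/* The three cache lines of padding between available_seq, head and tail keep
 * each hot field well clear of its neighbours, so the adjacent cache-line HW
 * prefetcher mentioned above cannot drag another field's line in alongside the
 * one being written and cause false sharing between producer and consumer
 * cores. The RTE_BUILD_BUG_ON() checks in opdl_ring_create() assert the size
 * and alignment of this layout at compile time.
 */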
60
61 /* A structure to keep track of "unfinished" claims. This is only used for
62  * stages that are threadsafe. Each lcore accesses its own instance of this
63  * structure to record the entries it has claimed. This allows one lcore to make
64  * multiple claims without being blocked by another. When disclaiming, the
65  * shared tail is moved forward only once it matches the tail value recorded
66  * here.
67  */
68 struct claim_manager {
69         uint32_t num_to_disclaim;
70         uint32_t num_claimed;
71         uint32_t mgr_head;
72         uint32_t mgr_tail;
73         struct {
74                 uint32_t head;
75                 uint32_t tail;
76         } claims[OPDL_DISCLAIMS_PER_LCORE];
77 } __rte_cache_aligned;
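/* Bookkeeping sketch: each lcore records up to OPDL_DISCLAIMS_PER_LCORE
 * outstanding [tail, head) ranges in claims[]. claim_mgr_add() appends a new
 * range, or merges it with the previous one when they touch, and disclaiming
 * walks the ranges oldest first, publishing a range to shared.tail only once
 * shared.tail has caught up with that range's recorded tail. For example, two
 * back-to-back claims of 4 slots each starting at sequence 100 are stored as
 * a single [100, 108) record rather than two entries.
 */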
78
79 /* Context for each stage of opdl_ring.
80  * Calculations on sequence numbers need to be done with other uint32_t values
81  * so that results are modulo 2^32, and not undefined.
82  */
83 struct opdl_stage {
84         struct opdl_ring *t;  /* back pointer, set at init */
85         uint32_t num_slots;  /* Number of slots for entries, set at init */
86         uint32_t index;  /* ID for this stage, set at init */
87         bool threadsafe;  /* Set to 1 if this stage supports threadsafe use */
88         /* Last known min seq number of dependencies, used for single thread
89          * operation
90          */
91         uint32_t available_seq;
92         uint32_t head;  /* Current head for single-thread operation */
93         uint32_t nb_instance;  /* Number of instances */
94         uint32_t instance_id;  /* ID of this stage instance */
95         uint16_t num_claimed;  /* Number of slots claimed */
96         uint16_t num_event;             /* Number of events */
97         uint32_t seq;                   /* sequence number  */
98         uint32_t num_deps;  /* Number of direct dependencies */
99         /* Keep track of all dependencies, used during init only */
100         enum dep_type *dep_tracking;
101         /* Direct dependencies of this stage */
102         struct shared_state **deps;
103         /* Other stages read this! */
104         struct shared_state shared __rte_cache_aligned;
105         /* For managing disclaims in multi-threaded processing stages */
106         struct claim_manager pending_disclaims[RTE_MAX_LCORE]
107                                                __rte_cache_aligned;
108         uint32_t shadow_head;  /* Shadow head for single-thread operation */
109         uint32_t queue_id;     /* ID of Queue which is assigned to this stage */
110         uint32_t pos;           /* Atomic scan position */
111 } __rte_cache_aligned;
112
113 /* Context for opdl_ring */
114 struct opdl_ring {
115         char name[OPDL_NAME_SIZE];  /* OPDL queue instance name */
116         int socket;  /* NUMA socket that memory is allocated on */
117         uint32_t num_slots;  /* Number of slots for entries */
118         uint32_t mask;  /* Mask for sequence numbers (num_slots - 1) */
119         uint32_t slot_size;  /* Size of each slot in bytes */
120         uint32_t num_stages;  /* Number of stages that have been added */
121         uint32_t max_num_stages;  /* Max number of stages */
122         /* Stages indexed by ID */
123         struct opdl_stage *stages;
124         /* Memory for storing slot data */
125         uint8_t slots[0] __rte_cache_aligned;
126 };
127
128
129 /* Return the input stage of an opdl_ring */
130 static __rte_always_inline struct opdl_stage *
131 input_stage(const struct opdl_ring *t)
132 {
133         return &t->stages[0];
134 }
135
136 /* Check if a stage is the input stage */
137 static __rte_always_inline bool
138 is_input_stage(const struct opdl_stage *s)
139 {
140         return s->index == 0;
141 }
142
143 /* Get slot pointer from sequence number */
144 static __rte_always_inline void *
145 get_slot(const struct opdl_ring *t, uint32_t n)
146 {
147         return (void *)(uintptr_t)&t->slots[(n & t->mask) * t->slot_size];
148 }
149
150 /* Find how many entries are available for processing */
151 static __rte_always_inline uint32_t
152 available(const struct opdl_stage *s)
153 {
154         if (s->threadsafe == true) {
155                 uint32_t n = __atomic_load_n(&s->shared.available_seq,
156                                 __ATOMIC_ACQUIRE) -
157                                 __atomic_load_n(&s->shared.head,
158                                 __ATOMIC_ACQUIRE);
159
160                 /* Return 0 if available_seq needs to be updated */
161                 return (n <= s->num_slots) ? n : 0;
162         }
163
164         /* Single threaded */
165         return s->available_seq - s->head;
166 }
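/* The arithmetic above relies on unsigned wrap-around: available_seq - head is
 * well defined modulo 2^32, so the result stays correct when the sequence
 * counters wrap. In the threadsafe path the two fields are loaded separately,
 * so head may already have moved past a stale available_seq; the difference
 * then exceeds num_slots and 0 is returned so that callers such as
 * num_to_process() refresh available_seq via update_available_seq(). E.g. with
 * num_slots = 256, available_seq = 1000 and a racing head of 1010, the
 * subtraction yields 2^32 - 10, which is rejected.
 */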
167
168 /* Read sequence number of dependencies and find minimum */
169 static __rte_always_inline void
170 update_available_seq(struct opdl_stage *s)
171 {
172         uint32_t i;
173         uint32_t this_tail = s->shared.tail;
174         uint32_t min_seq = __atomic_load_n(&s->deps[0]->tail, __ATOMIC_ACQUIRE);
175         /* The input stage's sequence numbers run ahead of those of its
176          * dependencies, so an offset of t->num_slots is needed when calculating
177          * available slots, and the comparison used to find the dependencies'
178          * minimum sequence number must be reversed.
179          */
180         uint32_t wrap;
181
182         if (is_input_stage(s)) {
183                 wrap = s->num_slots;
184                 for (i = 1; i < s->num_deps; i++) {
185                         uint32_t seq = __atomic_load_n(&s->deps[i]->tail,
186                                         __ATOMIC_ACQUIRE);
187                         if ((this_tail - seq) > (this_tail - min_seq))
188                                 min_seq = seq;
189                 }
190         } else {
191                 wrap = 0;
192                 for (i = 1; i < s->num_deps; i++) {
193                         uint32_t seq = __atomic_load_n(&s->deps[i]->tail,
194                                         __ATOMIC_ACQUIRE);
195                         if ((seq - this_tail) < (min_seq - this_tail))
196                                 min_seq = seq;
197                 }
198         }
199
200         if (s->threadsafe == false)
201                 s->available_seq = min_seq + wrap;
202         else
203                 __atomic_store_n(&s->shared.available_seq, min_seq + wrap,
204                                 __ATOMIC_RELEASE);
205 }
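/* Worked example of the non-input branch above (assumed values): with
 * this_tail = 100 and dependency tails of 105, 103 and 102, the wrap-safe
 * differences seq - this_tail are 5, 3 and 2, so min_seq ends up as 102 and
 * this stage may advance its head up to sequence 102. For the input stage the
 * comparison is reversed and wrap = num_slots is added, because the producer
 * is allowed to run a full ring ahead of the slowest stage it depends on.
 */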
206
207 /* Wait until the number of available slots reaches number requested */
208 static __rte_always_inline void
209 wait_for_available(struct opdl_stage *s, uint32_t n)
210 {
211         while (available(s) < n) {
212                 rte_pause();
213                 update_available_seq(s);
214         }
215 }
216
217 /* Return number of slots to process based on number requested and mode */
218 static __rte_always_inline uint32_t
219 num_to_process(struct opdl_stage *s, uint32_t n, bool block)
220 {
221         /* Don't read tail sequences of dependencies if not needed */
222         if (available(s) >= n)
223                 return n;
224
225         update_available_seq(s);
226
227         if (block == false) {
228                 uint32_t avail = available(s);
229
230                 if (avail == 0) {
231                         rte_pause();
232                         return 0;
233                 }
234                 return (avail <= n) ? avail : n;
235         }
236
237         if (unlikely(n > s->num_slots)) {
238                 PMD_DRV_LOG(ERR, "%u entries is more than max (%u)",
239                                 n, s->num_slots);
240                 return 0;  /* Avoid infinite loop */
241         }
242         /* blocking */
243         wait_for_available(s, n);
244         return n;
245 }
246
247 /* Copy entries in to slots with wrap-around */
248 static __rte_always_inline void
249 copy_entries_in(struct opdl_ring *t, uint32_t start, const void *entries,
250                 uint32_t num_entries)
251 {
252         uint32_t slot_size = t->slot_size;
253         uint32_t slot_index = start & t->mask;
254
255         if (slot_index + num_entries <= t->num_slots) {
256                 rte_memcpy(get_slot(t, start), entries,
257                                 num_entries * slot_size);
258         } else {
259                 uint32_t split = t->num_slots - slot_index;
260
261                 rte_memcpy(get_slot(t, start), entries, split * slot_size);
262                 rte_memcpy(get_slot(t, 0),
263                                 RTE_PTR_ADD(entries, split * slot_size),
264                                 (num_entries - split) * slot_size);
265         }
266 }
267
268 /* Copy entries out from slots with wrap-around */
269 static __rte_always_inline void
270 copy_entries_out(struct opdl_ring *t, uint32_t start, void *entries,
271                 uint32_t num_entries)
272 {
273         uint32_t slot_size = t->slot_size;
274         uint32_t slot_index = start & t->mask;
275
276         if (slot_index + num_entries <= t->num_slots) {
277                 rte_memcpy(entries, get_slot(t, start),
278                                 num_entries * slot_size);
279         } else {
280                 uint32_t split = t->num_slots - slot_index;
281
282                 rte_memcpy(entries, get_slot(t, start), split * slot_size);
283                 rte_memcpy(RTE_PTR_ADD(entries, split * slot_size),
284                                 get_slot(t, 0),
285                                 (num_entries - split) * slot_size);
286         }
287 }
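/* Both copy helpers split the memcpy at the end of the slot array. For
 * example, with num_slots = 8, copying 4 entries starting at sequence 14 gives
 * slot_index = 6 and split = 2, so slots 6 and 7 are copied first and slots 0
 * and 1 second, keeping the entries contiguous in the caller's buffer.
 */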
288
289 /* Input function optimised for single thread */
290 static __rte_always_inline uint32_t
291 opdl_ring_input_singlethread(struct opdl_ring *t, const void *entries,
292                 uint32_t num_entries, bool block)
293 {
294         struct opdl_stage *s = input_stage(t);
295         uint32_t head = s->head;
296
297         num_entries = num_to_process(s, num_entries, block);
298         if (num_entries == 0)
299                 return 0;
300
301         copy_entries_in(t, head, entries, num_entries);
302
303         s->head += num_entries;
304         __atomic_store_n(&s->shared.tail, s->head, __ATOMIC_RELEASE);
305
306         return num_entries;
307 }
308
309 /* Convert head and tail of claim_manager into valid index */
310 static __rte_always_inline uint32_t
311 claim_mgr_index(uint32_t n)
312 {
313         return n & (OPDL_DISCLAIMS_PER_LCORE - 1);
314 }
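/* The mask above only works because OPDL_DISCLAIMS_PER_LCORE is a power of
 * two, which opdl_ring_create() enforces with a RTE_BUILD_BUG_ON(). mgr_head
 * and mgr_tail are free-running counters, so mgr_head - mgr_tail is the number
 * of claim records currently in flight.
 */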
315
316 /* Check if there are available slots in claim_manager */
317 static __rte_always_inline bool
318 claim_mgr_available(struct claim_manager *mgr)
319 {
320         return (mgr->mgr_head < (mgr->mgr_tail + OPDL_DISCLAIMS_PER_LCORE)) ?
321                         true : false;
322 }
323
324 /* Record a new claim. Only use after first checking an entry is available */
325 static __rte_always_inline void
326 claim_mgr_add(struct claim_manager *mgr, uint32_t tail, uint32_t head)
327 {
328         if ((mgr->mgr_head != mgr->mgr_tail) &&
329                         (mgr->claims[claim_mgr_index(mgr->mgr_head - 1)].head ==
330                         tail)) {
331                 /* Combine with previous claim */
332                 mgr->claims[claim_mgr_index(mgr->mgr_head - 1)].head = head;
333         } else {
334                 mgr->claims[claim_mgr_index(mgr->mgr_head)].head = head;
335                 mgr->claims[claim_mgr_index(mgr->mgr_head)].tail = tail;
336                 mgr->mgr_head++;
337         }
338
339         mgr->num_claimed += (head - tail);
340 }
341
342 /* Read the oldest recorded claim */
343 static __rte_always_inline bool
344 claim_mgr_read(struct claim_manager *mgr, uint32_t *tail, uint32_t *head)
345 {
346         if (mgr->mgr_head == mgr->mgr_tail)
347                 return false;
348
349         *head = mgr->claims[claim_mgr_index(mgr->mgr_tail)].head;
350         *tail = mgr->claims[claim_mgr_index(mgr->mgr_tail)].tail;
351         return true;
352 }
353
354 /* Remove the oldest recorded claim. Only use after first reading the entry */
355 static __rte_always_inline void
356 claim_mgr_remove(struct claim_manager *mgr)
357 {
358         mgr->num_claimed -= (mgr->claims[claim_mgr_index(mgr->mgr_tail)].head -
359                         mgr->claims[claim_mgr_index(mgr->mgr_tail)].tail);
360         mgr->mgr_tail++;
361 }
362
363 /* Update tail in the oldest claim. Only use after first reading the entry */
364 static __rte_always_inline void
365 claim_mgr_move_tail(struct claim_manager *mgr, uint32_t num_entries)
366 {
367         mgr->num_claimed -= num_entries;
368         mgr->claims[claim_mgr_index(mgr->mgr_tail)].tail += num_entries;
369 }
370
371 static __rte_always_inline void
372 opdl_stage_disclaim_multithread_n(struct opdl_stage *s,
373                 uint32_t num_entries, bool block)
374 {
375         struct claim_manager *disclaims = &s->pending_disclaims[rte_lcore_id()];
376         uint32_t head;
377         uint32_t tail;
378
379         while (num_entries) {
380                 bool ret = claim_mgr_read(disclaims, &tail, &head);
381
382                 if (ret == false)
383                         break;  /* nothing is claimed */
384                 /* There should be no race condition here. If shared.tail
385                  * matches, no other core can update it until this one does.
386                  */
387                 if (__atomic_load_n(&s->shared.tail, __ATOMIC_ACQUIRE) ==
388                                 tail) {
389                         if (num_entries >= (head - tail)) {
390                                 claim_mgr_remove(disclaims);
391                                 __atomic_store_n(&s->shared.tail, head,
392                                                 __ATOMIC_RELEASE);
393                                 num_entries -= (head - tail);
394                         } else {
395                                 claim_mgr_move_tail(disclaims, num_entries);
396                                 __atomic_store_n(&s->shared.tail,
397                                                 num_entries + tail,
398                                                 __ATOMIC_RELEASE);
399                                 num_entries = 0;
400                         }
401                 } else if (block == false)
402                         break;  /* blocked by other thread */
403                 /* Keep going until num_entries are disclaimed. */
404                 rte_pause();
405         }
406
407         disclaims->num_to_disclaim = num_entries;
408 }
409
410 /* Move head atomically, returning number of entries available to process and
411  * the original value of head. For non-input stages, the claim is recorded
412  * so that the tail can be updated later by opdl_stage_disclaim().
413  */
414 static __rte_always_inline void
415 move_head_atomically(struct opdl_stage *s, uint32_t *num_entries,
416                 uint32_t *old_head, bool block, bool claim_func)
417 {
418         uint32_t orig_num_entries = *num_entries;
419         uint32_t ret;
420         struct claim_manager *disclaims = &s->pending_disclaims[rte_lcore_id()];
421
422         /* Attempt to disclaim any outstanding claims */
423         opdl_stage_disclaim_multithread_n(s, disclaims->num_to_disclaim,
424                         false);
425
426         *old_head = __atomic_load_n(&s->shared.head, __ATOMIC_ACQUIRE);
427         while (true) {
428                 bool success;
429                 /* If called by opdl_ring_input(), claim does not need to be
430                  * recorded, as there will be no disclaim.
431                  */
432                 if (claim_func) {
433                         /* Check that the claim can be recorded */
434                         ret = claim_mgr_available(disclaims);
435                         if (ret == false) {
436                                 /* exit out if claim can't be recorded */
437                                 *num_entries = 0;
438                                 return;
439                         }
440                 }
441
442                 *num_entries = num_to_process(s, orig_num_entries, block);
443                 if (*num_entries == 0)
444                         return;
445
446                 success = __atomic_compare_exchange_n(&s->shared.head, old_head,
447                                 *old_head + *num_entries,
448                                 true,  /* may fail spuriously */
449                                 __ATOMIC_RELEASE,  /* memory order on success */
450                                 __ATOMIC_ACQUIRE);  /* memory order on fail */
451                 if (likely(success))
452                         break;
453                 rte_pause();
454         }
455
456         if (claim_func)
457                 /* Store the claim record */
458                 claim_mgr_add(disclaims, *old_head, *old_head + *num_entries);
459 }
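/* The loop above uses the weak form of __atomic_compare_exchange_n (fourth
 * argument 'true'), which may fail spuriously but can be cheaper on LL/SC
 * architectures such as arm; a failure simply re-reads the head and retries
 * after rte_pause(). The overall shape is the usual lock-free head
 * reservation, sketched here with hypothetical helper names:
 *
 *     old = load_acquire(&head);
 *     do {
 *             n = slots_available(old);
 *     } while (n != 0 && !cas_weak_release(&head, &old, old + n));
 */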
460
461 /* Input function that supports multiple threads */
462 static __rte_always_inline uint32_t
463 opdl_ring_input_multithread(struct opdl_ring *t, const void *entries,
464                 uint32_t num_entries, bool block)
465 {
466         struct opdl_stage *s = input_stage(t);
467         uint32_t old_head;
468
469         move_head_atomically(s, &num_entries, &old_head, block, false);
470         if (num_entries == 0)
471                 return 0;
472
473         copy_entries_in(t, old_head, entries, num_entries);
474
475         /* If another thread started inputting before this one, but hasn't
476          * finished, we need to wait for it to complete to update the tail.
477          */
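        /* rte_wait_until_equal_32() is the new API referred to in the commit
         * title: on aarch64 builds with RTE_ARM_USE_WFE enabled it can sleep
         * on a WFE event instead of busy-polling, and on other targets it
         * falls back to a load/rte_pause() loop equivalent to the open-coded
         * wait it replaces:
         *
         *     while (__atomic_load_n(&s->shared.tail, __ATOMIC_ACQUIRE)
         *                     != old_head)
         *             rte_pause();
         */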
478         rte_wait_until_equal_32(&s->shared.tail, old_head, __ATOMIC_ACQUIRE);
479
480         __atomic_store_n(&s->shared.tail, old_head + num_entries,
481                         __ATOMIC_RELEASE);
482
483         return num_entries;
484 }
485
486 static __rte_always_inline uint32_t
487 opdl_first_entry_id(uint32_t start_seq, uint8_t nb_p_lcores,
488                 uint8_t this_lcore)
489 {
490         return ((nb_p_lcores <= 1) ? 0 :
491                         (nb_p_lcores - (start_seq % nb_p_lcores) + this_lcore) %
492                         nb_p_lcores);
493 }
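/* This spreads slots across the instances of a stage by sequence number:
 * instance k ends up handling the slots whose sequence number is congruent to
 * k modulo nb_p_lcores. For example, with nb_p_lcores = 3 and start_seq = 7
 * (7 % 3 = 1) the returned offsets are 2, 0 and 1 for this_lcore 0, 1 and 2,
 * so instance 1 starts at sequence 7, instance 2 at 8 and instance 0 at 9,
 * each then stepping by nb_p_lcores.
 */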
494
495 /* Claim slots to process, optimised for single-thread operation */
496 static __rte_always_inline uint32_t
497 opdl_stage_claim_singlethread(struct opdl_stage *s, void *entries,
498                 uint32_t num_entries, uint32_t *seq, bool block, bool atomic)
499 {
500         uint32_t i = 0, j = 0,  offset;
501         uint32_t opa_id   = 0;
502         uint32_t flow_id  = 0;
503         uint64_t event    = 0;
504         void *get_slots;
505         struct rte_event *ev;
506         RTE_SET_USED(seq);
507         struct opdl_ring *t = s->t;
508         uint8_t *entries_offset = (uint8_t *)entries;
509
510         if (!atomic) {
511
512                 offset = opdl_first_entry_id(s->seq, s->nb_instance,
513                                 s->instance_id);
514
515                 num_entries = s->nb_instance * num_entries;
516
517                 num_entries = num_to_process(s, num_entries, block);
518
519                 for (; offset < num_entries; offset += s->nb_instance) {
520                         get_slots = get_slot(t, s->head + offset);
521                         memcpy(entries_offset, get_slots, t->slot_size);
522                         entries_offset += t->slot_size;
523                         i++;
524                 }
525         } else {
526                 num_entries = num_to_process(s, num_entries, block);
527
528                 for (j = 0; j < num_entries; j++) {
529                         ev = (struct rte_event *)get_slot(t, s->head+j);
530
531                         event  = __atomic_load_n(&(ev->event),
532                                         __ATOMIC_ACQUIRE);
533
534                         opa_id = OPDL_OPA_MASK & (event >> OPDL_OPA_OFFSET);
535                         flow_id  = OPDL_FLOWID_MASK & event;
536
537                         if (opa_id >= s->queue_id)
538                                 continue;
539
540                         if ((flow_id % s->nb_instance) == s->instance_id) {
541                                 memcpy(entries_offset, ev, t->slot_size);
542                                 entries_offset += t->slot_size;
543                                 i++;
544                         }
545                 }
546         }
547         s->shadow_head = s->head;
548         s->head += num_entries;
549         s->num_claimed = num_entries;
550         s->num_event = i;
551         s->pos = 0;
552
553         /* automatically disclaim entries if number of rte_events is zero */
554         if (unlikely(i == 0))
555                 opdl_stage_disclaim(s, 0, false);
556
557         return i;
558 }
559
560 /* Thread-safe version of function to claim slots for processing */
561 static __rte_always_inline uint32_t
562 opdl_stage_claim_multithread(struct opdl_stage *s, void *entries,
563                 uint32_t num_entries, uint32_t *seq, bool block)
564 {
565         uint32_t old_head;
566         struct opdl_ring *t = s->t;
567         uint32_t i = 0, offset;
568         uint8_t *entries_offset = (uint8_t *)entries;
569
570         if (seq == NULL) {
571                 PMD_DRV_LOG(ERR, "Invalid seq PTR");
572                 return 0;
573         }
574         offset = opdl_first_entry_id(*seq, s->nb_instance, s->instance_id);
575         num_entries = offset + (s->nb_instance * num_entries);
576
577         move_head_atomically(s, &num_entries, &old_head, block, true);
578
579         for (; offset < num_entries; offset += s->nb_instance) {
580                 memcpy(entries_offset, get_slot(t, s->head + offset),
581                         t->slot_size);
582                 entries_offset += t->slot_size;
583                 i++;
584         }
585
586         *seq = old_head;
587
588         return i;
589 }
590
591 /* Claim and copy slot pointers, optimised for single-thread operation */
592 static __rte_always_inline uint32_t
593 opdl_stage_claim_copy_singlethread(struct opdl_stage *s, void *entries,
594                 uint32_t num_entries, uint32_t *seq, bool block)
595 {
596         num_entries = num_to_process(s, num_entries, block);
597         if (num_entries == 0)
598                 return 0;
599         copy_entries_out(s->t, s->head, entries, num_entries);
600         if (seq != NULL)
601                 *seq = s->head;
602         s->head += num_entries;
603         return num_entries;
604 }
605
606 /* Thread-safe version of function to claim and copy pointers to slots */
607 static __rte_always_inline uint32_t
608 opdl_stage_claim_copy_multithread(struct opdl_stage *s, void *entries,
609                 uint32_t num_entries, uint32_t *seq, bool block)
610 {
611         uint32_t old_head;
612
613         move_head_atomically(s, &num_entries, &old_head, block, true);
614         if (num_entries == 0)
615                 return 0;
616         copy_entries_out(s->t, old_head, entries, num_entries);
617         if (seq != NULL)
618                 *seq = old_head;
619         return num_entries;
620 }
621
622 static __rte_always_inline void
623 opdl_stage_disclaim_singlethread_n(struct opdl_stage *s,
624                 uint32_t num_entries)
625 {
626         uint32_t old_tail = s->shared.tail;
627
628         if (unlikely(num_entries > (s->head - old_tail))) {
629                 PMD_DRV_LOG(WARNING, "Attempt to disclaim (%u) more than claimed (%u)",
630                                 num_entries, s->head - old_tail);
631                 num_entries = s->head - old_tail;
632         }
633         __atomic_store_n(&s->shared.tail, num_entries + old_tail,
634                         __ATOMIC_RELEASE);
635 }
636
637 uint32_t
638 opdl_ring_input(struct opdl_ring *t, const void *entries, uint32_t num_entries,
639                 bool block)
640 {
641         if (input_stage(t)->threadsafe == false)
642                 return opdl_ring_input_singlethread(t, entries, num_entries,
643                                 block);
644         else
645                 return opdl_ring_input_multithread(t, entries, num_entries,
646                                 block);
647 }
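/* Minimal producer-side sketch (illustrative only: the burst size and variable
 * names are hypothetical, and the ring is assumed to have been created with
 * slot_size == sizeof(struct rte_event) as the opdl PMD does):
 *
 *     struct rte_event burst[32];
 *     // ... fill burst ...
 *     uint32_t n = opdl_ring_input(t, burst, 32, false);
 *     // n <= 32 entries were copied in; the caller retries or drops the rest
 */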
648
649 uint32_t
650 opdl_ring_copy_from_burst(struct opdl_ring *t, struct opdl_stage *s,
651                 const void *entries, uint32_t num_entries, bool block)
652 {
653         uint32_t head = s->head;
654
655         num_entries = num_to_process(s, num_entries, block);
656
657         if (num_entries == 0)
658                 return 0;
659
660         copy_entries_in(t, head, entries, num_entries);
661
662         s->head += num_entries;
663         __atomic_store_n(&s->shared.tail, s->head, __ATOMIC_RELEASE);
664
665         return num_entries;
666
667 }
668
669 uint32_t
670 opdl_ring_copy_to_burst(struct opdl_ring *t, struct opdl_stage *s,
671                 void *entries, uint32_t num_entries, bool block)
672 {
673         uint32_t head = s->head;
674
675         num_entries = num_to_process(s, num_entries, block);
676         if (num_entries == 0)
677                 return 0;
678
679         copy_entries_out(t, head, entries, num_entries);
680
681         s->head += num_entries;
682         __atomic_store_n(&s->shared.tail, s->head, __ATOMIC_RELEASE);
683
684         return num_entries;
685 }
686
687 uint32_t
688 opdl_stage_find_num_available(struct opdl_stage *s, uint32_t num_entries)
689 {
690         /* return (num_to_process(s, num_entries, false)); */
691
692         if (available(s) >= num_entries)
693                 return num_entries;
694
695         update_available_seq(s);
696
697         uint32_t avail = available(s);
698
699         if (avail == 0) {
700                 rte_pause();
701                 return 0;
702         }
703         return (avail <= num_entries) ? avail : num_entries;
704 }
705
706 uint32_t
707 opdl_stage_claim(struct opdl_stage *s, void *entries,
708                 uint32_t num_entries, uint32_t *seq, bool block, bool atomic)
709 {
710         if (s->threadsafe == false)
711                 return opdl_stage_claim_singlethread(s, entries, num_entries,
712                                 seq, block, atomic);
713         else
714                 return opdl_stage_claim_multithread(s, entries, num_entries,
715                                 seq, block);
716 }
717
718 uint32_t
719 opdl_stage_claim_copy(struct opdl_stage *s, void *entries,
720                 uint32_t num_entries, uint32_t *seq, bool block)
721 {
722         if (s->threadsafe == false)
723                 return opdl_stage_claim_copy_singlethread(s, entries,
724                                 num_entries, seq, block);
725         else
726                 return opdl_stage_claim_copy_multithread(s, entries,
727                                 num_entries, seq, block);
728 }
729
730 void
731 opdl_stage_disclaim_n(struct opdl_stage *s, uint32_t num_entries,
732                 bool block)
733 {
734
735         if (s->threadsafe == false) {
736                 opdl_stage_disclaim_singlethread_n(s, s->num_claimed);
737         } else {
738                 struct claim_manager *disclaims =
739                         &s->pending_disclaims[rte_lcore_id()];
740
741                 if (unlikely(num_entries > s->num_slots)) {
742                         PMD_DRV_LOG(WARNING, "Attempt to disclaim (%u) more than claimed (%u)",
743                                         num_entries, disclaims->num_claimed);
744                         num_entries = disclaims->num_claimed;
745                 }
746
747                 num_entries = RTE_MIN(num_entries + disclaims->num_to_disclaim,
748                                 disclaims->num_claimed);
749                 opdl_stage_disclaim_multithread_n(s, num_entries, block);
750         }
751 }
752
753 int
754 opdl_stage_disclaim(struct opdl_stage *s, uint32_t num_entries, bool block)
755 {
756         if (num_entries != s->num_event) {
757                 rte_errno = EINVAL;
758                 return 0;
759         }
760         if (s->threadsafe == false) {
761                 __atomic_store_n(&s->shared.tail, s->head, __ATOMIC_RELEASE);
762                 s->seq += s->num_claimed;
763                 s->shadow_head = s->head;
764                 s->num_claimed = 0;
765         } else {
766                 struct claim_manager *disclaims =
767                                 &s->pending_disclaims[rte_lcore_id()];
768                 opdl_stage_disclaim_multithread_n(s, disclaims->num_claimed,
769                                 block);
770         }
771         return num_entries;
772 }
773
774 uint32_t
775 opdl_ring_available(struct opdl_ring *t)
776 {
777         return opdl_stage_available(&t->stages[0]);
778 }
779
780 uint32_t
781 opdl_stage_available(struct opdl_stage *s)
782 {
783         update_available_seq(s);
784         return available(s);
785 }
786
787 void
788 opdl_ring_flush(struct opdl_ring *t)
789 {
790         struct opdl_stage *s = input_stage(t);
791
792         wait_for_available(s, s->num_slots);
793 }
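/* Flushing simply blocks until all num_slots slots are reported available at
 * the input stage, which only happens once the tail of every dependent stage
 * has caught up with the input head, i.e. every enqueued entry has been
 * processed by all stages.
 */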
794
795 /******************** Non performance sensitive functions ********************/
796
797 /* Initial setup of a new stage's context */
798 static int
799 init_stage(struct opdl_ring *t, struct opdl_stage *s, bool threadsafe,
800                 bool is_input)
801 {
802         uint32_t available = (is_input) ? t->num_slots : 0;
803
804         s->t = t;
805         s->num_slots = t->num_slots;
806         s->index = t->num_stages;
807         s->threadsafe = threadsafe;
808         s->shared.stage = s;
809
810         /* Alloc memory for deps */
811         s->dep_tracking = rte_zmalloc_socket(LIB_NAME,
812                         t->max_num_stages * sizeof(enum dep_type),
813                         0, t->socket);
814         if (s->dep_tracking == NULL)
815                 return -ENOMEM;
816
817         s->deps = rte_zmalloc_socket(LIB_NAME,
818                         t->max_num_stages * sizeof(struct shared_state *),
819                         0, t->socket);
820         if (s->deps == NULL) {
821                 rte_free(s->dep_tracking);
822                 return -ENOMEM;
823         }
824
825         s->dep_tracking[s->index] = DEP_SELF;
826
827         if (threadsafe == true)
828                 s->shared.available_seq = available;
829         else
830                 s->available_seq = available;
831
832         return 0;
833 }
834
835 /* Add direct or indirect dependencies between stages */
836 static int
837 add_dep(struct opdl_stage *dependent, const struct opdl_stage *dependency,
838                 enum dep_type type)
839 {
840         struct opdl_ring *t = dependent->t;
841         uint32_t i;
842
843         /* Add new direct dependency */
844         if ((type == DEP_DIRECT) &&
845                         (dependent->dep_tracking[dependency->index] ==
846                                         DEP_NONE)) {
847                 PMD_DRV_LOG(DEBUG, "%s:%u direct dependency on %u",
848                                 t->name, dependent->index, dependency->index);
849                 dependent->dep_tracking[dependency->index] = DEP_DIRECT;
850         }
851
852         /* Add new indirect dependency or change direct to indirect */
853         if ((type == DEP_INDIRECT) &&
854                         ((dependent->dep_tracking[dependency->index] ==
855                         DEP_NONE) ||
856                         (dependent->dep_tracking[dependency->index] ==
857                         DEP_DIRECT))) {
858                 PMD_DRV_LOG(DEBUG, "%s:%u indirect dependency on %u",
859                                 t->name, dependent->index, dependency->index);
860                 dependent->dep_tracking[dependency->index] = DEP_INDIRECT;
861         }
862
863         /* Shouldn't happen... */
864         if ((dependent->dep_tracking[dependency->index] == DEP_SELF) &&
865                         (dependent != input_stage(t))) {
866                 PMD_DRV_LOG(ERR, "Loop in dependency graph %s:%u",
867                                 t->name, dependent->index);
868                 return -EINVAL;
869         }
870
871         /* Keep going to dependencies of the dependency, until input stage */
872         if (dependency != input_stage(t))
873                 for (i = 0; i < dependency->num_deps; i++) {
874                         int ret = add_dep(dependent, dependency->deps[i]->stage,
875                                         DEP_INDIRECT);
876
877                         if (ret < 0)
878                                 return ret;
879                 }
880
881         /* Make list of sequence numbers for direct dependencies only */
882         if (type == DEP_DIRECT)
883                 for (i = 0, dependent->num_deps = 0; i < t->num_stages; i++)
884                         if (dependent->dep_tracking[i] == DEP_DIRECT) {
885                                 if ((i == 0) && (dependent->num_deps > 1))
886                                         rte_panic("%s:%u depends on more than input",
887                                                         t->name,
888                                                         dependent->index);
889                                 dependent->deps[dependent->num_deps++] =
890                                                 &t->stages[i].shared;
891                         }
892
893         return 0;
894 }
895
896 struct opdl_ring *
897 opdl_ring_create(const char *name, uint32_t num_slots, uint32_t slot_size,
898                 uint32_t max_num_stages, int socket)
899 {
900         struct opdl_ring *t;
901         char mz_name[RTE_MEMZONE_NAMESIZE];
902         int mz_flags = 0;
903         struct opdl_stage *st = NULL;
904         const struct rte_memzone *mz = NULL;
905         size_t alloc_size = RTE_CACHE_LINE_ROUNDUP(sizeof(*t) +
906                         (num_slots * slot_size));
907
908         /* Compile time checking */
909         RTE_BUILD_BUG_ON((sizeof(struct shared_state) & RTE_CACHE_LINE_MASK) !=
910                         0);
911         RTE_BUILD_BUG_ON((offsetof(struct opdl_stage, shared) &
912                         RTE_CACHE_LINE_MASK) != 0);
913         RTE_BUILD_BUG_ON((offsetof(struct opdl_ring, slots) &
914                         RTE_CACHE_LINE_MASK) != 0);
915         RTE_BUILD_BUG_ON(!rte_is_power_of_2(OPDL_DISCLAIMS_PER_LCORE));
916
917         /* Parameter checking */
918         if (name == NULL) {
919                 PMD_DRV_LOG(ERR, "name param is NULL");
920                 return NULL;
921         }
922         if (!rte_is_power_of_2(num_slots)) {
923                 PMD_DRV_LOG(ERR, "num_slots (%u) for %s is not power of 2",
924                                 num_slots, name);
925                 return NULL;
926         }
927
928         /* Alloc memory for stages */
929         st = rte_zmalloc_socket(LIB_NAME,
930                 max_num_stages * sizeof(struct opdl_stage),
931                 RTE_CACHE_LINE_SIZE, socket);
932         if (st == NULL)
933                 goto exit_fail;
934
935         snprintf(mz_name, sizeof(mz_name), "%s%s", LIB_NAME, name);
936
937         /* Alloc memory for memzone */
938         mz = rte_memzone_reserve(mz_name, alloc_size, socket, mz_flags);
939         if (mz == NULL)
940                 goto exit_fail;
941
942         t = mz->addr;
943
944         /* Initialise opdl_ring queue */
945         memset(t, 0, sizeof(*t));
946         strlcpy(t->name, name, sizeof(t->name));
947         t->socket = socket;
948         t->num_slots = num_slots;
949         t->mask = num_slots - 1;
950         t->slot_size = slot_size;
951         t->max_num_stages = max_num_stages;
952         t->stages = st;
953
954         PMD_DRV_LOG(DEBUG, "Created %s at %p (num_slots=%u,socket=%i,slot_size=%u)",
955                         t->name, t, num_slots, socket, slot_size);
956
957         return t;
958
959 exit_fail:
960         PMD_DRV_LOG(ERR, "Cannot reserve memory");
961         rte_free(st);
962         rte_memzone_free(mz);
963
964         return NULL;
965 }
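/* Creation sketch (illustrative: the name, sizes and socket below are example
 * values, not requirements of this API):
 *
 *     struct opdl_ring *r = opdl_ring_create("pipe0", 1024,
 *                     sizeof(struct rte_event), 4, rte_socket_id());
 *     if (r == NULL)
 *             return -ENOMEM;
 *
 * num_slots must be a power of two, and slot contents are copied by value, so
 * slot_size is the full size of each entry rather than a pointer size.
 */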
966
967 void *
968 opdl_ring_get_slot(const struct opdl_ring *t, uint32_t index)
969 {
970         return get_slot(t, index);
971 }
972
973 bool
974 opdl_ring_cas_slot(struct opdl_stage *s, const struct rte_event *ev,
975                 uint32_t index, bool atomic)
976 {
977         uint32_t i = 0, offset;
978         struct opdl_ring *t = s->t;
979         struct rte_event *ev_orig = NULL;
980         bool ev_updated = false;
981         uint64_t ev_temp    = 0;
982         uint64_t ev_update  = 0;
983
984         uint32_t opa_id   = 0;
985         uint32_t flow_id  = 0;
986         uint64_t event    = 0;
987
988         if (index > s->num_event) {
989                 PMD_DRV_LOG(ERR, "index is out of range");
990                 return ev_updated;
991         }
992
993         ev_temp = ev->event & OPDL_EVENT_MASK;
994
995         if (!atomic) {
996                 offset = opdl_first_entry_id(s->seq, s->nb_instance,
997                                 s->instance_id);
998                 offset += index*s->nb_instance;
999                 ev_orig = get_slot(t, s->shadow_head+offset);
1000                 if ((ev_orig->event&OPDL_EVENT_MASK) != ev_temp) {
1001                         ev_orig->event = ev->event;
1002                         ev_updated = true;
1003                 }
1004                 if (ev_orig->u64 != ev->u64) {
1005                         ev_orig->u64 = ev->u64;
1006                         ev_updated = true;
1007                 }
1008
1009         } else {
1010                 for (i = s->pos; i < s->num_claimed; i++) {
1011                         ev_orig = (struct rte_event *)
1012                                 get_slot(t, s->shadow_head+i);
1013
1014                         event  = __atomic_load_n(&(ev_orig->event),
1015                                         __ATOMIC_ACQUIRE);
1016
1017                         opa_id = OPDL_OPA_MASK & (event >> OPDL_OPA_OFFSET);
1018                         flow_id  = OPDL_FLOWID_MASK & event;
1019
1020                         if (opa_id >= s->queue_id)
1021                                 continue;
1022
1023                         if ((flow_id % s->nb_instance) == s->instance_id) {
1024                                 ev_update = s->queue_id;
1025                                 ev_update = (ev_update << OPDL_OPA_OFFSET)
1026                                         | ev->event;
1027
1028                                 s->pos = i + 1;
1029
1030                                 if ((event & OPDL_EVENT_MASK) !=
1031                                                 ev_temp) {
1032                                         __atomic_store_n(&(ev_orig->event),
1033                                                         ev_update,
1034                                                         __ATOMIC_RELEASE);
1035                                         ev_updated = true;
1036                                 }
1037                                 if (ev_orig->u64 != ev->u64) {
1038                                         ev_orig->u64 = ev->u64;
1039                                         ev_updated = true;
1040                                 }
1041
1042                                 break;
1043                         }
1044                 }
1045
1046         }
1047
1048         return ev_updated;
1049 }
1050
1051 int
1052 opdl_ring_get_socket(const struct opdl_ring *t)
1053 {
1054         return t->socket;
1055 }
1056
1057 uint32_t
1058 opdl_ring_get_num_slots(const struct opdl_ring *t)
1059 {
1060         return t->num_slots;
1061 }
1062
1063 const char *
1064 opdl_ring_get_name(const struct opdl_ring *t)
1065 {
1066         return t->name;
1067 }
1068
1069 /* Check dependency list is valid for a given opdl_ring */
1070 static int
1071 check_deps(struct opdl_ring *t, struct opdl_stage *deps[],
1072                 uint32_t num_deps)
1073 {
1074         unsigned int i;
1075
1076         for (i = 0; i < num_deps; ++i) {
1077                 if (!deps[i]) {
1078                         PMD_DRV_LOG(ERR, "deps[%u] is NULL", i);
1079                         return -EINVAL;
1080                 }
1081                 if (t != deps[i]->t) {
1082                         PMD_DRV_LOG(ERR, "deps[%u] is in opdl_ring %s, not %s",
1083                                         i, deps[i]->t->name, t->name);
1084                         return -EINVAL;
1085                 }
1086         }
1087
1088         return 0;
1089 }
1090
1091 struct opdl_stage *
1092 opdl_stage_add(struct opdl_ring *t, bool threadsafe, bool is_input)
1093 {
1094         struct opdl_stage *s;
1095
1096         /* Parameter checking */
1097         if (!t) {
1098                 PMD_DRV_LOG(ERR, "opdl_ring is NULL");
1099                 return NULL;
1100         }
1101         if (t->num_stages == t->max_num_stages) {
1102                 PMD_DRV_LOG(ERR, "%s has max number of stages (%u)",
1103                                 t->name, t->max_num_stages);
1104                 return NULL;
1105         }
1106
1107         s = &t->stages[t->num_stages];
1108
1109         if (((uintptr_t)&s->shared & RTE_CACHE_LINE_MASK) != 0)
1110                 PMD_DRV_LOG(WARNING, "Tail seq num (%p) of %s stage not cache aligned",
1111                                 &s->shared, t->name);
1112
1113         if (init_stage(t, s, threadsafe, is_input) < 0) {
1114                 PMD_DRV_LOG(ERR, "Cannot reserve memory");
1115                 return NULL;
1116         }
1117         t->num_stages++;
1118
1119         return s;
1120 }
1121
1122 uint32_t
1123 opdl_stage_deps_add(struct opdl_ring *t, struct opdl_stage *s,
1124                 uint32_t nb_instance, uint32_t instance_id,
1125                 struct opdl_stage *deps[],
1126                 uint32_t num_deps)
1127 {
1128         uint32_t i;
1129         int ret = 0;
1130
1131         if ((num_deps > 0) && (!deps)) {
1132                 PMD_DRV_LOG(ERR, "%s stage has NULL dependencies", t->name);
1133                 return -1;
1134         }
1135         ret = check_deps(t, deps, num_deps);
1136         if (ret < 0)
1137                 return ret;
1138
1139         for (i = 0; i < num_deps; i++) {
1140                 ret = add_dep(s, deps[i], DEP_DIRECT);
1141                 if (ret < 0)
1142                         return ret;
1143         }
1144
1145         s->nb_instance = nb_instance;
1146         s->instance_id = instance_id;
1147
1148         return ret;
1149 }
1150
1151 struct opdl_stage *
1152 opdl_ring_get_input_stage(const struct opdl_ring *t)
1153 {
1154         return input_stage(t);
1155 }
1156
1157 int
1158 opdl_stage_set_deps(struct opdl_stage *s, struct opdl_stage *deps[],
1159                 uint32_t num_deps)
1160 {
1161         unsigned int i;
1162         int ret;
1163
1164         if ((num_deps == 0) || (!deps)) {
1165                 PMD_DRV_LOG(ERR, "cannot set NULL dependencies");
1166                 return -EINVAL;
1167         }
1168
1169         ret = check_deps(s->t, deps, num_deps);
1170         if (ret < 0)
1171                 return ret;
1172
1173         /* Update deps */
1174         for (i = 0; i < num_deps; i++)
1175                 s->deps[i] = &deps[i]->shared;
1176         s->num_deps = num_deps;
1177
1178         return 0;
1179 }
1180
1181 struct opdl_ring *
1182 opdl_stage_get_opdl_ring(const struct opdl_stage *s)
1183 {
1184         return s->t;
1185 }
1186
1187 void
1188 opdl_stage_set_queue_id(struct opdl_stage *s,
1189                 uint32_t queue_id)
1190 {
1191         s->queue_id = queue_id;
1192 }
1193
1194 void
1195 opdl_ring_dump(const struct opdl_ring *t, FILE *f)
1196 {
1197         uint32_t i;
1198
1199         if (t == NULL) {
1200                 fprintf(f, "NULL OPDL!\n");
1201                 return;
1202         }
1203         fprintf(f, "OPDL \"%s\": num_slots=%u; mask=%#x; slot_size=%u; num_stages=%u; socket=%i\n",
1204                         t->name, t->num_slots, t->mask, t->slot_size,
1205                         t->num_stages, t->socket);
1206         for (i = 0; i < t->num_stages; i++) {
1207                 uint32_t j;
1208                 const struct opdl_stage *s = &t->stages[i];
1209
1210                 fprintf(f, "  %s[%u]: threadsafe=%s; head=%u; available_seq=%u; tail=%u; deps=%u",
1211                                 t->name, i, (s->threadsafe) ? "true" : "false",
1212                                 (s->threadsafe) ? s->shared.head : s->head,
1213                                 (s->threadsafe) ? s->shared.available_seq :
1214                                 s->available_seq,
1215                                 s->shared.tail, (s->num_deps > 0) ?
1216                                 s->deps[0]->stage->index : 0);
1217                 for (j = 1; j < s->num_deps; j++)
1218                         fprintf(f, ",%u", s->deps[j]->stage->index);
1219                 fprintf(f, "\n");
1220         }
1221         fflush(f);
1222 }
1223
1224 void
1225 opdl_ring_free(struct opdl_ring *t)
1226 {
1227         uint32_t i;
1228         const struct rte_memzone *mz;
1229         char mz_name[RTE_MEMZONE_NAMESIZE];
1230
1231         if (t == NULL) {
1232                 PMD_DRV_LOG(DEBUG, "Freeing NULL OPDL Ring!");
1233                 return;
1234         }
1235
1236         PMD_DRV_LOG(DEBUG, "Freeing %s opdl_ring at %p", t->name, t);
1237
1238         for (i = 0; i < t->num_stages; ++i) {
1239                 rte_free(t->stages[i].deps);
1240                 rte_free(t->stages[i].dep_tracking);
1241         }
1242
1243         rte_free(t->stages);
1244
1245         snprintf(mz_name, sizeof(mz_name), "%s%s", LIB_NAME, t->name);
1246         mz = rte_memzone_lookup(mz_name);
1247         if (rte_memzone_free(mz) != 0)
1248                 PMD_DRV_LOG(ERR, "Cannot free memzone for %s", t->name);
1249 }
1250
1251 /* Search for an opdl_ring by its name */
1252 struct opdl_ring *
1253 opdl_ring_lookup(const char *name)
1254 {
1255         const struct rte_memzone *mz;
1256         char mz_name[RTE_MEMZONE_NAMESIZE];
1257
1258         snprintf(mz_name, sizeof(mz_name), "%s%s", LIB_NAME, name);
1259
1260         mz = rte_memzone_lookup(mz_name);
1261         if (mz == NULL)
1262                 return NULL;
1263
1264         return mz->addr;
1265 }
1266
1267 void
1268 opdl_ring_set_stage_threadsafe(struct opdl_stage *s, bool threadsafe)
1269 {
1270         s->threadsafe = threadsafe;
1271 }