drivers/event/opdl/opdl_ring.c
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2017 Intel Corporation
3  */
4
5 #include <stdbool.h>
6 #include <stddef.h>
7 #include <stdint.h>
8 #include <stdio.h>
9
10 #include <rte_string_fns.h>
11 #include <rte_branch_prediction.h>
12 #include <rte_debug.h>
13 #include <rte_lcore.h>
14 #include <rte_log.h>
15 #include <rte_malloc.h>
16 #include <rte_memcpy.h>
17 #include <rte_memory.h>
18 #include <rte_memzone.h>
19 #include <rte_atomic.h>
20
21 #include "opdl_ring.h"
22 #include "opdl_log.h"
23
24 #define LIB_NAME "opdl_ring"
25
26 #define OPDL_NAME_SIZE 64
27
28
29 #define OPDL_EVENT_MASK  (0x00000000000FFFFFULL)
30 #define OPDL_FLOWID_MASK (0xFFFFF)
31 #define OPDL_OPA_MASK    (0xFF)
32 #define OPDL_OPA_OFFSET  (0x38)
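/* Illustrative note (not part of the original source): OPDL_EVENT_MASK and
 * OPDL_FLOWID_MASK both select the low 20 bits of the rte_event 'event'
 * word, i.e. the flow_id field, while OPDL_OPA_OFFSET (bit 56) together with
 * OPDL_OPA_MASK selects the top byte, which appears to correspond to
 * impl_opaque in struct rte_event. Judging from opdl_ring_cas_slot() below,
 * this PMD uses that byte to record the queue_id of the stage that last
 * updated the event:
 *
 *   opa_id  = OPDL_OPA_MASK & (ev->event >> OPDL_OPA_OFFSET);
 *   flow_id = OPDL_FLOWID_MASK & ev->event;
 */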
33
34 /* Types of dependency between stages */
35 enum dep_type {
36         DEP_NONE = 0,  /* no dependency */
37         DEP_DIRECT,  /* stage has direct dependency */
38         DEP_INDIRECT,  /* indirect dependency through other stage(s) */
39         DEP_SELF,  /* stage dependency on itself, used to detect loops */
40 };
41
42 /* Shared section of stage state.
43  * Care is needed when accessing it, and the layout is important, especially
44  * to keep the adjacent cache-line HW prefetcher from impacting performance.
45  */
46 struct shared_state {
47         /* Last known minimum sequence number of dependencies, used for multi
48          * thread operation
49          */
50         uint32_t available_seq;
51         char _pad1[RTE_CACHE_LINE_SIZE * 3];
52         uint32_t head;  /* Head sequence number (for multi thread operation) */
53         char _pad2[RTE_CACHE_LINE_SIZE * 3];
54         struct opdl_stage *stage;  /* back pointer */
55         uint32_t tail;  /* Tail sequence number */
56         char _pad3[RTE_CACHE_LINE_SIZE * 2];
57 } __rte_cache_aligned;
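/* Illustrative note (not part of the original source): the padding above
 * places available_seq, head and tail several cache lines apart, so a core
 * writing one of these hot fields does not invalidate the line(s) holding
 * the others for their readers, even when the adjacent cache-line HW
 * prefetcher pulls in the neighbouring line as well.
 */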
58
59 /* A structure to keep track of "unfinished" claims. This is only used for
60  * stages that are threadsafe. Each lcore accesses its own instance of this
61  * structure to record the entries it has claimed. This allows one lcore to make
62  * multiple claims without being blocked by another. When disclaiming, the
63  * shared tail is moved forward once it matches the tail value recorded
64  * here.
65  */
66 struct claim_manager {
67         uint32_t num_to_disclaim;
68         uint32_t num_claimed;
69         uint32_t mgr_head;
70         uint32_t mgr_tail;
71         struct {
72                 uint32_t head;
73                 uint32_t tail;
74         } claims[OPDL_DISCLAIMS_PER_LCORE];
75 } __rte_cache_aligned;
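/* Worked example (illustrative only): if an lcore claims slots [10, 14) and
 * then [14, 20), claim_mgr_add() below merges them into one record covering
 * [10, 20), because the head of the previous claim equals the tail of the
 * new one. A later claim such as [25, 30) that does not join on to an
 * existing record takes a fresh claims[] entry, so each lcore can have at
 * most OPDL_DISCLAIMS_PER_LCORE disjoint claims outstanding at a time.
 */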
76
77 /* Context for each stage of opdl_ring.
78  * Calculations on sequence numbers need to be done with other uint32_t values
79  * so that results are modulo 2^32, and not undefined.
80  */
81 struct opdl_stage {
82         struct opdl_ring *t;  /* back pointer, set at init */
83         uint32_t num_slots;  /* Number of slots for entries, set at init */
84         uint32_t index;  /* ID for this stage, set at init */
85         bool threadsafe;  /* Set to true if this stage supports threadsafe use */
86         /* Last known min seq number of dependencies, used for single thread
87          * operation
88          */
89         uint32_t available_seq;
90         uint32_t head;  /* Current head for single-thread operation */
91         uint32_t nb_instance;  /* Number of instances */
92         uint32_t instance_id;  /* ID of this stage instance */
93         uint16_t num_claimed;  /* Number of slots claimed */
94         uint16_t num_event;             /* Number of events */
95         uint32_t seq;                   /* sequence number  */
96         uint32_t num_deps;  /* Number of direct dependencies */
97         /* Keep track of all dependencies, used during init only */
98         enum dep_type *dep_tracking;
99         /* Direct dependencies of this stage */
100         struct shared_state **deps;
101         /* Other stages read this! */
102         struct shared_state shared __rte_cache_aligned;
103         /* For managing disclaims in multi-threaded processing stages */
104         struct claim_manager pending_disclaims[RTE_MAX_LCORE]
105                                                __rte_cache_aligned;
106         uint32_t shadow_head;  /* Shadow head for single-thread operation */
107         uint32_t queue_id;     /* ID of Queue which is assigned to this stage */
108         uint32_t pos;           /* Atomic scan position */
109 } __rte_cache_aligned;
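/* Worked example (illustrative only): because sequence numbers are plain
 * uint32_t values, differences stay correct across wrap-around. With
 * head == 0x00000002 and tail == 0xFFFFFFFE, the expression (head - tail)
 * evaluates to 4 modulo 2^32, which is the number of claimed slots, even
 * though head is numerically smaller than tail.
 */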
110
111 /* Context for opdl_ring */
112 struct opdl_ring {
113         char name[OPDL_NAME_SIZE];  /* OPDL queue instance name */
114         int socket;  /* NUMA socket that memory is allocated on */
115         uint32_t num_slots;  /* Number of slots for entries */
116         uint32_t mask;  /* Mask for sequence numbers (num_slots - 1) */
117         uint32_t slot_size;  /* Size of each slot in bytes */
118         uint32_t num_stages;  /* Number of stages that have been added */
119         uint32_t max_num_stages;  /* Max number of stages */
120         /* Stages indexed by ID */
121         struct opdl_stage *stages;
122         /* Memory for storing slot data */
123         uint8_t slots[0] __rte_cache_aligned;
124 };
125
126
127 /* Return the input stage of an opdl_ring */
128 static __rte_always_inline struct opdl_stage *
129 input_stage(const struct opdl_ring *t)
130 {
131         return &t->stages[0];
132 }
133
134 /* Check if a stage is the input stage */
135 static __rte_always_inline bool
136 is_input_stage(const struct opdl_stage *s)
137 {
138         return s->index == 0;
139 }
140
141 /* Get slot pointer from sequence number */
142 static __rte_always_inline void *
143 get_slot(const struct opdl_ring *t, uint32_t n)
144 {
145         return (void *)(uintptr_t)&t->slots[(n & t->mask) * t->slot_size];
146 }
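/* Example (illustrative only): with num_slots == 1024 the mask is 0x3FF, so
 * sequence number 1030 maps to slot index 6, i.e. byte offset 6 * slot_size
 * into the slots[] array.
 */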
147
148 /* Find how many entries are available for processing */
149 static __rte_always_inline uint32_t
150 available(const struct opdl_stage *s)
151 {
152         if (s->threadsafe == true) {
153                 uint32_t n = __atomic_load_n(&s->shared.available_seq,
154                                 __ATOMIC_ACQUIRE) -
155                                 __atomic_load_n(&s->shared.head,
156                                 __ATOMIC_ACQUIRE);
157
158                 /* Return 0 if available_seq needs to be updated */
159                 return (n <= s->num_slots) ? n : 0;
160         }
161
162         /* Single threaded */
163         return s->available_seq - s->head;
164 }
165
166 /* Read sequence number of dependencies and find minimum */
167 static __rte_always_inline void
168 update_available_seq(struct opdl_stage *s)
169 {
170         uint32_t i;
171         uint32_t this_tail = s->shared.tail;
172         uint32_t min_seq = __atomic_load_n(&s->deps[0]->tail, __ATOMIC_ACQUIRE);
173         /* The input stage's sequence numbers are greater than those of its
174          * dependencies, so an offset of t->num_slots is needed when calculating
175          * available slots, and the condition used to determine the dependencies'
176          * minimum sequence number must be reversed.
177          */
178         uint32_t wrap;
179
180         if (is_input_stage(s)) {
181                 wrap = s->num_slots;
182                 for (i = 1; i < s->num_deps; i++) {
183                         uint32_t seq = __atomic_load_n(&s->deps[i]->tail,
184                                         __ATOMIC_ACQUIRE);
185                         if ((this_tail - seq) > (this_tail - min_seq))
186                                 min_seq = seq;
187                 }
188         } else {
189                 wrap = 0;
190                 for (i = 1; i < s->num_deps; i++) {
191                         uint32_t seq = __atomic_load_n(&s->deps[i]->tail,
192                                         __ATOMIC_ACQUIRE);
193                         if ((seq - this_tail) < (min_seq - this_tail))
194                                 min_seq = seq;
195                 }
196         }
197
198         if (s->threadsafe == false)
199                 s->available_seq = min_seq + wrap;
200         else
201                 __atomic_store_n(&s->shared.available_seq, min_seq + wrap,
202                                 __ATOMIC_RELEASE);
203 }
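/* Worked example (illustrative only): for the input stage of an 8-slot ring
 * whose slowest dependency (the final stage) has tail == 4, available_seq
 * becomes 4 + 8 == 12; with the input stage's head at 10, available() then
 * reports 12 - 10 == 2 free slots. For a non-input stage the wrap offset is
 * 0, so available_seq is simply the minimum dependency tail.
 */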
204
205 /* Wait until the number of available slots reaches number requested */
206 static __rte_always_inline void
207 wait_for_available(struct opdl_stage *s, uint32_t n)
208 {
209         while (available(s) < n) {
210                 rte_pause();
211                 update_available_seq(s);
212         }
213 }
214
215 /* Return number of slots to process based on number requested and mode */
216 static __rte_always_inline uint32_t
217 num_to_process(struct opdl_stage *s, uint32_t n, bool block)
218 {
219         /* Don't read tail sequences of dependencies if not needed */
220         if (available(s) >= n)
221                 return n;
222
223         update_available_seq(s);
224
225         if (block == false) {
226                 uint32_t avail = available(s);
227
228                 if (avail == 0) {
229                         rte_pause();
230                         return 0;
231                 }
232                 return (avail <= n) ? avail : n;
233         }
234
235         if (unlikely(n > s->num_slots)) {
236                 PMD_DRV_LOG(ERR, "%u entries is more than max (%u)",
237                                 n, s->num_slots);
238                 return 0;  /* Avoid infinite loop */
239         }
240         /* blocking */
241         wait_for_available(s, n);
242         return n;
243 }
244
245 /* Copy entries in to slots with wrap-around */
246 static __rte_always_inline void
247 copy_entries_in(struct opdl_ring *t, uint32_t start, const void *entries,
248                 uint32_t num_entries)
249 {
250         uint32_t slot_size = t->slot_size;
251         uint32_t slot_index = start & t->mask;
252
253         if (slot_index + num_entries <= t->num_slots) {
254                 rte_memcpy(get_slot(t, start), entries,
255                                 num_entries * slot_size);
256         } else {
257                 uint32_t split = t->num_slots - slot_index;
258
259                 rte_memcpy(get_slot(t, start), entries, split * slot_size);
260                 rte_memcpy(get_slot(t, 0),
261                                 RTE_PTR_ADD(entries, split * slot_size),
262                                 (num_entries - split) * slot_size);
263         }
264 }
265
266 /* Copy entries out from slots with wrap-around */
267 static __rte_always_inline void
268 copy_entries_out(struct opdl_ring *t, uint32_t start, void *entries,
269                 uint32_t num_entries)
270 {
271         uint32_t slot_size = t->slot_size;
272         uint32_t slot_index = start & t->mask;
273
274         if (slot_index + num_entries <= t->num_slots) {
275                 rte_memcpy(entries, get_slot(t, start),
276                                 num_entries * slot_size);
277         } else {
278                 uint32_t split = t->num_slots - slot_index;
279
280                 rte_memcpy(entries, get_slot(t, start), split * slot_size);
281                 rte_memcpy(RTE_PTR_ADD(entries, split * slot_size),
282                                 get_slot(t, 0),
283                                 (num_entries - split) * slot_size);
284         }
285 }
286
287 /* Input function optimised for single thread */
288 static __rte_always_inline uint32_t
289 opdl_ring_input_singlethread(struct opdl_ring *t, const void *entries,
290                 uint32_t num_entries, bool block)
291 {
292         struct opdl_stage *s = input_stage(t);
293         uint32_t head = s->head;
294
295         num_entries = num_to_process(s, num_entries, block);
296         if (num_entries == 0)
297                 return 0;
298
299         copy_entries_in(t, head, entries, num_entries);
300
301         s->head += num_entries;
302         __atomic_store_n(&s->shared.tail, s->head, __ATOMIC_RELEASE);
303
304         return num_entries;
305 }
306
307 /* Convert head and tail of claim_manager into valid index */
308 static __rte_always_inline uint32_t
309 claim_mgr_index(uint32_t n)
310 {
311         return n & (OPDL_DISCLAIMS_PER_LCORE - 1);
312 }
313
314 /* Check if there are available slots in claim_manager */
315 static __rte_always_inline bool
316 claim_mgr_available(struct claim_manager *mgr)
317 {
318         return (mgr->mgr_head < (mgr->mgr_tail + OPDL_DISCLAIMS_PER_LCORE)) ?
319                         true : false;
320 }
321
322 /* Record a new claim. Only use after first checking an entry is available */
323 static __rte_always_inline void
324 claim_mgr_add(struct claim_manager *mgr, uint32_t tail, uint32_t head)
325 {
326         if ((mgr->mgr_head != mgr->mgr_tail) &&
327                         (mgr->claims[claim_mgr_index(mgr->mgr_head - 1)].head ==
328                         tail)) {
329                 /* Combine with previous claim */
330                 mgr->claims[claim_mgr_index(mgr->mgr_head - 1)].head = head;
331         } else {
332                 mgr->claims[claim_mgr_index(mgr->mgr_head)].head = head;
333                 mgr->claims[claim_mgr_index(mgr->mgr_head)].tail = tail;
334                 mgr->mgr_head++;
335         }
336
337         mgr->num_claimed += (head - tail);
338 }
339
340 /* Read the oldest recorded claim */
341 static __rte_always_inline bool
342 claim_mgr_read(struct claim_manager *mgr, uint32_t *tail, uint32_t *head)
343 {
344         if (mgr->mgr_head == mgr->mgr_tail)
345                 return false;
346
347         *head = mgr->claims[claim_mgr_index(mgr->mgr_tail)].head;
348         *tail = mgr->claims[claim_mgr_index(mgr->mgr_tail)].tail;
349         return true;
350 }
351
352 /* Remove the oldest recorded claim. Only use after first reading the entry */
353 static __rte_always_inline void
354 claim_mgr_remove(struct claim_manager *mgr)
355 {
356         mgr->num_claimed -= (mgr->claims[claim_mgr_index(mgr->mgr_tail)].head -
357                         mgr->claims[claim_mgr_index(mgr->mgr_tail)].tail);
358         mgr->mgr_tail++;
359 }
360
361 /* Update tail in the oldest claim. Only use after first reading the entry */
362 static __rte_always_inline void
363 claim_mgr_move_tail(struct claim_manager *mgr, uint32_t num_entries)
364 {
365         mgr->num_claimed -= num_entries;
366         mgr->claims[claim_mgr_index(mgr->mgr_tail)].tail += num_entries;
367 }
368
369 static __rte_always_inline void
370 opdl_stage_disclaim_multithread_n(struct opdl_stage *s,
371                 uint32_t num_entries, bool block)
372 {
373         struct claim_manager *disclaims = &s->pending_disclaims[rte_lcore_id()];
374         uint32_t head;
375         uint32_t tail;
376
377         while (num_entries) {
378                 bool ret = claim_mgr_read(disclaims, &tail, &head);
379
380                 if (ret == false)
381                         break;  /* nothing is claimed */
382                 /* There should be no race condition here. If shared.tail
383                  * matches, no other core can update it until this one does.
384                  */
385                 if (__atomic_load_n(&s->shared.tail, __ATOMIC_ACQUIRE) ==
386                                 tail) {
387                         if (num_entries >= (head - tail)) {
388                                 claim_mgr_remove(disclaims);
389                                 __atomic_store_n(&s->shared.tail, head,
390                                                 __ATOMIC_RELEASE);
391                                 num_entries -= (head - tail);
392                         } else {
393                                 claim_mgr_move_tail(disclaims, num_entries);
394                                 __atomic_store_n(&s->shared.tail,
395                                                 num_entries + tail,
396                                                 __ATOMIC_RELEASE);
397                                 num_entries = 0;
398                         }
399                 } else if (block == false)
400                         break;  /* blocked by other thread */
401                 /* Keep going until num_entries are disclaimed. */
402                 rte_pause();
403         }
404
405         disclaims->num_to_disclaim = num_entries;
406 }
407
408 /* Move head atomically, returning number of entries available to process and
409  * the original value of head. For non-input stages, the claim is recorded
410  * so that the tail can be updated later by opdl_stage_disclaim().
411  */
412 static __rte_always_inline void
413 move_head_atomically(struct opdl_stage *s, uint32_t *num_entries,
414                 uint32_t *old_head, bool block, bool claim_func)
415 {
416         uint32_t orig_num_entries = *num_entries;
417         uint32_t ret;
418         struct claim_manager *disclaims = &s->pending_disclaims[rte_lcore_id()];
419
420         /* Attempt to disclaim any outstanding claims */
421         opdl_stage_disclaim_multithread_n(s, disclaims->num_to_disclaim,
422                         false);
423
424         *old_head = __atomic_load_n(&s->shared.head, __ATOMIC_ACQUIRE);
425         while (true) {
426                 bool success;
427                 /* If called by opdl_ring_input(), claim does not need to be
428                  * recorded, as there will be no disclaim.
429                  */
430                 if (claim_func) {
431                         /* Check that the claim can be recorded */
432                         ret = claim_mgr_available(disclaims);
433                         if (ret == false) {
434                                 /* exit out if claim can't be recorded */
435                                 *num_entries = 0;
436                                 return;
437                         }
438                 }
439
440                 *num_entries = num_to_process(s, orig_num_entries, block);
441                 if (*num_entries == 0)
442                         return;
443
444                 success = __atomic_compare_exchange_n(&s->shared.head, old_head,
445                                 *old_head + *num_entries,
446                                 true,  /* may fail spuriously */
447                                 __ATOMIC_RELEASE,  /* memory order on success */
448                                 __ATOMIC_ACQUIRE);  /* memory order on fail */
449                 if (likely(success))
450                         break;
451                 rte_pause();
452         }
453
454         if (claim_func)
455                 /* Store the claim record */
456                 claim_mgr_add(disclaims, *old_head, *old_head + *num_entries);
457 }
458
459 /* Input function that supports multiple threads */
460 static __rte_always_inline uint32_t
461 opdl_ring_input_multithread(struct opdl_ring *t, const void *entries,
462                 uint32_t num_entries, bool block)
463 {
464         struct opdl_stage *s = input_stage(t);
465         uint32_t old_head;
466
467         move_head_atomically(s, &num_entries, &old_head, block, false);
468         if (num_entries == 0)
469                 return 0;
470
471         copy_entries_in(t, old_head, entries, num_entries);
472
473         /* If another thread started inputting before this one, but hasn't
474          * finished, we need to wait for it to complete to update the tail.
475          */
476         rte_wait_until_equal_32(&s->shared.tail, old_head, __ATOMIC_ACQUIRE);
477
478         __atomic_store_n(&s->shared.tail, old_head + num_entries,
479                         __ATOMIC_RELEASE);
480
481         return num_entries;
482 }
483
484 static __rte_always_inline uint32_t
485 opdl_first_entry_id(uint32_t start_seq, uint8_t nb_p_lcores,
486                 uint8_t this_lcore)
487 {
488         return ((nb_p_lcores <= 1) ? 0 :
489                         (nb_p_lcores - (start_seq % nb_p_lcores) + this_lcore) %
490                         nb_p_lcores);
491 }
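/* Worked example (illustrative only): with nb_p_lcores == 2 and
 * start_seq == 5, instance 1 gets first-entry offset (2 - 1 + 1) % 2 == 0
 * and instance 0 gets offset (2 - 1 + 0) % 2 == 1, so the two instances
 * process alternating slots starting from sequence numbers 5 and 6
 * respectively.
 */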
492
493 /* Claim slots to process, optimised for single-thread operation */
494 static __rte_always_inline uint32_t
495 opdl_stage_claim_singlethread(struct opdl_stage *s, void *entries,
496                 uint32_t num_entries, uint32_t *seq, bool block, bool atomic)
497 {
498         uint32_t i = 0, j = 0,  offset;
499         uint32_t opa_id   = 0;
500         uint32_t flow_id  = 0;
501         uint64_t event    = 0;
502         void *get_slots;
503         struct rte_event *ev;
504         RTE_SET_USED(seq);
505         struct opdl_ring *t = s->t;
506         uint8_t *entries_offset = (uint8_t *)entries;
507
508         if (!atomic) {
509
510                 offset = opdl_first_entry_id(s->seq, s->nb_instance,
511                                 s->instance_id);
512
513                 num_entries = s->nb_instance * num_entries;
514
515                 num_entries = num_to_process(s, num_entries, block);
516
517                 for (; offset < num_entries; offset += s->nb_instance) {
518                         get_slots = get_slot(t, s->head + offset);
519                         memcpy(entries_offset, get_slots, t->slot_size);
520                         entries_offset += t->slot_size;
521                         i++;
522                 }
523         } else {
524                 num_entries = num_to_process(s, num_entries, block);
525
526                 for (j = 0; j < num_entries; j++) {
527                         ev = (struct rte_event *)get_slot(t, s->head+j);
528
529                         event  = __atomic_load_n(&(ev->event),
530                                         __ATOMIC_ACQUIRE);
531
532                         opa_id = OPDL_OPA_MASK & (event >> OPDL_OPA_OFFSET);
533                         flow_id  = OPDL_FLOWID_MASK & event;
534
535                         if (opa_id >= s->queue_id)
536                                 continue;
537
538                         if ((flow_id % s->nb_instance) == s->instance_id) {
539                                 memcpy(entries_offset, ev, t->slot_size);
540                                 entries_offset += t->slot_size;
541                                 i++;
542                         }
543                 }
544         }
545         s->shadow_head = s->head;
546         s->head += num_entries;
547         s->num_claimed = num_entries;
548         s->num_event = i;
549         s->pos = 0;
550
551         /* automatically disclaim entries if number of rte_events is zero */
552         if (unlikely(i == 0))
553                 opdl_stage_disclaim(s, 0, false);
554
555         return i;
556 }
557
558 /* Thread-safe version of function to claim slots for processing */
559 static __rte_always_inline uint32_t
560 opdl_stage_claim_multithread(struct opdl_stage *s, void *entries,
561                 uint32_t num_entries, uint32_t *seq, bool block)
562 {
563         uint32_t old_head;
564         struct opdl_ring *t = s->t;
565         uint32_t i = 0, offset;
566         uint8_t *entries_offset = (uint8_t *)entries;
567
568         if (seq == NULL) {
569                 PMD_DRV_LOG(ERR, "Invalid seq PTR");
570                 return 0;
571         }
572         offset = opdl_first_entry_id(*seq, s->nb_instance, s->instance_id);
573         num_entries = offset + (s->nb_instance * num_entries);
574
575         move_head_atomically(s, &num_entries, &old_head, block, true);
576
577         for (; offset < num_entries; offset += s->nb_instance) {
578                 memcpy(entries_offset, get_slot(t, s->head + offset),
579                         t->slot_size);
580                 entries_offset += t->slot_size;
581                 i++;
582         }
583
584         *seq = old_head;
585
586         return i;
587 }
588
589 /* Claim and copy slot pointers, optimised for single-thread operation */
590 static __rte_always_inline uint32_t
591 opdl_stage_claim_copy_singlethread(struct opdl_stage *s, void *entries,
592                 uint32_t num_entries, uint32_t *seq, bool block)
593 {
594         num_entries = num_to_process(s, num_entries, block);
595         if (num_entries == 0)
596                 return 0;
597         copy_entries_out(s->t, s->head, entries, num_entries);
598         if (seq != NULL)
599                 *seq = s->head;
600         s->head += num_entries;
601         return num_entries;
602 }
603
604 /* Thread-safe version of function to claim and copy pointers to slots */
605 static __rte_always_inline uint32_t
606 opdl_stage_claim_copy_multithread(struct opdl_stage *s, void *entries,
607                 uint32_t num_entries, uint32_t *seq, bool block)
608 {
609         uint32_t old_head;
610
611         move_head_atomically(s, &num_entries, &old_head, block, true);
612         if (num_entries == 0)
613                 return 0;
614         copy_entries_out(s->t, old_head, entries, num_entries);
615         if (seq != NULL)
616                 *seq = old_head;
617         return num_entries;
618 }
619
620 static __rte_always_inline void
621 opdl_stage_disclaim_singlethread_n(struct opdl_stage *s,
622                 uint32_t num_entries)
623 {
624         uint32_t old_tail = s->shared.tail;
625
626         if (unlikely(num_entries > (s->head - old_tail))) {
627                 PMD_DRV_LOG(WARNING, "Attempt to disclaim (%u) more than claimed (%u)",
628                                 num_entries, s->head - old_tail);
629                 num_entries = s->head - old_tail;
630         }
631         __atomic_store_n(&s->shared.tail, num_entries + old_tail,
632                         __ATOMIC_RELEASE);
633 }
634
635 uint32_t
636 opdl_ring_input(struct opdl_ring *t, const void *entries, uint32_t num_entries,
637                 bool block)
638 {
639         if (input_stage(t)->threadsafe == false)
640                 return opdl_ring_input_singlethread(t, entries, num_entries,
641                                 block);
642         else
643                 return opdl_ring_input_multithread(t, entries, num_entries,
644                                 block);
645 }
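/* Usage sketch (illustrative only; assumes the ring was created with
 * slot_size == sizeof(struct rte_event), and 'ring'/'burst' are hypothetical
 * names):
 *
 *   struct rte_event burst[32];
 *   ... fill burst ...
 *   uint32_t n = opdl_ring_input(ring, burst, 32, false);
 *
 * With block == false at most the number of currently free slots is copied
 * in and the count is returned; with block == true the call spins until all
 * 32 slots are available.
 */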
646
647 uint32_t
648 opdl_ring_copy_from_burst(struct opdl_ring *t, struct opdl_stage *s,
649                 const void *entries, uint32_t num_entries, bool block)
650 {
651         uint32_t head = s->head;
652
653         num_entries = num_to_process(s, num_entries, block);
654
655         if (num_entries == 0)
656                 return 0;
657
658         copy_entries_in(t, head, entries, num_entries);
659
660         s->head += num_entries;
661         __atomic_store_n(&s->shared.tail, s->head, __ATOMIC_RELEASE);
662
663         return num_entries;
664
665 }
666
667 uint32_t
668 opdl_ring_copy_to_burst(struct opdl_ring *t, struct opdl_stage *s,
669                 void *entries, uint32_t num_entries, bool block)
670 {
671         uint32_t head = s->head;
672
673         num_entries = num_to_process(s, num_entries, block);
674         if (num_entries == 0)
675                 return 0;
676
677         copy_entries_out(t, head, entries, num_entries);
678
679         s->head += num_entries;
680         __atomic_store_n(&s->shared.tail, s->head, __ATOMIC_RELEASE);
681
682         return num_entries;
683 }
684
685 uint32_t
686 opdl_stage_find_num_available(struct opdl_stage *s, uint32_t num_entries)
687 {
688         /* return (num_to_process(s, num_entries, false)); */
689
690         if (available(s) >= num_entries)
691                 return num_entries;
692
693         update_available_seq(s);
694
695         uint32_t avail = available(s);
696
697         if (avail == 0) {
698                 rte_pause();
699                 return 0;
700         }
701         return (avail <= num_entries) ? avail : num_entries;
702 }
703
704 uint32_t
705 opdl_stage_claim(struct opdl_stage *s, void *entries,
706                 uint32_t num_entries, uint32_t *seq, bool block, bool atomic)
707 {
708         if (s->threadsafe == false)
709                 return opdl_stage_claim_singlethread(s, entries, num_entries,
710                                 seq, block, atomic);
711         else
712                 return opdl_stage_claim_multithread(s, entries, num_entries,
713                                 seq, block);
714 }
715
716 uint32_t
717 opdl_stage_claim_copy(struct opdl_stage *s, void *entries,
718                 uint32_t num_entries, uint32_t *seq, bool block)
719 {
720         if (s->threadsafe == false)
721                 return opdl_stage_claim_copy_singlethread(s, entries,
722                                 num_entries, seq, block);
723         else
724                 return opdl_stage_claim_copy_multithread(s, entries,
725                                 num_entries, seq, block);
726 }
727
728 void
729 opdl_stage_disclaim_n(struct opdl_stage *s, uint32_t num_entries,
730                 bool block)
731 {
732
733         if (s->threadsafe == false) {
734                 opdl_stage_disclaim_singlethread_n(s, s->num_claimed);
735         } else {
736                 struct claim_manager *disclaims =
737                         &s->pending_disclaims[rte_lcore_id()];
738
739                 if (unlikely(num_entries > s->num_slots)) {
740                         PMD_DRV_LOG(WARNING, "Attempt to disclaim (%u) more than claimed (%u)",
741                                         num_entries, disclaims->num_claimed);
742                         num_entries = disclaims->num_claimed;
743                 }
744
745                 num_entries = RTE_MIN(num_entries + disclaims->num_to_disclaim,
746                                 disclaims->num_claimed);
747                 opdl_stage_disclaim_multithread_n(s, num_entries, block);
748         }
749 }
750
751 int
752 opdl_stage_disclaim(struct opdl_stage *s, uint32_t num_entries, bool block)
753 {
754         if (num_entries != s->num_event) {
755                 rte_errno = EINVAL;
756                 return 0;
757         }
758         if (s->threadsafe == false) {
759                 __atomic_store_n(&s->shared.tail, s->head, __ATOMIC_RELEASE);
760                 s->seq += s->num_claimed;
761                 s->shadow_head = s->head;
762                 s->num_claimed = 0;
763         } else {
764                 struct claim_manager *disclaims =
765                                 &s->pending_disclaims[rte_lcore_id()];
766                 opdl_stage_disclaim_multithread_n(s, disclaims->num_claimed,
767                                 block);
768         }
769         return num_entries;
770 }
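/* Usage sketch for a non-threadsafe processing stage (illustrative only;
 * 'stage', 'ev' and 'atomic_q' are hypothetical names). A worker claims a
 * burst, updates the events it owns and then disclaims exactly the number
 * of events reported by the claim:
 *
 *   struct rte_event ev[16];
 *   uint32_t seq;
 *   uint32_t n = opdl_stage_claim(stage, ev, 16, &seq, false, atomic_q);
 *   ... process the n events ...
 *   opdl_stage_disclaim(stage, n, false);
 */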
771
772 uint32_t
773 opdl_ring_available(struct opdl_ring *t)
774 {
775         return opdl_stage_available(&t->stages[0]);
776 }
777
778 uint32_t
779 opdl_stage_available(struct opdl_stage *s)
780 {
781         update_available_seq(s);
782         return available(s);
783 }
784
785 void
786 opdl_ring_flush(struct opdl_ring *t)
787 {
788         struct opdl_stage *s = input_stage(t);
789
790         wait_for_available(s, s->num_slots);
791 }
792
793 /******************** Non performance sensitive functions ********************/
794
795 /* Initial setup of a new stage's context */
796 static int
797 init_stage(struct opdl_ring *t, struct opdl_stage *s, bool threadsafe,
798                 bool is_input)
799 {
800         uint32_t available = (is_input) ? t->num_slots : 0;
801
802         s->t = t;
803         s->num_slots = t->num_slots;
804         s->index = t->num_stages;
805         s->threadsafe = threadsafe;
806         s->shared.stage = s;
807
808         /* Alloc memory for deps */
809         s->dep_tracking = rte_zmalloc_socket(LIB_NAME,
810                         t->max_num_stages * sizeof(enum dep_type),
811                         0, t->socket);
812         if (s->dep_tracking == NULL)
813                 return -ENOMEM;
814
815         s->deps = rte_zmalloc_socket(LIB_NAME,
816                         t->max_num_stages * sizeof(struct shared_state *),
817                         0, t->socket);
818         if (s->deps == NULL) {
819                 rte_free(s->dep_tracking);
820                 return -ENOMEM;
821         }
822
823         s->dep_tracking[s->index] = DEP_SELF;
824
825         if (threadsafe == true)
826                 s->shared.available_seq = available;
827         else
828                 s->available_seq = available;
829
830         return 0;
831 }
832
833 /* Add direct or indirect dependencies between stages */
834 static int
835 add_dep(struct opdl_stage *dependent, const struct opdl_stage *dependency,
836                 enum dep_type type)
837 {
838         struct opdl_ring *t = dependent->t;
839         uint32_t i;
840
841         /* Add new direct dependency */
842         if ((type == DEP_DIRECT) &&
843                         (dependent->dep_tracking[dependency->index] ==
844                                         DEP_NONE)) {
845                 PMD_DRV_LOG(DEBUG, "%s:%u direct dependency on %u",
846                                 t->name, dependent->index, dependency->index);
847                 dependent->dep_tracking[dependency->index] = DEP_DIRECT;
848         }
849
850         /* Add new indirect dependency or change direct to indirect */
851         if ((type == DEP_INDIRECT) &&
852                         ((dependent->dep_tracking[dependency->index] ==
853                         DEP_NONE) ||
854                         (dependent->dep_tracking[dependency->index] ==
855                         DEP_DIRECT))) {
856                 PMD_DRV_LOG(DEBUG, "%s:%u indirect dependency on %u",
857                                 t->name, dependent->index, dependency->index);
858                 dependent->dep_tracking[dependency->index] = DEP_INDIRECT;
859         }
860
861         /* Shouldn't happen... */
862         if ((dependent->dep_tracking[dependency->index] == DEP_SELF) &&
863                         (dependent != input_stage(t))) {
864                 PMD_DRV_LOG(ERR, "Loop in dependency graph %s:%u",
865                                 t->name, dependent->index);
866                 return -EINVAL;
867         }
868
869         /* Keep going to dependencies of the dependency, until input stage */
870         if (dependency != input_stage(t))
871                 for (i = 0; i < dependency->num_deps; i++) {
872                         int ret = add_dep(dependent, dependency->deps[i]->stage,
873                                         DEP_INDIRECT);
874
875                         if (ret < 0)
876                                 return ret;
877                 }
878
879         /* Make list of sequence numbers for direct dependencies only */
880         if (type == DEP_DIRECT)
881                 for (i = 0, dependent->num_deps = 0; i < t->num_stages; i++)
882                         if (dependent->dep_tracking[i] == DEP_DIRECT) {
883                                 if ((i == 0) && (dependent->num_deps > 1))
884                                         rte_panic("%s:%u depends on > input",
885                                                         t->name,
886                                                         dependent->index);
887                                 dependent->deps[dependent->num_deps++] =
888                                                 &t->stages[i].shared;
889                         }
890
891         return 0;
892 }
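/* Worked example (illustrative only): in a pipeline where stage 2 is given a
 * direct dependency on stage 1, and stage 1 on stage 0 (the input stage),
 * the recursion above also marks stage 0 as an indirect dependency of stage
 * 2. Only direct dependencies end up in deps[], so stage 2 tracks stage 1's
 * tail; the indirect marking merely stops a redundant direct dependency on
 * stage 0 from being added later, and DEP_SELF is what flags a loop.
 */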
893
894 struct opdl_ring *
895 opdl_ring_create(const char *name, uint32_t num_slots, uint32_t slot_size,
896                 uint32_t max_num_stages, int socket)
897 {
898         struct opdl_ring *t;
899         char mz_name[RTE_MEMZONE_NAMESIZE];
900         int mz_flags = 0;
901         struct opdl_stage *st = NULL;
902         const struct rte_memzone *mz = NULL;
903         size_t alloc_size = RTE_CACHE_LINE_ROUNDUP(sizeof(*t) +
904                         (num_slots * slot_size));
905
906         /* Compile time checking */
907         RTE_BUILD_BUG_ON((sizeof(struct shared_state) & RTE_CACHE_LINE_MASK) !=
908                         0);
909         RTE_BUILD_BUG_ON((offsetof(struct opdl_stage, shared) &
910                         RTE_CACHE_LINE_MASK) != 0);
911         RTE_BUILD_BUG_ON((offsetof(struct opdl_ring, slots) &
912                         RTE_CACHE_LINE_MASK) != 0);
913         RTE_BUILD_BUG_ON(!rte_is_power_of_2(OPDL_DISCLAIMS_PER_LCORE));
914
915         /* Parameter checking */
916         if (name == NULL) {
917                 PMD_DRV_LOG(ERR, "name param is NULL");
918                 return NULL;
919         }
920         if (!rte_is_power_of_2(num_slots)) {
921                 PMD_DRV_LOG(ERR, "num_slots (%u) for %s is not power of 2",
922                                 num_slots, name);
923                 return NULL;
924         }
925
926         /* Alloc memory for stages */
927         st = rte_zmalloc_socket(LIB_NAME,
928                 max_num_stages * sizeof(struct opdl_stage),
929                 RTE_CACHE_LINE_SIZE, socket);
930         if (st == NULL)
931                 goto exit_fail;
932
933         snprintf(mz_name, sizeof(mz_name), "%s%s", LIB_NAME, name);
934
935         /* Alloc memory for memzone */
936         mz = rte_memzone_reserve(mz_name, alloc_size, socket, mz_flags);
937         if (mz == NULL)
938                 goto exit_fail;
939
940         t = mz->addr;
941
942         /* Initialise opdl_ring queue */
943         memset(t, 0, sizeof(*t));
944         strlcpy(t->name, name, sizeof(t->name));
945         t->socket = socket;
946         t->num_slots = num_slots;
947         t->mask = num_slots - 1;
948         t->slot_size = slot_size;
949         t->max_num_stages = max_num_stages;
950         t->stages = st;
951
952         PMD_DRV_LOG(DEBUG, "Created %s at %p (num_slots=%u,socket=%i,slot_size=%u)",
953                         t->name, t, num_slots, socket, slot_size);
954
955         return t;
956
957 exit_fail:
958         PMD_DRV_LOG(ERR, "Cannot reserve memory");
959         rte_free(st);
960         rte_memzone_free(mz);
961
962         return NULL;
963 }
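/* Setup sketch (illustrative only; "pipeline", r, rx_stage and wk_stage are
 * hypothetical names). A ring is created with a power-of-2 slot count, then
 * stages are added and wired together before any traffic flows:
 *
 *   struct opdl_ring *r = opdl_ring_create("pipeline", 1024,
 *                   sizeof(struct rte_event), 2, rte_socket_id());
 *   struct opdl_stage *rx_stage = opdl_stage_add(r, false, true);
 *   struct opdl_stage *wk_stage = opdl_stage_add(r, false, false);
 *   opdl_stage_deps_add(r, wk_stage, 1, 0, &rx_stage, 1);
 *
 * The input stage is typically also given the final stage as its dependency
 * (see opdl_stage_set_deps()) so that producers cannot overwrite slots that
 * are still being processed.
 */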
964
965 void *
966 opdl_ring_get_slot(const struct opdl_ring *t, uint32_t index)
967 {
968         return get_slot(t, index);
969 }
970
971 bool
972 opdl_ring_cas_slot(struct opdl_stage *s, const struct rte_event *ev,
973                 uint32_t index, bool atomic)
974 {
975         uint32_t i = 0, offset;
976         struct opdl_ring *t = s->t;
977         struct rte_event *ev_orig = NULL;
978         bool ev_updated = false;
979         uint64_t ev_temp    = 0;
980         uint64_t ev_update  = 0;
981
982         uint32_t opa_id   = 0;
983         uint32_t flow_id  = 0;
984         uint64_t event    = 0;
985
986         if (index > s->num_event) {
987                 PMD_DRV_LOG(ERR, "index is out of range");
988                 return ev_updated;
989         }
990
991         ev_temp = ev->event & OPDL_EVENT_MASK;
992
993         if (!atomic) {
994                 offset = opdl_first_entry_id(s->seq, s->nb_instance,
995                                 s->instance_id);
996                 offset += index*s->nb_instance;
997                 ev_orig = get_slot(t, s->shadow_head+offset);
998                 if ((ev_orig->event&OPDL_EVENT_MASK) != ev_temp) {
999                         ev_orig->event = ev->event;
1000                         ev_updated = true;
1001                 }
1002                 if (ev_orig->u64 != ev->u64) {
1003                         ev_orig->u64 = ev->u64;
1004                         ev_updated = true;
1005                 }
1006
1007         } else {
1008                 for (i = s->pos; i < s->num_claimed; i++) {
1009                         ev_orig = (struct rte_event *)
1010                                 get_slot(t, s->shadow_head+i);
1011
1012                         event  = __atomic_load_n(&(ev_orig->event),
1013                                         __ATOMIC_ACQUIRE);
1014
1015                         opa_id = OPDL_OPA_MASK & (event >> OPDL_OPA_OFFSET);
1016                         flow_id  = OPDL_FLOWID_MASK & event;
1017
1018                         if (opa_id >= s->queue_id)
1019                                 continue;
1020
1021                         if ((flow_id % s->nb_instance) == s->instance_id) {
1022                                 ev_update = s->queue_id;
1023                                 ev_update = (ev_update << OPDL_OPA_OFFSET)
1024                                         | ev->event;
1025
1026                                 s->pos = i + 1;
1027
1028                                 if ((event & OPDL_EVENT_MASK) !=
1029                                                 ev_temp) {
1030                                         __atomic_store_n(&(ev_orig->event),
1031                                                         ev_update,
1032                                                         __ATOMIC_RELEASE);
1033                                         ev_updated = true;
1034                                 }
1035                                 if (ev_orig->u64 != ev->u64) {
1036                                         ev_orig->u64 = ev->u64;
1037                                         ev_updated = true;
1038                                 }
1039
1040                                 break;
1041                         }
1042                 }
1043
1044         }
1045
1046         return ev_updated;
1047 }
1048
1049 int
1050 opdl_ring_get_socket(const struct opdl_ring *t)
1051 {
1052         return t->socket;
1053 }
1054
1055 uint32_t
1056 opdl_ring_get_num_slots(const struct opdl_ring *t)
1057 {
1058         return t->num_slots;
1059 }
1060
1061 const char *
1062 opdl_ring_get_name(const struct opdl_ring *t)
1063 {
1064         return t->name;
1065 }
1066
1067 /* Check dependency list is valid for a given opdl_ring */
1068 static int
1069 check_deps(struct opdl_ring *t, struct opdl_stage *deps[],
1070                 uint32_t num_deps)
1071 {
1072         unsigned int i;
1073
1074         for (i = 0; i < num_deps; ++i) {
1075                 if (!deps[i]) {
1076                         PMD_DRV_LOG(ERR, "deps[%u] is NULL", i);
1077                         return -EINVAL;
1078                 }
1079                 if (t != deps[i]->t) {
1080                         PMD_DRV_LOG(ERR, "deps[%u] is in opdl_ring %s, not %s",
1081                                         i, deps[i]->t->name, t->name);
1082                         return -EINVAL;
1083                 }
1084         }
1085
1086         return 0;
1087 }
1088
1089 struct opdl_stage *
1090 opdl_stage_add(struct opdl_ring *t, bool threadsafe, bool is_input)
1091 {
1092         struct opdl_stage *s;
1093
1094         /* Parameter checking */
1095         if (!t) {
1096                 PMD_DRV_LOG(ERR, "opdl_ring is NULL");
1097                 return NULL;
1098         }
1099         if (t->num_stages == t->max_num_stages) {
1100                 PMD_DRV_LOG(ERR, "%s has max number of stages (%u)",
1101                                 t->name, t->max_num_stages);
1102                 return NULL;
1103         }
1104
1105         s = &t->stages[t->num_stages];
1106
1107         if (((uintptr_t)&s->shared & RTE_CACHE_LINE_MASK) != 0)
1108                 PMD_DRV_LOG(WARNING, "Tail seq num (%p) of %s stage not cache aligned",
1109                                 &s->shared, t->name);
1110
1111         if (init_stage(t, s, threadsafe, is_input) < 0) {
1112                 PMD_DRV_LOG(ERR, "Cannot reserve memory");
1113                 return NULL;
1114         }
1115         t->num_stages++;
1116
1117         return s;
1118 }
1119
1120 uint32_t
1121 opdl_stage_deps_add(struct opdl_ring *t, struct opdl_stage *s,
1122                 uint32_t nb_instance, uint32_t instance_id,
1123                 struct opdl_stage *deps[],
1124                 uint32_t num_deps)
1125 {
1126         uint32_t i;
1127         int ret = 0;
1128
1129         if ((num_deps > 0) && (!deps)) {
1130                 PMD_DRV_LOG(ERR, "%s stage has NULL dependencies", t->name);
1131                 return -1;
1132         }
1133         ret = check_deps(t, deps, num_deps);
1134         if (ret < 0)
1135                 return ret;
1136
1137         for (i = 0; i < num_deps; i++) {
1138                 ret = add_dep(s, deps[i], DEP_DIRECT);
1139                 if (ret < 0)
1140                         return ret;
1141         }
1142
1143         s->nb_instance = nb_instance;
1144         s->instance_id = instance_id;
1145
1146         return ret;
1147 }
1148
1149 struct opdl_stage *
1150 opdl_ring_get_input_stage(const struct opdl_ring *t)
1151 {
1152         return input_stage(t);
1153 }
1154
1155 int
1156 opdl_stage_set_deps(struct opdl_stage *s, struct opdl_stage *deps[],
1157                 uint32_t num_deps)
1158 {
1159         unsigned int i;
1160         int ret;
1161
1162         if ((num_deps == 0) || (!deps)) {
1163                 PMD_DRV_LOG(ERR, "cannot set NULL dependencies");
1164                 return -EINVAL;
1165         }
1166
1167         ret = check_deps(s->t, deps, num_deps);
1168         if (ret < 0)
1169                 return ret;
1170
1171         /* Update deps */
1172         for (i = 0; i < num_deps; i++)
1173                 s->deps[i] = &deps[i]->shared;
1174         s->num_deps = num_deps;
1175
1176         return 0;
1177 }
1178
1179 struct opdl_ring *
1180 opdl_stage_get_opdl_ring(const struct opdl_stage *s)
1181 {
1182         return s->t;
1183 }
1184
1185 void
1186 opdl_stage_set_queue_id(struct opdl_stage *s,
1187                 uint32_t queue_id)
1188 {
1189         s->queue_id = queue_id;
1190 }
1191
1192 void
1193 opdl_ring_dump(const struct opdl_ring *t, FILE *f)
1194 {
1195         uint32_t i;
1196
1197         if (t == NULL) {
1198                 fprintf(f, "NULL OPDL!\n");
1199                 return;
1200         }
1201         fprintf(f, "OPDL \"%s\": num_slots=%u; mask=%#x; slot_size=%u; num_stages=%u; socket=%i\n",
1202                         t->name, t->num_slots, t->mask, t->slot_size,
1203                         t->num_stages, t->socket);
1204         for (i = 0; i < t->num_stages; i++) {
1205                 uint32_t j;
1206                 const struct opdl_stage *s = &t->stages[i];
1207
1208                 fprintf(f, "  %s[%u]: threadsafe=%s; head=%u; available_seq=%u; tail=%u; deps=%u",
1209                                 t->name, i, (s->threadsafe) ? "true" : "false",
1210                                 (s->threadsafe) ? s->shared.head : s->head,
1211                                 (s->threadsafe) ? s->shared.available_seq :
1212                                 s->available_seq,
1213                                 s->shared.tail, (s->num_deps > 0) ?
1214                                 s->deps[0]->stage->index : 0);
1215                 for (j = 1; j < s->num_deps; j++)
1216                         fprintf(f, ",%u", s->deps[j]->stage->index);
1217                 fprintf(f, "\n");
1218         }
1219         fflush(f);
1220 }
1221
1222 void
1223 opdl_ring_free(struct opdl_ring *t)
1224 {
1225         uint32_t i;
1226         const struct rte_memzone *mz;
1227         char mz_name[RTE_MEMZONE_NAMESIZE];
1228
1229         if (t == NULL) {
1230                 PMD_DRV_LOG(DEBUG, "Freeing NULL OPDL Ring!");
1231                 return;
1232         }
1233
1234         PMD_DRV_LOG(DEBUG, "Freeing %s opdl_ring at %p", t->name, t);
1235
1236         for (i = 0; i < t->num_stages; ++i) {
1237                 rte_free(t->stages[i].deps);
1238                 rte_free(t->stages[i].dep_tracking);
1239         }
1240
1241         rte_free(t->stages);
1242
1243         snprintf(mz_name, sizeof(mz_name), "%s%s", LIB_NAME, t->name);
1244         mz = rte_memzone_lookup(mz_name);
1245         if (rte_memzone_free(mz) != 0)
1246                 PMD_DRV_LOG(ERR, "Cannot free memzone for %s", t->name);
1247 }
1248
1249 /* Search for an opdl_ring by its name */
1250 struct opdl_ring *
1251 opdl_ring_lookup(const char *name)
1252 {
1253         const struct rte_memzone *mz;
1254         char mz_name[RTE_MEMZONE_NAMESIZE];
1255
1256         snprintf(mz_name, sizeof(mz_name), "%s%s", LIB_NAME, name);
1257
1258         mz = rte_memzone_lookup(mz_name);
1259         if (mz == NULL)
1260                 return NULL;
1261
1262         return mz->addr;
1263 }
1264
1265 void
1266 opdl_ring_set_stage_threadsafe(struct opdl_stage *s, bool threadsafe)
1267 {
1268         s->threadsafe = threadsafe;
1269 }