1 /* SPDX-License-Identifier: BSD-3-Clause
2 * Copyright(c) 2017 Intel Corporation
10 #include <rte_branch_prediction.h>
11 #include <rte_debug.h>
12 #include <rte_lcore.h>
14 #include <rte_malloc.h>
15 #include <rte_memcpy.h>
16 #include <rte_memory.h>
17 #include <rte_memzone.h>
18 #include <rte_eal_memconfig.h>
20 #include "opdl_ring.h"
23 #define LIB_NAME "opdl_ring"
25 #define OPDL_NAME_SIZE 64
28 #define OPDL_EVENT_MASK (0x00000000000FFFFFULL)
29 #define OPDL_FLOWID_MASK (0xFFFFF)
30 #define OPDL_OPA_MASK (0xFF)
31 #define OPDL_OPA_OFFSET (0x38)
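/* Illustrative note on how the masks above are applied later in this file
 * (see opdl_stage_claim_singlethread() and opdl_ring_cas_slot()):
 *
 *   flow_id = OPDL_FLOWID_MASK & ev->event;                    low 20 bits
 *   opa_id  = OPDL_OPA_MASK & (ev->event >> OPDL_OPA_OFFSET);  8 bits at bit 56
 *
 * OPDL_EVENT_MASK selects the same low bits and is used to compare whether
 * the significant part of an event word has been modified.
 */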
33 int opdl_logtype_driver;
35 /* Types of dependency between stages */
37 DEP_NONE = 0, /* no dependency */
38 DEP_DIRECT, /* stage has direct dependency */
39 DEP_INDIRECT, /* indirect dependency through other stage(s) */
40 DEP_SELF, /* stage dependency on itself, used to detect loops */
43 /* Shared section of stage state.
44 * Care is needed when accessing this structure, and its layout is important,
45 * especially to keep the adjacent cache-line HW prefetcher from impacting performance.
48 /* Last known minimum sequence number of dependencies, used for multi
51 uint32_t available_seq;
52 char _pad1[RTE_CACHE_LINE_SIZE * 3];
53 uint32_t head; /* Head sequence number (for multi thread operation) */
54 char _pad2[RTE_CACHE_LINE_SIZE * 3];
55 struct opdl_stage *stage; /* back pointer */
56 uint32_t tail; /* Tail sequence number */
57 char _pad3[RTE_CACHE_LINE_SIZE * 2];
58 } __rte_cache_aligned;
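/* Note on the padding above: available_seq, head and tail are each kept on
 * their own cache line, with extra padding lines in between, so that updates
 * to one field (and the adjacent cache-line HW prefetcher on its readers) do
 * not cause false sharing with the others.
 */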
60 /* A structure to keep track of "unfinished" claims. This is only used for
61 * stages that are threadsafe. Each lcore accesses its own instance of this
62 * structure to record the entries it has claimed. This allows one lcore to make
63 * multiple claims without being blocked by another. When disclaiming, it moves
64 * the shared tail forward once the shared tail matches the tail value recorded
67 struct claim_manager {
68 uint32_t num_to_disclaim;
75 } claims[OPDL_DISCLAIMS_PER_LCORE];
76 } __rte_cache_aligned;
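/* Note: claims[] is used as a small ring indexed by mgr_head/mgr_tail modulo
 * OPDL_DISCLAIMS_PER_LCORE (see claim_mgr_index() below), which is why
 * opdl_ring_create() checks at build time that OPDL_DISCLAIMS_PER_LCORE is a
 * power of two.
 */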
78 /* Context for each stage of opdl_ring.
79 * Calculations on sequence numbers need to be done with other uint32_t values
80 * so that results are modulo 2^32, and not undefined.
83 struct opdl_ring *t; /* back pointer, set at init */
84 uint32_t num_slots; /* Number of slots for entries, set at init */
85 uint32_t index; /* ID for this stage, set at init */
86 bool threadsafe; /* Set to true if this stage supports threadsafe use */
87 /* Last known min seq number of dependencies, used for single-thread
90 uint32_t available_seq;
91 uint32_t head; /* Current head for single-thread operation */
92 uint32_t nb_instance; /* Number of instances */
93 uint32_t instance_id; /* ID of this stage instance */
94 uint16_t num_claimed; /* Number of slots claimed */
95 uint16_t num_event; /* Number of events */
96 uint32_t seq; /* sequence number */
97 uint32_t num_deps; /* Number of direct dependencies */
98 /* Keep track of all dependencies, used during init only */
99 enum dep_type *dep_tracking;
100 /* Direct dependencies of this stage */
101 struct shared_state **deps;
102 /* Other stages read this! */
103 struct shared_state shared __rte_cache_aligned;
104 /* For managing disclaims in multi-threaded processing stages */
105 struct claim_manager pending_disclaims[RTE_MAX_LCORE]
107 uint32_t shadow_head; /* Shadow head for single-thread operation */
108 uint32_t queue_id; /* ID of Queue which is assigned to this stage */
109 uint32_t pos; /* Atomic scan position */
110 } __rte_cache_aligned;
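/* Note on the stage fields above: a single-threaded stage advances s->head
 * and s->available_seq privately and only publishes s->shared.tail, whereas a
 * threadsafe stage moves s->shared.head atomically and parks outstanding
 * claims in pending_disclaims[lcore] until the shared tail can be advanced.
 */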
112 /* Context for opdl_ring */
114 char name[OPDL_NAME_SIZE]; /* OPDL queue instance name */
115 int socket; /* NUMA socket that memory is allocated on */
116 uint32_t num_slots; /* Number of slots for entries */
117 uint32_t mask; /* Mask for sequence numbers (num_slots - 1) */
118 uint32_t slot_size; /* Size of each slot in bytes */
119 uint32_t num_stages; /* Number of stages that have been added */
120 uint32_t max_num_stages; /* Max number of stages */
121 /* Stages indexed by ID */
122 struct opdl_stage *stages;
123 /* Memory for storing slot data */
124 uint8_t slots[0] __rte_cache_aligned;
128 /* Return the input stage of an opdl_ring */
129 static __rte_always_inline struct opdl_stage *
130 input_stage(const struct opdl_ring *t)
132 return &t->stages[0];
135 /* Check if a stage is the input stage */
136 static __rte_always_inline bool
137 is_input_stage(const struct opdl_stage *s)
139 return s->index == 0;
142 /* Get slot pointer from sequence number */
143 static __rte_always_inline void *
144 get_slot(const struct opdl_ring *t, uint32_t n)
146 return (void *)(uintptr_t)&t->slots[(n & t->mask) * t->slot_size];
149 /* Find how many entries are available for processing */
150 static __rte_always_inline uint32_t
151 available(const struct opdl_stage *s)
153 if (s->threadsafe == true) {
154 uint32_t n = __atomic_load_n(&s->shared.available_seq,
156 __atomic_load_n(&s->shared.head,
159 /* Return 0 if available_seq needs to be updated */
160 return (n <= s->num_slots) ? n : 0;
163 /* Single threaded */
164 return s->available_seq - s->head;
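/* The unsigned subtraction above is what the "modulo 2^32" comment on
 * struct opdl_stage refers to: e.g. available_seq == 5 with head == 0xFFFFFFFE
 * still yields 7, so the result stays correct across sequence wrap-around.
 */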
167 /* Read the sequence numbers of dependencies and find the minimum */
168 static __rte_always_inline void
169 update_available_seq(struct opdl_stage *s)
172 uint32_t this_tail = s->shared.tail;
173 uint32_t min_seq = __atomic_load_n(&s->deps[0]->tail, __ATOMIC_ACQUIRE);
174 /* Input stage sequence numbers are greater than the sequence numbers of
175 * its dependencies, so an offset of t->num_slots is needed when
176 * calculating available slots, and the condition used to determine the
177 * dependencies' minimum sequence number must also be reversed.
181 if (is_input_stage(s)) {
183 for (i = 1; i < s->num_deps; i++) {
184 uint32_t seq = __atomic_load_n(&s->deps[i]->tail,
186 if ((this_tail - seq) > (this_tail - min_seq))
191 for (i = 1; i < s->num_deps; i++) {
192 uint32_t seq = __atomic_load_n(&s->deps[i]->tail,
194 if ((seq - this_tail) < (min_seq - this_tail))
199 if (s->threadsafe == false)
200 s->available_seq = min_seq + wrap;
202 __atomic_store_n(&s->shared.available_seq, min_seq + wrap,
206 /* Wait until the number of available slots reaches the number requested */
207 static __rte_always_inline void
208 wait_for_available(struct opdl_stage *s, uint32_t n)
210 while (available(s) < n) {
212 update_available_seq(s);
216 /* Return number of slots to process based on number requested and mode */
217 static __rte_always_inline uint32_t
218 num_to_process(struct opdl_stage *s, uint32_t n, bool block)
220 /* Don't read tail sequences of dependencies if not needed */
221 if (available(s) >= n)
224 update_available_seq(s);
226 if (block == false) {
227 uint32_t avail = available(s);
233 return (avail <= n) ? avail : n;
236 if (unlikely(n > s->num_slots)) {
237 PMD_DRV_LOG(ERR, "%u entries is more than max (%u)",
239 return 0; /* Avoid infinite loop */
242 wait_for_available(s, n);
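/* To summarise the modes above: with block == false the caller immediately
 * gets min(available, n); with block == true the call spins in
 * wait_for_available() until n slots are free, which is why requests larger
 * than num_slots are rejected first to avoid an infinite wait.
 */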
247 /* Copy entries into slots with wrap-around */
247 static __rte_always_inline void
248 copy_entries_in(struct opdl_ring *t, uint32_t start, const void *entries,
249 uint32_t num_entries)
251 uint32_t slot_size = t->slot_size;
252 uint32_t slot_index = start & t->mask;
254 if (slot_index + num_entries <= t->num_slots) {
255 rte_memcpy(get_slot(t, start), entries,
256 num_entries * slot_size);
258 uint32_t split = t->num_slots - slot_index;
260 rte_memcpy(get_slot(t, start), entries, split * slot_size);
261 rte_memcpy(get_slot(t, 0),
262 RTE_PTR_ADD(entries, split * slot_size),
263 (num_entries - split) * slot_size);
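/* Worked example of the wrap-around path: with num_slots == 8, start == 6 and
 * num_entries == 4, split == 2, so two entries are copied to slots 6-7 and the
 * remaining two to slots 0-1.
 */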
267 /* Copy entries out from slots with wrap-around */
268 static __rte_always_inline void
269 copy_entries_out(struct opdl_ring *t, uint32_t start, void *entries,
270 uint32_t num_entries)
272 uint32_t slot_size = t->slot_size;
273 uint32_t slot_index = start & t->mask;
275 if (slot_index + num_entries <= t->num_slots) {
276 rte_memcpy(entries, get_slot(t, start),
277 num_entries * slot_size);
279 uint32_t split = t->num_slots - slot_index;
281 rte_memcpy(entries, get_slot(t, start), split * slot_size);
282 rte_memcpy(RTE_PTR_ADD(entries, split * slot_size),
284 (num_entries - split) * slot_size);
288 /* Input function optimised for single thread */
289 static __rte_always_inline uint32_t
290 opdl_ring_input_singlethread(struct opdl_ring *t, const void *entries,
291 uint32_t num_entries, bool block)
293 struct opdl_stage *s = input_stage(t);
294 uint32_t head = s->head;
296 num_entries = num_to_process(s, num_entries, block);
297 if (num_entries == 0)
300 copy_entries_in(t, head, entries, num_entries);
302 s->head += num_entries;
303 __atomic_store_n(&s->shared.tail, s->head, __ATOMIC_RELEASE);
308 /* Convert head and tail of claim_manager into a valid index */
309 static __rte_always_inline uint32_t
310 claim_mgr_index(uint32_t n)
312 return n & (OPDL_DISCLAIMS_PER_LCORE - 1);
315 /* Check if there are available slots in claim_manager */
316 static __rte_always_inline bool
317 claim_mgr_available(struct claim_manager *mgr)
319 return (mgr->mgr_head < (mgr->mgr_tail + OPDL_DISCLAIMS_PER_LCORE)) ?
323 /* Record a new claim. Only use after first checking an entry is available */
324 static __rte_always_inline void
325 claim_mgr_add(struct claim_manager *mgr, uint32_t tail, uint32_t head)
327 if ((mgr->mgr_head != mgr->mgr_tail) &&
328 (mgr->claims[claim_mgr_index(mgr->mgr_head - 1)].head ==
330 /* Combine with previous claim */
331 mgr->claims[claim_mgr_index(mgr->mgr_head - 1)].head = head;
333 mgr->claims[claim_mgr_index(mgr->mgr_head)].head = head;
334 mgr->claims[claim_mgr_index(mgr->mgr_head)].tail = tail;
338 mgr->num_claimed += (head - tail);
341 /* Read the oldest recorded claim */
342 static __rte_always_inline bool
343 claim_mgr_read(struct claim_manager *mgr, uint32_t *tail, uint32_t *head)
345 if (mgr->mgr_head == mgr->mgr_tail)
348 *head = mgr->claims[claim_mgr_index(mgr->mgr_tail)].head;
349 *tail = mgr->claims[claim_mgr_index(mgr->mgr_tail)].tail;
353 /* Remove the oldest recorded claim. Only use after first reading the entry */
354 static __rte_always_inline void
355 claim_mgr_remove(struct claim_manager *mgr)
357 mgr->num_claimed -= (mgr->claims[claim_mgr_index(mgr->mgr_tail)].head -
358 mgr->claims[claim_mgr_index(mgr->mgr_tail)].tail);
362 /* Update tail in the oldest claim. Only use after first reading the entry */
363 static __rte_always_inline void
364 claim_mgr_move_tail(struct claim_manager *mgr, uint32_t num_entries)
366 mgr->num_claimed -= num_entries;
367 mgr->claims[claim_mgr_index(mgr->mgr_tail)].tail += num_entries;
370 static __rte_always_inline void
371 opdl_stage_disclaim_multithread_n(struct opdl_stage *s,
372 uint32_t num_entries, bool block)
374 struct claim_manager *disclaims = &s->pending_disclaims[rte_lcore_id()];
378 while (num_entries) {
379 bool ret = claim_mgr_read(disclaims, &tail, &head);
382 break; /* nothing is claimed */
383 /* There should be no race condition here. If shared.tail
384 * matches, no other core can update it until this one does.
386 if (__atomic_load_n(&s->shared.tail, __ATOMIC_ACQUIRE) ==
388 if (num_entries >= (head - tail)) {
389 claim_mgr_remove(disclaims);
390 __atomic_store_n(&s->shared.tail, head,
392 num_entries -= (head - tail);
394 claim_mgr_move_tail(disclaims, num_entries);
395 __atomic_store_n(&s->shared.tail,
400 } else if (block == false)
401 break; /* blocked by other thread */
402 /* Keep going until num_entries are disclaimed. */
406 disclaims->num_to_disclaim = num_entries;
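/* Whatever could not be disclaimed above (because the shared tail is still
 * owned by another thread) is left in num_to_disclaim and retried the next
 * time this lcore enters move_head_atomically().
 */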
409 /* Move head atomically, returning number of entries available to process and
410 * the original value of head. For non-input stages, the claim is recorded
411 * so that the tail can be updated later by opdl_stage_disclaim().
413 static __rte_always_inline void
414 move_head_atomically(struct opdl_stage *s, uint32_t *num_entries,
415 uint32_t *old_head, bool block, bool claim_func)
417 uint32_t orig_num_entries = *num_entries;
419 struct claim_manager *disclaims = &s->pending_disclaims[rte_lcore_id()];
421 /* Attempt to disclaim any outstanding claims */
422 opdl_stage_disclaim_multithread_n(s, disclaims->num_to_disclaim,
425 *old_head = __atomic_load_n(&s->shared.head, __ATOMIC_ACQUIRE);
428 /* If called by opdl_ring_input(), claim does not need to be
429 * recorded, as there will be no disclaim.
432 /* Check that the claim can be recorded */
433 ret = claim_mgr_available(disclaims);
435 /* exit out if claim can't be recorded */
441 *num_entries = num_to_process(s, orig_num_entries, block);
442 if (*num_entries == 0)
445 success = __atomic_compare_exchange_n(&s->shared.head, old_head,
446 *old_head + *num_entries,
447 true, /* may fail spuriously */
448 __ATOMIC_RELEASE, /* memory order on success */
449 __ATOMIC_ACQUIRE); /* memory order on fail */
456 /* Store the claim record */
457 claim_mgr_add(disclaims, *old_head, *old_head + *num_entries);
460 /* Input function that supports multiple threads */
461 static __rte_always_inline uint32_t
462 opdl_ring_input_multithread(struct opdl_ring *t, const void *entries,
463 uint32_t num_entries, bool block)
465 struct opdl_stage *s = input_stage(t);
468 move_head_atomically(s, &num_entries, &old_head, block, false);
469 if (num_entries == 0)
472 copy_entries_in(t, old_head, entries, num_entries);
474 /* If another thread started inputting before this one, but hasn't
475 * finished, we need to wait for it to complete to update the tail.
477 while (unlikely(__atomic_load_n(&s->shared.tail, __ATOMIC_ACQUIRE) !=
481 __atomic_store_n(&s->shared.tail, old_head + num_entries,
487 static __rte_always_inline uint32_t
488 opdl_first_entry_id(uint32_t start_seq, uint8_t nb_p_lcores,
491 return ((nb_p_lcores <= 1) ? 0 :
492 (nb_p_lcores - (start_seq % nb_p_lcores) + this_lcore) %
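/* Example of the calculation above: with start_seq == 5, nb_p_lcores == 4 and
 * this_lcore == 2 the result is (4 - 1 + 2) % 4 == 1, i.e. the first offset
 * whose sequence number (start_seq + offset) maps onto this lcore.
 */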
496 /* Claim slots to process, optimised for single-thread operation */
497 static __rte_always_inline uint32_t
498 opdl_stage_claim_singlethread(struct opdl_stage *s, void *entries,
499 uint32_t num_entries, uint32_t *seq, bool block, bool atomic)
501 uint32_t i = 0, j = 0, offset;
503 uint32_t flow_id = 0;
506 struct rte_event *ev;
508 struct opdl_ring *t = s->t;
509 uint8_t *entries_offset = (uint8_t *)entries;
513 offset = opdl_first_entry_id(s->seq, s->nb_instance,
516 num_entries = s->nb_instance * num_entries;
518 num_entries = num_to_process(s, num_entries, block);
520 for (; offset < num_entries; offset += s->nb_instance) {
521 get_slots = get_slot(t, s->head + offset);
522 memcpy(entries_offset, get_slots, t->slot_size);
523 entries_offset += t->slot_size;
527 num_entries = num_to_process(s, num_entries, block);
529 for (j = 0; j < num_entries; j++) {
530 ev = (struct rte_event *)get_slot(t, s->head+j);
532 event = __atomic_load_n(&(ev->event),
535 opa_id = OPDL_OPA_MASK & (event >> OPDL_OPA_OFFSET);
536 flow_id = OPDL_FLOWID_MASK & event;
538 if (opa_id >= s->queue_id)
541 if ((flow_id % s->nb_instance) == s->instance_id) {
542 memcpy(entries_offset, ev, t->slot_size);
543 entries_offset += t->slot_size;
548 s->shadow_head = s->head;
549 s->head += num_entries;
550 s->num_claimed = num_entries;
554 /* automatically disclaim entries if number of rte_events is zero */
555 if (unlikely(i == 0))
556 opdl_stage_disclaim(s, 0, false);
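/* Note on the two claim paths above: the non-atomic path copies a strided
 * range of slots for this instance, while the atomic path inspects each event
 * word and only hands out events whose flow_id maps to this instance
 * (flow_id % nb_instance == instance_id); events whose opa_id has already
 * reached this stage's queue_id are not handed out.
 */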
561 /* Thread-safe version of function to claim slots for processing */
562 static __rte_always_inline uint32_t
563 opdl_stage_claim_multithread(struct opdl_stage *s, void *entries,
564 uint32_t num_entries, uint32_t *seq, bool block)
567 struct opdl_ring *t = s->t;
568 uint32_t i = 0, offset;
569 uint8_t *entries_offset = (uint8_t *)entries;
572 PMD_DRV_LOG(ERR, "Invalid seq PTR");
575 offset = opdl_first_entry_id(*seq, s->nb_instance, s->instance_id);
576 num_entries = offset + (s->nb_instance * num_entries);
578 move_head_atomically(s, &num_entries, &old_head, block, true);
580 for (; offset < num_entries; offset += s->nb_instance) {
581 memcpy(entries_offset, get_slot(t, s->head + offset),
583 entries_offset += t->slot_size;
592 /* Claim and copy slot pointers, optimised for single-thread operation */
593 static __rte_always_inline uint32_t
594 opdl_stage_claim_copy_singlethread(struct opdl_stage *s, void *entries,
595 uint32_t num_entries, uint32_t *seq, bool block)
597 num_entries = num_to_process(s, num_entries, block);
598 if (num_entries == 0)
600 copy_entries_out(s->t, s->head, entries, num_entries);
603 s->head += num_entries;
607 /* Thread-safe version of function to claim and copy pointers to slots */
608 static __rte_always_inline uint32_t
609 opdl_stage_claim_copy_multithread(struct opdl_stage *s, void *entries,
610 uint32_t num_entries, uint32_t *seq, bool block)
614 move_head_atomically(s, &num_entries, &old_head, block, true);
615 if (num_entries == 0)
617 copy_entries_out(s->t, old_head, entries, num_entries);
623 static __rte_always_inline void
624 opdl_stage_disclaim_singlethread_n(struct opdl_stage *s,
625 uint32_t num_entries)
627 uint32_t old_tail = s->shared.tail;
629 if (unlikely(num_entries > (s->head - old_tail))) {
630 PMD_DRV_LOG(WARNING, "Attempt to disclaim (%u) more than claimed (%u)",
631 num_entries, s->head - old_tail);
632 num_entries = s->head - old_tail;
634 __atomic_store_n(&s->shared.tail, num_entries + old_tail,
639 opdl_ring_input(struct opdl_ring *t, const void *entries, uint32_t num_entries,
642 if (input_stage(t)->threadsafe == false)
643 return opdl_ring_input_singlethread(t, entries, num_entries,
646 return opdl_ring_input_multithread(t, entries, num_entries,
651 opdl_ring_copy_from_burst(struct opdl_ring *t, struct opdl_stage *s,
652 const void *entries, uint32_t num_entries, bool block)
654 uint32_t head = s->head;
656 num_entries = num_to_process(s, num_entries, block);
658 if (num_entries == 0)
661 copy_entries_in(t, head, entries, num_entries);
663 s->head += num_entries;
664 __atomic_store_n(&s->shared.tail, s->head, __ATOMIC_RELEASE);
671 opdl_ring_copy_to_burst(struct opdl_ring *t, struct opdl_stage *s,
672 void *entries, uint32_t num_entries, bool block)
674 uint32_t head = s->head;
676 num_entries = num_to_process(s, num_entries, block);
677 if (num_entries == 0)
680 copy_entries_out(t, head, entries, num_entries);
682 s->head += num_entries;
683 __atomic_store_n(&s->shared.tail, s->head, __ATOMIC_RELEASE);
689 opdl_stage_find_num_available(struct opdl_stage *s, uint32_t num_entries)
691 /* return (num_to_process(s, num_entries, false)); */
693 if (available(s) >= num_entries)
696 update_available_seq(s);
698 uint32_t avail = available(s);
704 return (avail <= num_entries) ? avail : num_entries;
708 opdl_stage_claim(struct opdl_stage *s, void *entries,
709 uint32_t num_entries, uint32_t *seq, bool block, bool atomic)
711 if (s->threadsafe == false)
712 return opdl_stage_claim_singlethread(s, entries, num_entries,
715 return opdl_stage_claim_multithread(s, entries, num_entries,
720 opdl_stage_claim_copy(struct opdl_stage *s, void *entries,
721 uint32_t num_entries, uint32_t *seq, bool block)
723 if (s->threadsafe == false)
724 return opdl_stage_claim_copy_singlethread(s, entries,
725 num_entries, seq, block);
727 return opdl_stage_claim_copy_multithread(s, entries,
728 num_entries, seq, block);
732 opdl_stage_disclaim_n(struct opdl_stage *s, uint32_t num_entries,
736 if (s->threadsafe == false) {
737 opdl_stage_disclaim_singlethread_n(s, s->num_claimed);
739 struct claim_manager *disclaims =
740 &s->pending_disclaims[rte_lcore_id()];
742 if (unlikely(num_entries > s->num_slots)) {
743 PMD_DRV_LOG(WARNING, "Attempt to disclaim (%u) more than claimed (%u)",
744 num_entries, disclaims->num_claimed);
745 num_entries = disclaims->num_claimed;
748 num_entries = RTE_MIN(num_entries + disclaims->num_to_disclaim,
749 disclaims->num_claimed);
750 opdl_stage_disclaim_multithread_n(s, num_entries, block);
755 opdl_stage_disclaim(struct opdl_stage *s, uint32_t num_entries, bool block)
757 if (num_entries != s->num_event) {
761 if (s->threadsafe == false) {
762 __atomic_store_n(&s->shared.tail, s->head, __ATOMIC_RELEASE);
763 s->seq += s->num_claimed;
764 s->shadow_head = s->head;
767 struct claim_manager *disclaims =
768 &s->pending_disclaims[rte_lcore_id()];
769 opdl_stage_disclaim_multithread_n(s, disclaims->num_claimed,
776 opdl_ring_available(struct opdl_ring *t)
778 return opdl_stage_available(&t->stages[0]);
782 opdl_stage_available(struct opdl_stage *s)
784 update_available_seq(s);
789 opdl_ring_flush(struct opdl_ring *t)
791 struct opdl_stage *s = input_stage(t);
793 wait_for_available(s, s->num_slots);
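/* Illustrative usage sketch of the API above (simplified; error handling is
 * omitted, the worker's dependency on the input stage would be set up with
 * opdl_stage_deps_add(), and names such as events/burst/BURST_SIZE are just
 * placeholders):
 *
 *   struct opdl_ring *r = opdl_ring_create("pipe", 1024,
 *           sizeof(struct rte_event), 2, rte_socket_id());
 *   struct opdl_stage *in = opdl_stage_add(r, false, true);
 *   struct opdl_stage *worker = opdl_stage_add(r, false, false);
 *
 *   opdl_ring_input(r, events, n, false);
 *   claimed = opdl_stage_claim(worker, burst, BURST_SIZE, &seq, false, false);
 *   ...process the claimed events...
 *   opdl_stage_disclaim(worker, claimed, false);
 */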
796 /******************* Non-performance-sensitive functions *******************/
798 /* Initial setup of a new stage's context */
800 init_stage(struct opdl_ring *t, struct opdl_stage *s, bool threadsafe,
803 uint32_t available = (is_input) ? t->num_slots : 0;
806 s->num_slots = t->num_slots;
807 s->index = t->num_stages;
808 s->threadsafe = threadsafe;
811 /* Alloc memory for deps */
812 s->dep_tracking = rte_zmalloc_socket(LIB_NAME,
813 t->max_num_stages * sizeof(enum dep_type),
815 if (s->dep_tracking == NULL)
818 s->deps = rte_zmalloc_socket(LIB_NAME,
819 t->max_num_stages * sizeof(struct shared_state *),
821 if (s->deps == NULL) {
822 rte_free(s->dep_tracking);
826 s->dep_tracking[s->index] = DEP_SELF;
828 if (threadsafe == true)
829 s->shared.available_seq = available;
831 s->available_seq = available;
836 /* Add direct or indirect dependencies between stages */
838 add_dep(struct opdl_stage *dependent, const struct opdl_stage *dependency,
841 struct opdl_ring *t = dependent->t;
844 /* Add new direct dependency */
845 if ((type == DEP_DIRECT) &&
846 (dependent->dep_tracking[dependency->index] ==
848 PMD_DRV_LOG(DEBUG, "%s:%u direct dependency on %u",
849 t->name, dependent->index, dependency->index);
850 dependent->dep_tracking[dependency->index] = DEP_DIRECT;
853 /* Add new indirect dependency or change direct to indirect */
854 if ((type == DEP_INDIRECT) &&
855 ((dependent->dep_tracking[dependency->index] ==
857 (dependent->dep_tracking[dependency->index] ==
859 PMD_DRV_LOG(DEBUG, "%s:%u indirect dependency on %u",
860 t->name, dependent->index, dependency->index);
861 dependent->dep_tracking[dependency->index] = DEP_INDIRECT;
864 /* Shouldn't happen... */
865 if ((dependent->dep_tracking[dependency->index] == DEP_SELF) &&
866 (dependent != input_stage(t))) {
867 PMD_DRV_LOG(ERR, "Loop in dependency graph %s:%u",
868 t->name, dependent->index);
872 /* Recurse into the dependency's own dependencies, until the input stage */
873 if (dependency != input_stage(t))
874 for (i = 0; i < dependency->num_deps; i++) {
875 int ret = add_dep(dependent, dependency->deps[i]->stage,
882 /* Make list of sequence numbers for direct dependencies only */
883 if (type == DEP_DIRECT)
884 for (i = 0, dependent->num_deps = 0; i < t->num_stages; i++)
885 if (dependent->dep_tracking[i] == DEP_DIRECT) {
886 if ((i == 0) && (dependent->num_deps > 1))
887 rte_panic("%s:%u depends on more than the input",
890 dependent->deps[dependent->num_deps++] =
891 &t->stages[i].shared;
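/* To recap add_dep(): it recurses through a dependency's own dependencies,
 * marking them DEP_INDIRECT, then rebuilds deps[] from the entries still
 * marked DEP_DIRECT, so at run time a stage only watches the tails of its
 * direct dependencies. If the recursion reaches the dependent stage itself
 * (its DEP_SELF entry) and it is not the input stage, a loop is reported.
 */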
898 opdl_ring_create(const char *name, uint32_t num_slots, uint32_t slot_size,
899 uint32_t max_num_stages, int socket)
902 char mz_name[RTE_MEMZONE_NAMESIZE];
904 struct opdl_stage *st = NULL;
905 const struct rte_memzone *mz = NULL;
906 size_t alloc_size = RTE_CACHE_LINE_ROUNDUP(sizeof(*t) +
907 (num_slots * slot_size));
909 /* Compile time checking */
910 RTE_BUILD_BUG_ON((sizeof(struct shared_state) & RTE_CACHE_LINE_MASK) !=
912 RTE_BUILD_BUG_ON((offsetof(struct opdl_stage, shared) &
913 RTE_CACHE_LINE_MASK) != 0);
914 RTE_BUILD_BUG_ON((offsetof(struct opdl_ring, slots) &
915 RTE_CACHE_LINE_MASK) != 0);
916 RTE_BUILD_BUG_ON(!rte_is_power_of_2(OPDL_DISCLAIMS_PER_LCORE));
918 /* Parameter checking */
920 PMD_DRV_LOG(ERR, "name param is NULL");
923 if (!rte_is_power_of_2(num_slots)) {
924 PMD_DRV_LOG(ERR, "num_slots (%u) for %s is not power of 2",
929 /* Alloc memory for stages */
930 st = rte_zmalloc_socket(LIB_NAME,
931 max_num_stages * sizeof(struct opdl_stage),
932 RTE_CACHE_LINE_SIZE, socket);
936 snprintf(mz_name, sizeof(mz_name), "%s%s", LIB_NAME, name);
938 /* Reserve memzone for the opdl_ring and its slots */
939 mz = rte_memzone_reserve(mz_name, alloc_size, socket, mz_flags);
945 /* Initialise opdl_ring queue */
946 memset(t, 0, sizeof(*t));
947 snprintf(t->name, sizeof(t->name), "%s", name);
949 t->num_slots = num_slots;
950 t->mask = num_slots - 1;
951 t->slot_size = slot_size;
952 t->max_num_stages = max_num_stages;
955 PMD_DRV_LOG(DEBUG, "Created %s at %p (num_slots=%u,socket=%i,slot_size=%u)",
956 t->name, t, num_slots, socket, slot_size);
961 PMD_DRV_LOG(ERR, "Cannot reserve memory");
963 rte_memzone_free(mz);
969 opdl_ring_get_slot(const struct opdl_ring *t, uint32_t index)
971 return get_slot(t, index);
975 opdl_ring_cas_slot(struct opdl_stage *s, const struct rte_event *ev,
976 uint32_t index, bool atomic)
978 uint32_t i = 0, offset;
979 struct opdl_ring *t = s->t;
980 struct rte_event *ev_orig = NULL;
981 bool ev_updated = false;
982 uint64_t ev_temp = 0;
983 uint64_t ev_update = 0;
986 uint32_t flow_id = 0;
989 if (index > s->num_event) {
990 PMD_DRV_LOG(ERR, "index is out of range");
994 ev_temp = ev->event & OPDL_EVENT_MASK;
997 offset = opdl_first_entry_id(s->seq, s->nb_instance,
999 offset += index*s->nb_instance;
1000 ev_orig = get_slot(t, s->shadow_head+offset);
1001 if ((ev_orig->event&OPDL_EVENT_MASK) != ev_temp) {
1002 ev_orig->event = ev->event;
1005 if (ev_orig->u64 != ev->u64) {
1006 ev_orig->u64 = ev->u64;
1011 for (i = s->pos; i < s->num_claimed; i++) {
1012 ev_orig = (struct rte_event *)
1013 get_slot(t, s->shadow_head+i);
1015 event = __atomic_load_n(&(ev_orig->event),
1018 opa_id = OPDL_OPA_MASK & (event >> OPDL_OPA_OFFSET);
1019 flow_id = OPDL_FLOWID_MASK & event;
1021 if (opa_id >= s->queue_id)
1024 if ((flow_id % s->nb_instance) == s->instance_id) {
1025 ev_update = s->queue_id;
1026 ev_update = (ev_update << OPDL_OPA_OFFSET)
1031 if ((event & OPDL_EVENT_MASK) !=
1033 __atomic_store_n(&(ev_orig->event),
1038 if (ev_orig->u64 != ev->u64) {
1039 ev_orig->u64 = ev->u64;
1053 opdl_ring_get_socket(const struct opdl_ring *t)
1059 opdl_ring_get_num_slots(const struct opdl_ring *t)
1061 return t->num_slots;
1065 opdl_ring_get_name(const struct opdl_ring *t)
1070 /* Check that a dependency list is valid for a given opdl_ring */
1072 check_deps(struct opdl_ring *t, struct opdl_stage *deps[],
1077 for (i = 0; i < num_deps; ++i) {
1079 PMD_DRV_LOG(ERR, "deps[%u] is NULL", i);
1082 if (t != deps[i]->t) {
1083 PMD_DRV_LOG(ERR, "deps[%u] is in opdl_ring %s, not %s",
1084 i, deps[i]->t->name, t->name);
1093 opdl_stage_add(struct opdl_ring *t, bool threadsafe, bool is_input)
1095 struct opdl_stage *s;
1097 /* Parameter checking */
1099 PMD_DRV_LOG(ERR, "opdl_ring is NULL");
1102 if (t->num_stages == t->max_num_stages) {
1103 PMD_DRV_LOG(ERR, "%s has max number of stages (%u)",
1104 t->name, t->max_num_stages);
1108 s = &t->stages[t->num_stages];
1110 if (((uintptr_t)&s->shared & RTE_CACHE_LINE_MASK) != 0)
1111 PMD_DRV_LOG(WARNING, "Tail seq num (%p) of %s stage not cache aligned",
1112 &s->shared, t->name);
1114 if (init_stage(t, s, threadsafe, is_input) < 0) {
1115 PMD_DRV_LOG(ERR, "Cannot reserve memory");
1124 opdl_stage_deps_add(struct opdl_ring *t, struct opdl_stage *s,
1125 uint32_t nb_instance, uint32_t instance_id,
1126 struct opdl_stage *deps[],
1132 if ((num_deps > 0) && (!deps)) {
1133 PMD_DRV_LOG(ERR, "%s stage has NULL dependencies", t->name);
1136 ret = check_deps(t, deps, num_deps);
1140 for (i = 0; i < num_deps; i++) {
1141 ret = add_dep(s, deps[i], DEP_DIRECT);
1146 s->nb_instance = nb_instance;
1147 s->instance_id = instance_id;
1153 opdl_ring_get_input_stage(const struct opdl_ring *t)
1155 return input_stage(t);
1159 opdl_stage_set_deps(struct opdl_stage *s, struct opdl_stage *deps[],
1165 if ((num_deps == 0) || (!deps)) {
1166 PMD_DRV_LOG(ERR, "cannot set NULL dependencies");
1170 ret = check_deps(s->t, deps, num_deps);
1175 for (i = 0; i < num_deps; i++)
1176 s->deps[i] = &deps[i]->shared;
1177 s->num_deps = num_deps;
1183 opdl_stage_get_opdl_ring(const struct opdl_stage *s)
1189 opdl_stage_set_queue_id(struct opdl_stage *s,
1192 s->queue_id = queue_id;
1196 opdl_ring_dump(const struct opdl_ring *t, FILE *f)
1201 fprintf(f, "NULL OPDL!\n");
1204 fprintf(f, "OPDL \"%s\": num_slots=%u; mask=%#x; slot_size=%u; num_stages=%u; socket=%i\n",
1205 t->name, t->num_slots, t->mask, t->slot_size,
1206 t->num_stages, t->socket);
1207 for (i = 0; i < t->num_stages; i++) {
1209 const struct opdl_stage *s = &t->stages[i];
1211 fprintf(f, " %s[%u]: threadsafe=%s; head=%u; available_seq=%u; tail=%u; deps=%u",
1212 t->name, i, (s->threadsafe) ? "true" : "false",
1213 (s->threadsafe) ? s->shared.head : s->head,
1214 (s->threadsafe) ? s->shared.available_seq :
1216 s->shared.tail, (s->num_deps > 0) ?
1217 s->deps[0]->stage->index : 0);
1218 for (j = 1; j < s->num_deps; j++)
1219 fprintf(f, ",%u", s->deps[j]->stage->index);
1226 opdl_ring_free(struct opdl_ring *t)
1229 const struct rte_memzone *mz;
1230 char mz_name[RTE_MEMZONE_NAMESIZE];
1233 PMD_DRV_LOG(DEBUG, "Freeing NULL OPDL Ring!");
1237 PMD_DRV_LOG(DEBUG, "Freeing %s opdl_ring at %p", t->name, t);
1239 for (i = 0; i < t->num_stages; ++i) {
1240 rte_free(t->stages[i].deps);
1241 rte_free(t->stages[i].dep_tracking);
1244 rte_free(t->stages);
1246 snprintf(mz_name, sizeof(mz_name), "%s%s", LIB_NAME, t->name);
1247 mz = rte_memzone_lookup(mz_name);
1248 if (rte_memzone_free(mz) != 0)
1249 PMD_DRV_LOG(ERR, "Cannot free memzone for %s", t->name);
1252 /* Look up an opdl_ring by its name */
1254 opdl_ring_lookup(const char *name)
1256 const struct rte_memzone *mz;
1257 char mz_name[RTE_MEMZONE_NAMESIZE];
1259 snprintf(mz_name, sizeof(mz_name), "%s%s", LIB_NAME, name);
1261 mz = rte_memzone_lookup(mz_name);
1269 opdl_ring_set_stage_threadsafe(struct opdl_stage *s, bool threadsafe)
1271 s->threadsafe = threadsafe;