/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2017 Intel Corporation
 */

#include <errno.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

#include <rte_branch_prediction.h>
#include <rte_debug.h>
#include <rte_errno.h>
#include <rte_lcore.h>
#include <rte_log.h>
#include <rte_malloc.h>
#include <rte_memcpy.h>
#include <rte_memory.h>
#include <rte_memzone.h>
#include <rte_eal_memconfig.h>

#include "opdl_ring.h"
#include "opdl_log.h"

#define LIB_NAME "opdl_ring"

#define OPDL_NAME_SIZE 64

#define OPDL_EVENT_MASK (0xFFFF0000000FFFFFULL)
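/* Judging by the rte_event field layout, this mask appears to select the
 * flow_id (bits 0-19) and the priority/impl_opaque bytes (bits 48-63) of the
 * event metadata word, while the fields that legitimately change as an event
 * moves between stages (op, sched_type, queue_id, etc.) fall outside it.
 * opdl_ring_cas_slot() below uses it to detect application changes.
 */
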
int opdl_logtype_driver;

/* Types of dependency between stages */
enum dep_type {
	DEP_NONE = 0,  /* no dependency */
	DEP_DIRECT,  /* stage has direct dependency */
	DEP_INDIRECT,  /* indirect dependency through other stage(s) */
	DEP_SELF,  /* stage dependency on itself, used to detect loops */
};

/* Shared section of stage state.
 * Care is needed when accessing and the layout is important, especially to
 * limit the adjacent cache-line HW prefetcher from impacting performance.
 */
struct shared_state {
	/* Last known minimum sequence number of dependencies, used for multi
	 * thread operation
	 */
	uint32_t available_seq;
	char _pad1[RTE_CACHE_LINE_SIZE * 3];
	uint32_t head;  /* Head sequence number (for multi thread operation) */
	char _pad2[RTE_CACHE_LINE_SIZE * 3];
	struct opdl_stage *stage;  /* back pointer */
	uint32_t tail;  /* Tail sequence number */
	char _pad3[RTE_CACHE_LINE_SIZE * 2];
} __rte_cache_aligned;

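/* Note on the padding above: the adjacent cache-line hardware prefetcher
 * pulls in the neighbouring line of any line it touches, so fields written
 * by different threads are kept more than two cache lines apart. A consumer
 * polling tail therefore should not drag the lines holding head or
 * available_seq, which the producer side owns, between cores.
 */
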
/* A structure to keep track of "unfinished" claims. This is only used for
 * stages that are threadsafe. Each lcore accesses its own instance of this
 * structure to record the entries it has claimed. This allows one lcore to
 * make multiple claims without being blocked by another. When disclaiming,
 * the shared tail is moved forward once it matches the tail value recorded
 * here.
 */
struct claim_manager {
	uint32_t num_to_disclaim;
	uint32_t num_claimed;
	uint32_t mgr_head;
	uint32_t mgr_tail;
	struct {
		uint32_t head;
		uint32_t tail;
	} claims[OPDL_DISCLAIMS_PER_LCORE];
} __rte_cache_aligned;

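/* Illustrative lifecycle (values made up): an lcore claiming slots [10, 14)
 * records {tail=10, head=14} in its claims[] array. When it disclaims,
 * shared.tail is only moved forward once it equals the recorded tail, i.e.
 * once every earlier claim on the stage has already been released.
 */
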
/* Context for each stage of opdl_ring.
 * Calculations on sequence numbers need to be done with other uint32_t values
 * so that results are modulus 2^32, and not undefined.
 */
struct opdl_stage {
	struct opdl_ring *t;  /* back pointer, set at init */
	uint32_t num_slots;  /* Number of slots for entries, set at init */
	uint32_t index;  /* ID for this stage, set at init */
	bool threadsafe;  /* Set to true if this stage supports threadsafe use */
	/* Last known min seq number of dependencies, used for single-thread
	 * operation
	 */
	uint32_t available_seq;
	uint32_t head;  /* Current head for single-thread operation */
	uint32_t shadow_head;  /* Shadow head for single-thread operation */
	uint32_t nb_instance;  /* Number of instances */
	uint32_t instance_id;  /* ID of this stage instance */
	uint16_t num_claimed;  /* Number of slots claimed */
	uint16_t num_event;  /* Number of events */
	uint32_t seq;  /* Sequence number */
	uint32_t num_deps;  /* Number of direct dependencies */
	/* Keep track of all dependencies, used during init only */
	enum dep_type *dep_tracking;
	/* Direct dependencies of this stage */
	struct shared_state **deps;
	/* Other stages read this! */
	struct shared_state shared __rte_cache_aligned;
	/* For managing disclaims in multi-threaded processing stages */
	struct claim_manager pending_disclaims[RTE_MAX_LCORE]
					       __rte_cache_aligned;
} __rte_cache_aligned;

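/* The shared member above is the only part of a stage that other stages (and
 * other lcores) read, which is why it sits on its own cache line and why
 * opdl_ring_create() asserts its size and offset at build time below.
 */
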
/* Context for opdl_ring */
struct opdl_ring {
	char name[OPDL_NAME_SIZE];  /* OPDL queue instance name */
	int socket;  /* NUMA socket that memory is allocated on */
	uint32_t num_slots;  /* Number of slots for entries */
	uint32_t mask;  /* Mask for sequence numbers (num_slots - 1) */
	uint32_t slot_size;  /* Size of each slot in bytes */
	uint32_t num_stages;  /* Number of stages that have been added */
	uint32_t max_num_stages;  /* Max number of stages */
	/* Stages indexed by ID */
	struct opdl_stage *stages;
	/* Memory for storing slot data */
	uint8_t slots[0] __rte_cache_aligned;
};

/* Return the input stage of an opdl_ring */
static __rte_always_inline struct opdl_stage *
input_stage(const struct opdl_ring *t)
{
	return &t->stages[0];
}

/* Check if a stage is the input stage */
static __rte_always_inline bool
is_input_stage(const struct opdl_stage *s)
{
	return s->index == 0;
}

/* Get slot pointer from sequence number */
static __rte_always_inline void *
get_slot(const struct opdl_ring *t, uint32_t n)
{
	return (void *)(uintptr_t)&t->slots[(n & t->mask) * t->slot_size];
}

/* Find how many entries are available for processing */
static __rte_always_inline uint32_t
available(const struct opdl_stage *s)
{
	if (s->threadsafe == true) {
		uint32_t n = __atomic_load_n(&s->shared.available_seq,
				__ATOMIC_ACQUIRE) -
				__atomic_load_n(&s->shared.head,
				__ATOMIC_ACQUIRE);

		/* Return 0 if available_seq needs to be updated */
		return (n <= s->num_slots) ? n : 0;
	}

	/* Single threaded */
	return s->available_seq - s->head;
}

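/* Worked example of the modulo-2^32 arithmetic (values made up): with
 * head = 0xFFFFFFFDu and available_seq = 0x00000003u (the sequence counter
 * has wrapped), available_seq - head = 6, which is the correct distance.
 * Signed arithmetic here would be undefined behaviour on overflow.
 */
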
/* Read sequence number of dependencies and find minimum */
static __rte_always_inline void
update_available_seq(struct opdl_stage *s)
{
	uint32_t i;
	uint32_t this_tail = s->shared.tail;
	uint32_t min_seq = __atomic_load_n(&s->deps[0]->tail, __ATOMIC_ACQUIRE);
	/* Input stage sequence numbers are greater than the sequence numbers of
	 * its dependencies so an offset of t->num_slots is needed when
	 * calculating available slots and also the condition which is used to
	 * determine the dependencies' minimum sequence number must be reversed.
	 */
	uint32_t wrap;

	if (is_input_stage(s)) {
		wrap = s->num_slots;
		for (i = 1; i < s->num_deps; i++) {
			uint32_t seq = __atomic_load_n(&s->deps[i]->tail,
					__ATOMIC_ACQUIRE);
			if ((this_tail - seq) > (this_tail - min_seq))
				min_seq = seq;
		}
	} else {
		wrap = 0;
		for (i = 1; i < s->num_deps; i++) {
			uint32_t seq = __atomic_load_n(&s->deps[i]->tail,
					__ATOMIC_ACQUIRE);
			if ((seq - this_tail) < (min_seq - this_tail))
				min_seq = seq;
		}
	}

	if (s->threadsafe == false)
		s->available_seq = min_seq + wrap;
	else
		__atomic_store_n(&s->shared.available_seq, min_seq + wrap,
				__ATOMIC_RELEASE);
}

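/* Example of the wrap offset (values made up): with num_slots = 8, if the
 * slowest dependency has tail = 10 and the input stage has head = 14, then
 * available_seq = 10 + 8 = 18 and the input stage may claim 18 - 14 = 4 more
 * slots. The input stage may run one full lap ahead of its dependencies,
 * whereas a processing stage may only consume up to its dependencies' tails.
 */
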
/* Wait until the number of available slots reaches number requested */
static __rte_always_inline void
wait_for_available(struct opdl_stage *s, uint32_t n)
{
	while (available(s) < n) {
		rte_pause();
		update_available_seq(s);
	}
}

/* Return number of slots to process based on number requested and mode */
static __rte_always_inline uint32_t
num_to_process(struct opdl_stage *s, uint32_t n, bool block)
{
	/* Don't read tail sequences of dependencies if not needed */
	if (available(s) >= n)
		return n;

	update_available_seq(s);

	if (block == false) {
		uint32_t avail = available(s);

		if (avail == 0) {
			rte_pause();
			return 0;
		}
		return (avail <= n) ? avail : n;
	}

	if (unlikely(n > s->num_slots)) {
		PMD_DRV_LOG(ERR, "%u entries is more than max (%u)",
				n, s->num_slots);
		return 0;  /* Avoid infinite loop */
	}
	/* Blocking, so wait for the slots to become available */
	wait_for_available(s, n);
	return n;
}

/* Copy entries in to slots with wrap-around */
static __rte_always_inline void
copy_entries_in(struct opdl_ring *t, uint32_t start, const void *entries,
		uint32_t num_entries)
{
	uint32_t slot_size = t->slot_size;
	uint32_t slot_index = start & t->mask;

	if (slot_index + num_entries <= t->num_slots) {
		rte_memcpy(get_slot(t, start), entries,
				num_entries * slot_size);
	} else {
		uint32_t split = t->num_slots - slot_index;

		rte_memcpy(get_slot(t, start), entries, split * slot_size);
		rte_memcpy(get_slot(t, 0),
				RTE_PTR_ADD(entries, split * slot_size),
				(num_entries - split) * slot_size);
	}
}

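/* Example (values made up): with num_slots = 8, a copy of 4 entries starting
 * at slot_index 6 is split into 2 entries at slots 6-7 followed by the
 * remaining 2 entries at slots 0-1.
 */
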
/* Copy entries out from slots with wrap-around */
static __rte_always_inline void
copy_entries_out(struct opdl_ring *t, uint32_t start, void *entries,
		uint32_t num_entries)
{
	uint32_t slot_size = t->slot_size;
	uint32_t slot_index = start & t->mask;

	if (slot_index + num_entries <= t->num_slots) {
		rte_memcpy(entries, get_slot(t, start),
				num_entries * slot_size);
	} else {
		uint32_t split = t->num_slots - slot_index;

		rte_memcpy(entries, get_slot(t, start), split * slot_size);
		rte_memcpy(RTE_PTR_ADD(entries, split * slot_size),
				get_slot(t, 0),
				(num_entries - split) * slot_size);
	}
}

/* Input function optimised for single thread */
static __rte_always_inline uint32_t
opdl_ring_input_singlethread(struct opdl_ring *t, const void *entries,
		uint32_t num_entries, bool block)
{
	struct opdl_stage *s = input_stage(t);
	uint32_t head = s->head;

	num_entries = num_to_process(s, num_entries, block);
	if (num_entries == 0)
		return 0;

	copy_entries_in(t, head, entries, num_entries);

	s->head += num_entries;
	__atomic_store_n(&s->shared.tail, s->head, __ATOMIC_RELEASE);

	return num_entries;
}

/* Convert head and tail of claim_manager into valid index */
static __rte_always_inline uint32_t
claim_mgr_index(uint32_t n)
{
	return n & (OPDL_DISCLAIMS_PER_LCORE - 1);
}

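/* The mask above is only valid because OPDL_DISCLAIMS_PER_LCORE is a power
 * of two; opdl_ring_create() enforces this with an RTE_BUILD_BUG_ON.
 */
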
/* Check if there are available slots in claim_manager */
static __rte_always_inline bool
claim_mgr_available(struct claim_manager *mgr)
{
	return (mgr->mgr_head < (mgr->mgr_tail + OPDL_DISCLAIMS_PER_LCORE)) ?
			true : false;
}

/* Record a new claim. Only use after first checking an entry is available */
static __rte_always_inline void
claim_mgr_add(struct claim_manager *mgr, uint32_t tail, uint32_t head)
{
	if ((mgr->mgr_head != mgr->mgr_tail) &&
			(mgr->claims[claim_mgr_index(mgr->mgr_head - 1)].head ==
			tail)) {
		/* Combine with previous claim */
		mgr->claims[claim_mgr_index(mgr->mgr_head - 1)].head = head;
	} else {
		mgr->claims[claim_mgr_index(mgr->mgr_head)].head = head;
		mgr->claims[claim_mgr_index(mgr->mgr_head)].tail = tail;
		mgr->mgr_head++;
	}

	mgr->num_claimed += (head - tail);
}

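/* Example of the merge above (values made up): with an existing record
 * {tail=10, head=14}, a new claim of [14, 18) extends that record to
 * {tail=10, head=18} instead of consuming a second claims[] entry.
 */
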
/* Read the oldest recorded claim */
static __rte_always_inline bool
claim_mgr_read(struct claim_manager *mgr, uint32_t *tail, uint32_t *head)
{
	if (mgr->mgr_head == mgr->mgr_tail)
		return false;

	*head = mgr->claims[claim_mgr_index(mgr->mgr_tail)].head;
	*tail = mgr->claims[claim_mgr_index(mgr->mgr_tail)].tail;
	return true;
}

/* Remove the oldest recorded claim. Only use after first reading the entry */
static __rte_always_inline void
claim_mgr_remove(struct claim_manager *mgr)
{
	mgr->num_claimed -= (mgr->claims[claim_mgr_index(mgr->mgr_tail)].head -
			mgr->claims[claim_mgr_index(mgr->mgr_tail)].tail);
	mgr->mgr_tail++;
}

/* Update tail in the oldest claim. Only use after first reading the entry */
static __rte_always_inline void
claim_mgr_move_tail(struct claim_manager *mgr, uint32_t num_entries)
{
	mgr->num_claimed -= num_entries;
	mgr->claims[claim_mgr_index(mgr->mgr_tail)].tail += num_entries;
}

static __rte_always_inline void
opdl_stage_disclaim_multithread_n(struct opdl_stage *s,
		uint32_t num_entries, bool block)
{
	struct claim_manager *disclaims = &s->pending_disclaims[rte_lcore_id()];
	uint32_t head;
	uint32_t tail;

	while (num_entries) {
		bool ret = claim_mgr_read(disclaims, &tail, &head);

		if (ret == false)
			break;  /* nothing is claimed */
		/* There should be no race condition here. If shared.tail
		 * matches, no other core can update it until this one does.
		 */
		if (__atomic_load_n(&s->shared.tail, __ATOMIC_ACQUIRE) ==
				tail) {
			if (num_entries >= (head - tail)) {
				claim_mgr_remove(disclaims);
				__atomic_store_n(&s->shared.tail, head,
						__ATOMIC_RELEASE);
				num_entries -= (head - tail);
			} else {
				claim_mgr_move_tail(disclaims, num_entries);
				__atomic_store_n(&s->shared.tail,
						num_entries + tail,
						__ATOMIC_RELEASE);
				num_entries = 0;
			}
		} else if (block == false)
			break;  /* blocked by other thread */
		/* Keep going until num_entries are disclaimed. */
		rte_pause();
	}

	disclaims->num_to_disclaim = num_entries;
}

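/* Any entries that could not be disclaimed above remain recorded in
 * num_to_disclaim; move_head_atomically() retries them on this lcore's next
 * claim attempt.
 */
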
/* Move head atomically, returning number of entries available to process and
 * the original value of head. For non-input stages, the claim is recorded
 * so that the tail can be updated later by opdl_stage_disclaim().
 */
static __rte_always_inline void
move_head_atomically(struct opdl_stage *s, uint32_t *num_entries,
		uint32_t *old_head, bool block, bool claim_func)
{
	uint32_t orig_num_entries = *num_entries;
	bool ret;
	struct claim_manager *disclaims = &s->pending_disclaims[rte_lcore_id()];

	/* Attempt to disclaim any outstanding claims */
	opdl_stage_disclaim_multithread_n(s, disclaims->num_to_disclaim,
			false);

	*old_head = __atomic_load_n(&s->shared.head, __ATOMIC_ACQUIRE);
	while (true) {
		bool success;
		/* If called by opdl_ring_input(), claim does not need to be
		 * recorded, as there will be no disclaim.
		 */
		if (claim_func) {
			/* Check that the claim can be recorded */
			ret = claim_mgr_available(disclaims);
			if (ret == false) {
				/* exit out if claim can't be recorded */
				*num_entries = 0;
				return;
			}
		}

		*num_entries = num_to_process(s, orig_num_entries, block);
		if (*num_entries == 0)
			return;

		success = __atomic_compare_exchange_n(&s->shared.head, old_head,
				*old_head + *num_entries,
				true,  /* may fail spuriously */
				__ATOMIC_RELEASE,  /* memory order on success */
				__ATOMIC_ACQUIRE);  /* memory order on fail */
		if (likely(success))
			break;
		rte_pause();
	}

	if (claim_func)
		/* Store the claim record */
		claim_mgr_add(disclaims, *old_head, *old_head + *num_entries);
}

/* Input function that supports multiple threads */
static __rte_always_inline uint32_t
opdl_ring_input_multithread(struct opdl_ring *t, const void *entries,
		uint32_t num_entries, bool block)
{
	struct opdl_stage *s = input_stage(t);
	uint32_t old_head;

	move_head_atomically(s, &num_entries, &old_head, block, false);
	if (num_entries == 0)
		return 0;

	copy_entries_in(t, old_head, entries, num_entries);

	/* If another thread started inputting before this one, but hasn't
	 * finished, we need to wait for it to complete to update the tail.
	 */
	while (unlikely(__atomic_load_n(&s->shared.tail, __ATOMIC_ACQUIRE) !=
			old_head))
		rte_pause();

	__atomic_store_n(&s->shared.tail, old_head + num_entries,
			__ATOMIC_RELEASE);

	return num_entries;
}

static __rte_always_inline uint32_t
opdl_first_entry_id(uint32_t start_seq, uint8_t nb_p_lcores,
		uint8_t this_lcore)
{
	return ((nb_p_lcores <= 1) ? 0 :
			(nb_p_lcores - (start_seq % nb_p_lcores) + this_lcore) %
			nb_p_lcores);
}

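/* Worked example (values made up): with nb_p_lcores = 4, start_seq = 6 and
 * this_lcore = 1, the result is (4 - (6 % 4) + 1) % 4 = 3, so this instance
 * starts at sequence number 6 + 3 = 9, and 9 % 4 == 1 as required. Callers
 * then step through the slots in strides of nb_p_lcores.
 */
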
/* Claim slots to process, optimised for single-thread operation */
static __rte_always_inline uint32_t
opdl_stage_claim_singlethread(struct opdl_stage *s, void *entries,
		uint32_t num_entries, uint32_t *seq, bool block, bool atomic)
{
	uint32_t i = 0, j = 0, offset;
	void *get_slots;
	struct rte_event *ev;
	RTE_SET_USED(seq);
	struct opdl_ring *t = s->t;
	uint8_t *entries_offset = (uint8_t *)entries;

	if (!atomic) {
		offset = opdl_first_entry_id(s->seq, s->nb_instance,
				s->instance_id);

		num_entries = s->nb_instance * num_entries;

		num_entries = num_to_process(s, num_entries, block);

		for (; offset < num_entries; offset += s->nb_instance) {
			get_slots = get_slot(t, s->head + offset);
			memcpy(entries_offset, get_slots, t->slot_size);
			entries_offset += t->slot_size;
			i++;
		}
	} else {
		num_entries = num_to_process(s, num_entries, block);

		for (j = 0; j < num_entries; j++) {
			ev = (struct rte_event *)get_slot(t, s->head + j);
			if ((ev->flow_id % s->nb_instance) == s->instance_id) {
				memcpy(entries_offset, ev, t->slot_size);
				entries_offset += t->slot_size;
				i++;
			}
		}
	}
	s->shadow_head = s->head;
	s->head += num_entries;
	s->num_claimed = num_entries;
	s->num_event = i;

	/* automatically disclaim entries if number of rte_events is zero */
	if (unlikely(i == 0))
		opdl_stage_disclaim(s, 0, false);

	return i;
}

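/* In the two paths above: the non-atomic (ordered) path takes every
 * nb_instance-th slot starting at this instance's round-robin offset, while
 * the atomic path scans all claimed slots and takes only those whose flow_id
 * maps (modulo nb_instance) to this instance, preserving per-flow affinity.
 */
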
/* Thread-safe version of function to claim slots for processing */
static __rte_always_inline uint32_t
opdl_stage_claim_multithread(struct opdl_stage *s, void *entries,
		uint32_t num_entries, uint32_t *seq, bool block)
{
	uint32_t old_head;
	struct opdl_ring *t = s->t;
	uint32_t i = 0, offset;
	uint8_t *entries_offset = (uint8_t *)entries;

	if (seq == NULL) {
		PMD_DRV_LOG(ERR, "Invalid seq PTR");
		return 0;
	}
	offset = opdl_first_entry_id(*seq, s->nb_instance, s->instance_id);
	num_entries = offset + (s->nb_instance * num_entries);

	move_head_atomically(s, &num_entries, &old_head, block, true);

	for (; offset < num_entries; offset += s->nb_instance) {
		memcpy(entries_offset, get_slot(t, s->head + offset),
				t->slot_size);
		entries_offset += t->slot_size;
		i++;
	}

	*seq = old_head;

	return i;
}

/* Claim and copy slot pointers, optimised for single-thread operation */
static __rte_always_inline uint32_t
opdl_stage_claim_copy_singlethread(struct opdl_stage *s, void *entries,
		uint32_t num_entries, uint32_t *seq, bool block)
{
	num_entries = num_to_process(s, num_entries, block);
	if (num_entries == 0)
		return 0;
	copy_entries_out(s->t, s->head, entries, num_entries);
	if (seq != NULL)
		*seq = s->head;
	s->head += num_entries;
	return num_entries;
}

/* Thread-safe version of function to claim and copy pointers to slots */
static __rte_always_inline uint32_t
opdl_stage_claim_copy_multithread(struct opdl_stage *s, void *entries,
		uint32_t num_entries, uint32_t *seq, bool block)
{
	uint32_t old_head;

	move_head_atomically(s, &num_entries, &old_head, block, true);
	if (num_entries == 0)
		return 0;
	copy_entries_out(s->t, old_head, entries, num_entries);
	if (seq != NULL)
		*seq = old_head;
	return num_entries;
}

static __rte_always_inline void
opdl_stage_disclaim_singlethread_n(struct opdl_stage *s,
		uint32_t num_entries)
{
	uint32_t old_tail = s->shared.tail;

	if (unlikely(num_entries > (s->head - old_tail))) {
		PMD_DRV_LOG(WARNING, "Attempt to disclaim (%u) more than claimed (%u)",
				num_entries, s->head - old_tail);
		num_entries = s->head - old_tail;
	}
	__atomic_store_n(&s->shared.tail, num_entries + old_tail,
			__ATOMIC_RELEASE);
}

uint32_t
opdl_ring_input(struct opdl_ring *t, const void *entries, uint32_t num_entries,
		bool block)
{
	if (input_stage(t)->threadsafe == false)
		return opdl_ring_input_singlethread(t, entries, num_entries,
				block);
	else
		return opdl_ring_input_multithread(t, entries, num_entries,
				block);
}

uint32_t
opdl_ring_copy_from_burst(struct opdl_ring *t, struct opdl_stage *s,
		const void *entries, uint32_t num_entries, bool block)
{
	uint32_t head = s->head;

	num_entries = num_to_process(s, num_entries, block);

	if (num_entries == 0)
		return 0;

	copy_entries_in(t, head, entries, num_entries);

	s->head += num_entries;
	__atomic_store_n(&s->shared.tail, s->head, __ATOMIC_RELEASE);

	return num_entries;
}

uint32_t
opdl_ring_copy_to_burst(struct opdl_ring *t, struct opdl_stage *s,
		void *entries, uint32_t num_entries, bool block)
{
	uint32_t head = s->head;

	num_entries = num_to_process(s, num_entries, block);
	if (num_entries == 0)
		return 0;

	copy_entries_out(t, head, entries, num_entries);

	s->head += num_entries;
	__atomic_store_n(&s->shared.tail, s->head, __ATOMIC_RELEASE);

	return num_entries;
}

uint32_t
opdl_stage_find_num_available(struct opdl_stage *s, uint32_t num_entries)
{
	/* return (num_to_process(s, num_entries, false)); */

	if (available(s) >= num_entries)
		return num_entries;

	update_available_seq(s);

	uint32_t avail = available(s);

	if (avail == 0) {
		rte_pause();
		return 0;
	}
	return (avail <= num_entries) ? avail : num_entries;
}

uint32_t
opdl_stage_claim(struct opdl_stage *s, void *entries,
		uint32_t num_entries, uint32_t *seq, bool block, bool atomic)
{
	if (s->threadsafe == false)
		return opdl_stage_claim_singlethread(s, entries, num_entries,
				seq, block, atomic);
	else
		return opdl_stage_claim_multithread(s, entries, num_entries,
				seq, block);
}

uint32_t
opdl_stage_claim_copy(struct opdl_stage *s, void *entries,
		uint32_t num_entries, uint32_t *seq, bool block)
{
	if (s->threadsafe == false)
		return opdl_stage_claim_copy_singlethread(s, entries,
				num_entries, seq, block);
	else
		return opdl_stage_claim_copy_multithread(s, entries,
				num_entries, seq, block);
}

void
opdl_stage_disclaim_n(struct opdl_stage *s, uint32_t num_entries,
		bool block)
{
	if (s->threadsafe == false) {
		opdl_stage_disclaim_singlethread_n(s, s->num_claimed);
	} else {
		struct claim_manager *disclaims =
			&s->pending_disclaims[rte_lcore_id()];

		if (unlikely(num_entries > s->num_slots)) {
			PMD_DRV_LOG(WARNING, "Attempt to disclaim (%u) more than claimed (%u)",
					num_entries, disclaims->num_claimed);
			num_entries = disclaims->num_claimed;
		}

		num_entries = RTE_MIN(num_entries + disclaims->num_to_disclaim,
				disclaims->num_claimed);
		opdl_stage_disclaim_multithread_n(s, num_entries, block);
	}
}

int
opdl_stage_disclaim(struct opdl_stage *s, uint32_t num_entries, bool block)
{
	if (num_entries != s->num_event) {
		rte_errno = -EINVAL;
		return 0;
	}
	if (s->threadsafe == false) {
		__atomic_store_n(&s->shared.tail, s->head, __ATOMIC_RELEASE);
		s->seq += s->num_claimed;
		s->shadow_head = s->head;
		s->num_claimed = 0;
	} else {
		struct claim_manager *disclaims =
				&s->pending_disclaims[rte_lcore_id()];
		opdl_stage_disclaim_multithread_n(s, disclaims->num_claimed,
				block);
	}
	return num_entries;
}

uint32_t
opdl_ring_available(struct opdl_ring *t)
{
	return opdl_stage_available(&t->stages[0]);
}

uint32_t
opdl_stage_available(struct opdl_stage *s)
{
	update_available_seq(s);
	return available(s);
}

void
opdl_ring_flush(struct opdl_ring *t)
{
	struct opdl_stage *s = input_stage(t);

	wait_for_available(s, s->num_slots);
}

/******************** Non performance sensitive functions ********************/

/* Initial setup of a new stage's context */
static int
init_stage(struct opdl_ring *t, struct opdl_stage *s, bool threadsafe,
		bool is_input)
{
	uint32_t available = (is_input) ? t->num_slots : 0;

	s->t = t;
	s->num_slots = t->num_slots;
	s->index = t->num_stages;
	s->threadsafe = threadsafe;
	s->shared.stage = s;

	/* Alloc memory for deps */
	s->dep_tracking = rte_zmalloc_socket(LIB_NAME,
			t->max_num_stages * sizeof(enum dep_type),
			0, t->socket);
	if (s->dep_tracking == NULL)
		return -ENOMEM;

	s->deps = rte_zmalloc_socket(LIB_NAME,
			t->max_num_stages * sizeof(struct shared_state *),
			0, t->socket);
	if (s->deps == NULL) {
		rte_free(s->dep_tracking);
		return -ENOMEM;
	}

	s->dep_tracking[s->index] = DEP_SELF;

	if (threadsafe == true)
		s->shared.available_seq = available;
	else
		s->available_seq = available;

	return 0;
}

/* Add direct or indirect dependencies between stages */
static int
add_dep(struct opdl_stage *dependent, const struct opdl_stage *dependency,
		enum dep_type type)
{
	struct opdl_ring *t = dependent->t;
	uint32_t i;

	/* Add new direct dependency */
	if ((type == DEP_DIRECT) &&
			(dependent->dep_tracking[dependency->index] ==
					DEP_NONE)) {
		PMD_DRV_LOG(DEBUG, "%s:%u direct dependency on %u",
				t->name, dependent->index, dependency->index);
		dependent->dep_tracking[dependency->index] = DEP_DIRECT;
	}

	/* Add new indirect dependency or change direct to indirect */
	if ((type == DEP_INDIRECT) &&
			((dependent->dep_tracking[dependency->index] ==
			DEP_NONE) ||
			(dependent->dep_tracking[dependency->index] ==
			DEP_DIRECT))) {
		PMD_DRV_LOG(DEBUG, "%s:%u indirect dependency on %u",
				t->name, dependent->index, dependency->index);
		dependent->dep_tracking[dependency->index] = DEP_INDIRECT;
	}

	/* Shouldn't happen... */
	if ((dependent->dep_tracking[dependency->index] == DEP_SELF) &&
			(dependent != input_stage(t))) {
		PMD_DRV_LOG(ERR, "Loop in dependency graph %s:%u",
				t->name, dependent->index);
		return -EINVAL;
	}

	/* Keep going to dependencies of the dependency, until input stage */
	if (dependency != input_stage(t))
		for (i = 0; i < dependency->num_deps; i++) {
			int ret = add_dep(dependent, dependency->deps[i]->stage,
					DEP_INDIRECT);

			if (ret < 0)
				return ret;
		}

	/* Make list of sequence numbers for direct dependencies only */
	if (type == DEP_DIRECT)
		for (i = 0, dependent->num_deps = 0; i < t->num_stages; i++)
			if (dependent->dep_tracking[i] == DEP_DIRECT) {
				if ((i == 0) && (dependent->num_deps > 1))
					rte_panic("%s:%u depends on > input",
							t->name,
							dependent->index);
				dependent->deps[dependent->num_deps++] =
						&t->stages[i].shared;
			}

	return 0;
}

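/* Example (hypothetical three-stage pipeline): with input stage 0 and stages
 * 1 and 2, add_dep(stage2, stage1, DEP_DIRECT) also walks stage 1's
 * dependencies and records stage 0 as DEP_INDIRECT, so at run time stage 2
 * only polls stage 1's tail.
 */
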
struct opdl_ring *
opdl_ring_create(const char *name, uint32_t num_slots, uint32_t slot_size,
		uint32_t max_num_stages, int socket)
{
	struct opdl_ring *t = NULL;
	char mz_name[RTE_MEMZONE_NAMESIZE];
	int mz_flags = 0;
	struct opdl_stage *st = NULL;
	const struct rte_memzone *mz = NULL;
	size_t alloc_size = RTE_CACHE_LINE_ROUNDUP(sizeof(*t) +
			(num_slots * slot_size));

	/* Compile time checking */
	RTE_BUILD_BUG_ON((sizeof(struct shared_state) & RTE_CACHE_LINE_MASK) !=
			0);
	RTE_BUILD_BUG_ON((offsetof(struct opdl_stage, shared) &
			RTE_CACHE_LINE_MASK) != 0);
	RTE_BUILD_BUG_ON((offsetof(struct opdl_ring, slots) &
			RTE_CACHE_LINE_MASK) != 0);
	RTE_BUILD_BUG_ON(!rte_is_power_of_2(OPDL_DISCLAIMS_PER_LCORE));

	/* Parameter checking */
	if (name == NULL) {
		PMD_DRV_LOG(ERR, "name param is NULL");
		goto exit_fail;
	}
	if (!rte_is_power_of_2(num_slots)) {
		PMD_DRV_LOG(ERR, "num_slots (%u) for %s is not power of 2",
				num_slots, name);
		goto exit_fail;
	}

	/* Alloc memory for stages */
	st = rte_zmalloc_socket(LIB_NAME,
			max_num_stages * sizeof(struct opdl_stage),
			RTE_CACHE_LINE_SIZE, socket);
	if (st == NULL)
		goto exit_fail;

	snprintf(mz_name, sizeof(mz_name), "%s%s", LIB_NAME, name);

	/* Alloc memory for memzone */
	mz = rte_memzone_reserve(mz_name, alloc_size, socket, mz_flags);
	if (mz == NULL)
		goto exit_fail;

	t = mz->addr;

	/* Initialise opdl_ring queue */
	memset(t, 0, sizeof(*t));
	snprintf(t->name, sizeof(t->name), "%s", name);
	t->socket = socket;
	t->num_slots = num_slots;
	t->mask = num_slots - 1;
	t->slot_size = slot_size;
	t->max_num_stages = max_num_stages;
	t->stages = st;

	PMD_DRV_LOG(DEBUG, "Created %s at %p (num_slots=%u,socket=%i,slot_size=%u)",
			t->name, t, num_slots, socket, slot_size);

	return t;

exit_fail:
	PMD_DRV_LOG(ERR, "Cannot reserve memory");
	rte_free(st);
	rte_memzone_free(mz);

	return NULL;
}

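/* Minimal usage sketch (illustrative only; error handling omitted and the
 * names and sizes below are hypothetical):
 *
 *	struct opdl_ring *r = opdl_ring_create("pipe", 1024,
 *			sizeof(struct rte_event), 2, rte_socket_id());
 *	struct opdl_stage *in = opdl_stage_add(r, false, true);
 *	struct opdl_stage *worker = opdl_stage_add(r, false, false);
 *	opdl_stage_set_deps(worker, &in, 1);
 *
 * Producers then feed slots with opdl_ring_input(); the worker claims them
 * with opdl_stage_claim() and releases them with opdl_stage_disclaim().
 */
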
void *
opdl_ring_get_slot(const struct opdl_ring *t, uint32_t index)
{
	return get_slot(t, index);
}

bool
opdl_ring_cas_slot(const struct opdl_stage *s, const struct rte_event *ev,
		uint32_t index, bool atomic)
{
	uint32_t i = 0, j = 0, offset;
	struct opdl_ring *t = s->t;
	struct rte_event *ev_orig = NULL;
	bool ev_updated = false;
	uint64_t ev_temp = 0;

	if (index > s->num_event) {
		PMD_DRV_LOG(ERR, "index %u is out of range", index);
		return ev_updated;
	}

	ev_temp = ev->event & OPDL_EVENT_MASK;

	if (!atomic) {
		offset = opdl_first_entry_id(s->seq, s->nb_instance,
				s->instance_id);
		offset += index * s->nb_instance;
		ev_orig = get_slot(t, s->shadow_head + offset);
		if ((ev_orig->event & OPDL_EVENT_MASK) != ev_temp) {
			ev_orig->event = ev->event;
			ev_updated = true;
		}
		if (ev_orig->u64 != ev->u64) {
			ev_orig->u64 = ev->u64;
			ev_updated = true;
		}
	} else {
		for (i = 0; i < s->num_claimed; i++) {
			ev_orig = (struct rte_event *)
				get_slot(t, s->shadow_head + i);

			if ((ev_orig->flow_id % s->nb_instance) ==
					s->instance_id) {

				if (j == index) {
					if ((ev_orig->event & OPDL_EVENT_MASK) !=
							ev_temp) {
						ev_orig->event = ev->event;
						ev_updated = true;
					}
					if (ev_orig->u64 != ev->u64) {
						ev_orig->u64 = ev->u64;
						ev_updated = true;
					}

					break;
				}
				j++;
			}
		}
	}

	return ev_updated;
}

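/* The function above writes an application's copy of a claimed event back
 * into its slot only when the masked metadata or the payload actually
 * differs, and reports whether any update was made.
 */
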
int
opdl_ring_get_socket(const struct opdl_ring *t)
{
	return t->socket;
}

uint32_t
opdl_ring_get_num_slots(const struct opdl_ring *t)
{
	return t->num_slots;
}

const char *
opdl_ring_get_name(const struct opdl_ring *t)
{
	return t->name;
}

/* Check dependency list is valid for a given opdl_ring */
static int
check_deps(struct opdl_ring *t, struct opdl_stage *deps[],
		uint32_t num_deps)
{
	unsigned int i;

	for (i = 0; i < num_deps; ++i) {
		if (!deps[i]) {
			PMD_DRV_LOG(ERR, "deps[%u] is NULL", i);
			return -EINVAL;
		}
		if (t != deps[i]->t) {
			PMD_DRV_LOG(ERR, "deps[%u] is in opdl_ring %s, not %s",
					i, deps[i]->t->name, t->name);
			return -EINVAL;
		}
	}
	if (num_deps > t->num_stages) {
		PMD_DRV_LOG(ERR, "num_deps (%u) > number stages (%u)",
				num_deps, t->num_stages);
		return -EINVAL;
	}

	return 0;
}

struct opdl_stage *
opdl_stage_add(struct opdl_ring *t, bool threadsafe, bool is_input)
{
	struct opdl_stage *s;

	/* Parameter checking */
	if (!t) {
		PMD_DRV_LOG(ERR, "opdl_ring is NULL");
		return NULL;
	}
	if (t->num_stages == t->max_num_stages) {
		PMD_DRV_LOG(ERR, "%s has max number of stages (%u)",
				t->name, t->max_num_stages);
		return NULL;
	}

	s = &t->stages[t->num_stages];

	if (((uintptr_t)&s->shared & RTE_CACHE_LINE_MASK) != 0)
		PMD_DRV_LOG(WARNING, "Tail seq num (%p) of %s stage not cache aligned",
				&s->shared, t->name);

	if (init_stage(t, s, threadsafe, is_input) < 0) {
		PMD_DRV_LOG(ERR, "Cannot reserve memory");
		return NULL;
	}
	t->num_stages++;

	return s;
}

uint32_t
opdl_stage_deps_add(struct opdl_ring *t, struct opdl_stage *s,
		uint32_t nb_instance, uint32_t instance_id,
		struct opdl_stage *deps[],
		uint32_t num_deps)
{
	uint32_t i;
	int ret = 0;

	if ((num_deps > 0) && (!deps)) {
		PMD_DRV_LOG(ERR, "%s stage has NULL dependencies", t->name);
		return -1;
	}
	ret = check_deps(t, deps, num_deps);
	if (ret < 0)
		return -1;

	for (i = 0; i < num_deps; i++) {
		ret = add_dep(s, deps[i], DEP_DIRECT);
		if (ret < 0)
			return -1;
	}

	s->nb_instance = nb_instance;
	s->instance_id = instance_id;

	return ret;
}

struct opdl_stage *
opdl_ring_get_input_stage(const struct opdl_ring *t)
{
	return input_stage(t);
}

int
opdl_stage_set_deps(struct opdl_stage *s, struct opdl_stage *deps[],
		uint32_t num_deps)
{
	unsigned int i;
	int ret;

	if ((num_deps == 0) || (!deps)) {
		PMD_DRV_LOG(ERR, "cannot set NULL dependencies");
		return -EINVAL;
	}

	ret = check_deps(s->t, deps, num_deps);
	if (ret < 0)
		return ret;

	for (i = 0; i < num_deps; i++)
		s->deps[i] = &deps[i]->shared;
	s->num_deps = num_deps;

	return 0;
}

struct opdl_ring *
opdl_stage_get_opdl_ring(const struct opdl_stage *s)
{
	return s->t;
}

void
opdl_ring_dump(const struct opdl_ring *t, FILE *f)
{
	uint32_t i;

	if (t == NULL) {
		fprintf(f, "NULL OPDL!\n");
		return;
	}
	fprintf(f, "OPDL \"%s\": num_slots=%u; mask=%#x; slot_size=%u; num_stages=%u; socket=%i\n",
			t->name, t->num_slots, t->mask, t->slot_size,
			t->num_stages, t->socket);
	for (i = 0; i < t->num_stages; i++) {
		uint32_t j;
		const struct opdl_stage *s = &t->stages[i];

		fprintf(f, "  %s[%u]: threadsafe=%s; head=%u; available_seq=%u; tail=%u; deps=%u",
				t->name, i, (s->threadsafe) ? "true" : "false",
				(s->threadsafe) ? s->shared.head : s->head,
				(s->threadsafe) ? s->shared.available_seq :
				s->available_seq,
				s->shared.tail, (s->num_deps > 0) ?
				s->deps[0]->stage->index : 0);
		for (j = 1; j < s->num_deps; j++)
			fprintf(f, ",%u", s->deps[j]->stage->index);
		fprintf(f, "\n");
	}
	fflush(f);
}

void
opdl_ring_free(struct opdl_ring *t)
{
	uint32_t i;
	const struct rte_memzone *mz;
	char mz_name[RTE_MEMZONE_NAMESIZE];

	if (t == NULL) {
		PMD_DRV_LOG(DEBUG, "Freeing NULL OPDL Ring!");
		return;
	}

	PMD_DRV_LOG(DEBUG, "Freeing %s opdl_ring at %p", t->name, t);

	for (i = 0; i < t->num_stages; ++i) {
		rte_free(t->stages[i].deps);
		rte_free(t->stages[i].dep_tracking);
	}

	rte_free(t->stages);

	snprintf(mz_name, sizeof(mz_name), "%s%s", LIB_NAME, t->name);
	mz = rte_memzone_lookup(mz_name);
	if (rte_memzone_free(mz) != 0)
		PMD_DRV_LOG(ERR, "Cannot free memzone for %s", t->name);
}

/* Search for an opdl_ring by its name */
struct opdl_ring *
opdl_ring_lookup(const char *name)
{
	const struct rte_memzone *mz;
	char mz_name[RTE_MEMZONE_NAMESIZE];

	snprintf(mz_name, sizeof(mz_name), "%s%s", LIB_NAME, name);

	mz = rte_memzone_lookup(mz_name);
	if (mz == NULL)
		return NULL;

	return mz->addr;
}

void
opdl_ring_set_stage_threadsafe(struct opdl_stage *s, bool threadsafe)
{
	s->threadsafe = threadsafe;
}