2 * SPDX-License-Identifier: BSD-3-Clause
3 * Copyright(c) 2010-2014 Intel Corporation
11 #include <rte_branch_prediction.h>
12 #include <rte_debug.h>
13 #include <rte_lcore.h>
15 #include <rte_malloc.h>
16 #include <rte_memcpy.h>
17 #include <rte_memory.h>
18 #include <rte_memzone.h>
19 #include <rte_eal_memconfig.h>
21 #include "opdl_ring.h"
/* Prefix used when allocating stage/ring memory and naming memzones */
#define LIB_NAME "opdl_ring"

/* Maximum length (incl. NUL) of an opdl_ring instance name */
#define OPDL_NAME_SIZE 64

/* Mask of rte_event bits compared when updating a slot in
 * opdl_ring_cas_slot(); the remaining bits are the writable fields.
 */
#define OPDL_EVENT_MASK (0xFFFF0000000FFFFFULL)

/* Driver log type; presumably registered by the PMD at probe — confirm */
int opdl_logtype_driver;
/* Types of dependency between stages. Recorded in opdl_stage.dep_tracking[]
 * while the dependency graph is built during init (see add_dep()).
 */
	DEP_NONE = 0, /* no dependency */
	DEP_DIRECT, /* stage has direct dependency */
	DEP_INDIRECT, /* in-direct dependency through other stage(s) */
	DEP_SELF, /* stage dependency on itself, used to detect loops */
/* Shared section of stage state.
 * Care is needed when accessing and the layout is important, especially to
 * limit the adjacent cache-line HW prefetcher from impacting performance.
 */
	/* Last known minimum sequence number of dependencies, used for multi
	 * thread operation; the single-thread path caches this in
	 * opdl_stage.available_seq instead.
	 */
	uint32_t available_seq;
	/* Padding keeps the fields below on separate cache lines so cores
	 * updating head/tail do not false-share (3 lines also guards against
	 * the adjacent-line HW prefetcher mentioned above).
	 */
	char _pad1[RTE_CACHE_LINE_SIZE * 3];
	uint32_t head; /* Head sequence number (for multi thread operation) */
	char _pad2[RTE_CACHE_LINE_SIZE * 3];
	struct opdl_stage *stage; /* back pointer */
	uint32_t tail; /* Tail sequence number */
	char _pad3[RTE_CACHE_LINE_SIZE * 2];
} __rte_cache_aligned;
/* A structure to keep track of "unfinished" claims. This is only used for
 * stages that are threadsafe. Each lcore accesses its own instance of this
 * structure to record the entries it has claimed. This allows one lcore to make
 * multiple claims without being blocked by another. When disclaiming it moves
 * forward the shared tail when the shared tail matches the tail value recorded
 * here.
 */
struct claim_manager {
	/* Entries from prior claims still waiting to be disclaimed */
	uint32_t num_to_disclaim;
	/* Per-claim [tail, head) records; small ring indexed through
	 * claim_mgr_index().
	 */
	} claims[OPDL_DISCLAIMS_PER_LCORE];
} __rte_cache_aligned;
/* Context for each stage of opdl_ring.
 * Calculations on sequence numbers need to be done with other uint32_t values
 * so that results are modulus 2^32, and not undefined.
 */
	struct opdl_ring *t; /* back pointer, set at init */
	uint32_t num_slots; /* Number of slots for entries, set at init */
	uint32_t index; /* ID for this stage, set at init */
	bool threadsafe; /* Set to 1 if this stage supports threadsafe use */
	/* Last known min seq number of dependencies for used for single thread
	 * operation (multi-thread stages use shared.available_seq).
	 */
	uint32_t available_seq;
	uint32_t head; /* Current head for single-thread operation */
	uint32_t shadow_head; /* Shadow head for single-thread operation */
	uint32_t nb_instance; /* Number of instances */
	uint32_t instance_id; /* ID of this stage instance */
	uint16_t num_claimed; /* Number of slots claimed */
	uint16_t num_event; /* Number of events */
	uint32_t seq; /* sequence number */
	uint32_t num_deps; /* Number of direct dependencies */
	/* Keep track of all dependencies, used during init only */
	enum dep_type *dep_tracking;
	/* Direct dependencies of this stage */
	struct shared_state **deps;
	/* Other stages read this! Cache-aligned to avoid false sharing. */
	struct shared_state shared __rte_cache_aligned;
	/* For managing disclaims in multi-threaded processing stages */
	struct claim_manager pending_disclaims[RTE_MAX_LCORE]
} __rte_cache_aligned;
/* Context for opdl_ring */
	char name[OPDL_NAME_SIZE]; /* OPDL queue instance name */
	int socket; /* NUMA socket that memory is allocated on */
	uint32_t num_slots; /* Number of slots for entries */
	uint32_t mask; /* Mask for sequence numbers (num_slots - 1) */
	uint32_t slot_size; /* Size of each slot in bytes */
	uint32_t num_stages; /* Number of stages that have been added */
	uint32_t max_num_stages; /* Max number of stages */
	/* Stages indexed by ID */
	struct opdl_stage *stages;
	/* Memory for storing slot data; trailing variable-length array,
	 * allocated as part of the ring's memzone in opdl_ring_create().
	 */
	uint8_t slots[0] __rte_cache_aligned;
/* Return input stage of a opdl_ring.
 * The input stage is always stage 0 (the producer-facing stage).
 */
static __rte_always_inline struct opdl_stage *
input_stage(const struct opdl_ring *t)
{
	return &t->stages[0];
}
/* Check if a stage is the input stage (stage index 0) */
static __rte_always_inline bool
is_input_stage(const struct opdl_stage *s)
{
	return s->index == 0;
}
/* Get slot pointer from sequence number.
 * Sequence numbers are free-running; masking with (num_slots - 1) maps
 * them into the ring, then scaling by slot_size finds the byte offset.
 */
static __rte_always_inline void *
get_slot(const struct opdl_ring *t, uint32_t n)
{
	return (void *)(uintptr_t)&t->slots[(n & t->mask) * t->slot_size];
}
/* Find how many entries are available for processing */
static __rte_always_inline uint32_t
available(const struct opdl_stage *s)
	if (s->threadsafe == true) {
		/* Multi-thread stage: read shared counters atomically */
		uint32_t n = __atomic_load_n(&s->shared.available_seq,
			__atomic_load_n(&s->shared.head,
		/* Return 0 if available_seq needs to be updated */
		return (n <= s->num_slots) ? n : 0;
	/* Single threaded: private cached copies, no atomics needed */
	return s->available_seq - s->head;
/* Read sequence number of dependencies and find minimum */
static __rte_always_inline void
update_available_seq(struct opdl_stage *s)
	uint32_t this_tail = s->shared.tail;
	uint32_t min_seq = __atomic_load_n(&s->deps[0]->tail, __ATOMIC_ACQUIRE);
	/* Input stage sequence numbers are greater than the sequence numbers of
	 * its dependencies so an offset of t->num_slots is needed when
	 * calculating available slots and also the condition which is used to
	 * determine the dependencies minimum sequence number must be reverted.
	 */
	if (is_input_stage(s)) {
		/* Input stage: keep the dependency lagging furthest behind
		 * our tail (distance computed modulo 2^32, direction reversed)
		 */
		for (i = 1; i < s->num_deps; i++) {
			uint32_t seq = __atomic_load_n(&s->deps[i]->tail,
			if ((this_tail - seq) > (this_tail - min_seq))
		/* Non-input stage: smallest forward distance from our tail */
		for (i = 1; i < s->num_deps; i++) {
			uint32_t seq = __atomic_load_n(&s->deps[i]->tail,
			if ((seq - this_tail) < (min_seq - this_tail))
	/* Cache result privately for single-thread stages... */
	if (s->threadsafe == false)
		s->available_seq = min_seq + wrap;
	/* ...or publish it for all threads of a threadsafe stage */
		__atomic_store_n(&s->shared.available_seq, min_seq + wrap,
/* Wait until the number of available slots reaches number requested */
static __rte_always_inline void
wait_for_available(struct opdl_stage *s, uint32_t n)
	while (available(s) < n) {
		/* Busy-wait: re-read dependency tails each iteration */
		update_available_seq(s);
/* Return number of slots to process based on number requested and mode */
static __rte_always_inline uint32_t
num_to_process(struct opdl_stage *s, uint32_t n, bool block)
	/* Don't read tail sequences of dependencies if not needed */
	if (available(s) >= n)
	update_available_seq(s);
	if (block == false) {
		/* Non-blocking: hand back whatever is available right now */
		uint32_t avail = available(s);
		return (avail <= n) ? avail : n;
	/* Blocking: a request larger than the ring could never complete */
	if (unlikely(n > s->num_slots)) {
		PMD_DRV_LOG(ERR, "%u entries is more than max (%u)",
		return 0; /* Avoid infinite loop */
	/* Blocking mode: spin until the full request is available */
	wait_for_available(s, n);
/* Copy entries in to slots with wrap-around */
static __rte_always_inline void
copy_entries_in(struct opdl_ring *t, uint32_t start, const void *entries,
		uint32_t num_entries)
	uint32_t slot_size = t->slot_size;
	uint32_t slot_index = start & t->mask;
	if (slot_index + num_entries <= t->num_slots) {
		/* Contiguous region: single copy */
		rte_memcpy(get_slot(t, start), entries,
				num_entries * slot_size);
		/* Region wraps past the end of the ring: copy in two parts,
		 * first up to the end, then the remainder from slot 0.
		 */
		uint32_t split = t->num_slots - slot_index;
		rte_memcpy(get_slot(t, start), entries, split * slot_size);
		rte_memcpy(get_slot(t, 0),
				RTE_PTR_ADD(entries, split * slot_size),
				(num_entries - split) * slot_size);
/* Copy entries out from slots with wrap-around */
static __rte_always_inline void
copy_entries_out(struct opdl_ring *t, uint32_t start, void *entries,
		uint32_t num_entries)
	uint32_t slot_size = t->slot_size;
	uint32_t slot_index = start & t->mask;
	if (slot_index + num_entries <= t->num_slots) {
		/* Contiguous region: single copy */
		rte_memcpy(entries, get_slot(t, start),
				num_entries * slot_size);
		/* Wraps past the ring end: copy tail part then head part */
		uint32_t split = t->num_slots - slot_index;
		rte_memcpy(entries, get_slot(t, start), split * slot_size);
		rte_memcpy(RTE_PTR_ADD(entries, split * slot_size),
				(num_entries - split) * slot_size);
/* Input function optimised for single thread */
static __rte_always_inline uint32_t
opdl_ring_input_singlethread(struct opdl_ring *t, const void *entries,
		uint32_t num_entries, bool block)
	struct opdl_stage *s = input_stage(t);
	uint32_t head = s->head;
	num_entries = num_to_process(s, num_entries, block);
	if (num_entries == 0)
	copy_entries_in(t, head, entries, num_entries);
	/* Release store: the copied entries must be visible to dependent
	 * stages before they observe the advanced tail.
	 */
	s->head += num_entries;
	__atomic_store_n(&s->shared.tail, s->head, __ATOMIC_RELEASE);
/* Convert head and tail of claim_manager into valid index.
 * claims[] has OPDL_DISCLAIMS_PER_LCORE entries (a power of 2, enforced by
 * a RTE_BUILD_BUG_ON in opdl_ring_create()), so masking wraps the counter.
 */
static __rte_always_inline uint32_t
claim_mgr_index(uint32_t n)
{
	return n & (OPDL_DISCLAIMS_PER_LCORE - 1);
}
/* Check if there are available slots in claim_manager */
static __rte_always_inline bool
claim_mgr_available(struct claim_manager *mgr)
	/* claims[] can hold at most OPDL_DISCLAIMS_PER_LCORE records */
	return (mgr->mgr_head < (mgr->mgr_tail + OPDL_DISCLAIMS_PER_LCORE)) ?
/* Record a new claim. Only use after first checking an entry is available */
static __rte_always_inline void
claim_mgr_add(struct claim_manager *mgr, uint32_t tail, uint32_t head)
	if ((mgr->mgr_head != mgr->mgr_tail) &&
			(mgr->claims[claim_mgr_index(mgr->mgr_head - 1)].head ==
		/* Combine with previous claim: the new range starts exactly
		 * where the last recorded claim ended, so just extend it.
		 */
		mgr->claims[claim_mgr_index(mgr->mgr_head - 1)].head = head;
		/* Otherwise start a new claim record */
		mgr->claims[claim_mgr_index(mgr->mgr_head)].head = head;
		mgr->claims[claim_mgr_index(mgr->mgr_head)].tail = tail;
	/* Track total entries claimed but not yet disclaimed */
	mgr->num_claimed += (head - tail);
/* Read the oldest recorded claim. Returns false when none is recorded. */
static __rte_always_inline bool
claim_mgr_read(struct claim_manager *mgr, uint32_t *tail, uint32_t *head)
	if (mgr->mgr_head == mgr->mgr_tail)
	/* Oldest claim lives at the ring tail */
	*head = mgr->claims[claim_mgr_index(mgr->mgr_tail)].head;
	*tail = mgr->claims[claim_mgr_index(mgr->mgr_tail)].tail;
/* Remove the oldest recorded claim. Only use after first reading the entry */
static __rte_always_inline void
claim_mgr_remove(struct claim_manager *mgr)
	/* Shrink the outstanding-claim total by the removed range size */
	mgr->num_claimed -= (mgr->claims[claim_mgr_index(mgr->mgr_tail)].head -
			mgr->claims[claim_mgr_index(mgr->mgr_tail)].tail);
/* Update tail in the oldest claim. Only use after first reading the entry.
 * Used for a partial disclaim: the record stays but its range shrinks.
 */
static __rte_always_inline void
claim_mgr_move_tail(struct claim_manager *mgr, uint32_t num_entries)
	mgr->num_claimed -= num_entries;
	mgr->claims[claim_mgr_index(mgr->mgr_tail)].tail += num_entries;
/* Disclaim up to num_entries previously claimed slots for a threadsafe
 * stage, advancing shared.tail for each claim whose turn has come.
 */
static __rte_always_inline void
opdl_stage_disclaim_multithread_n(struct opdl_stage *s,
		uint32_t num_entries, bool block)
	struct claim_manager *disclaims = &s->pending_disclaims[rte_lcore_id()];
	while (num_entries) {
		bool ret = claim_mgr_read(disclaims, &tail, &head);
			break; /* nothing is claimed */
		/* There should be no race condition here. If shared.tail
		 * matches, no other core can update it until this one does.
		 */
		if (__atomic_load_n(&s->shared.tail, __ATOMIC_ACQUIRE) ==
			if (num_entries >= (head - tail)) {
				/* Whole claim consumed: drop the record and
				 * move tail straight to the claim's head.
				 */
				claim_mgr_remove(disclaims);
				__atomic_store_n(&s->shared.tail, head,
				num_entries -= (head - tail);
				/* Partial: shrink the oldest claim in place */
				claim_mgr_move_tail(disclaims, num_entries);
				__atomic_store_n(&s->shared.tail,
		} else if (block == false)
			break; /* blocked by other thread */
		/* Keep going until num_entries are disclaimed. */
	/* Remember what could not be disclaimed this time round */
	disclaims->num_to_disclaim = num_entries;
/* Move head atomically, returning number of entries available to process and
 * the original value of head. For non-input stages, the claim is recorded
 * so that the tail can be updated later by opdl_stage_disclaim().
 */
static __rte_always_inline void
move_head_atomically(struct opdl_stage *s, uint32_t *num_entries,
		uint32_t *old_head, bool block, bool claim_func)
	uint32_t orig_num_entries = *num_entries;
	struct claim_manager *disclaims = &s->pending_disclaims[rte_lcore_id()];
	/* Attempt to disclaim any outstanding claims */
	opdl_stage_disclaim_multithread_n(s, disclaims->num_to_disclaim,
	*old_head = __atomic_load_n(&s->shared.head, __ATOMIC_ACQUIRE);
	/* If called by opdl_ring_input(), claim does not need to be
	 * recorded, as there will be no disclaim.
	 */
	/* Check that the claim can be recorded */
	ret = claim_mgr_available(disclaims);
	/* exit out if claim can't be recorded */
	*num_entries = num_to_process(s, orig_num_entries, block);
	if (*num_entries == 0)
	/* CAS-advance the shared head; retried on contention (the loop
	 * structure is not fully visible here — TODO confirm).
	 */
	success = __atomic_compare_exchange_n(&s->shared.head, old_head,
			*old_head + *num_entries,
			true, /* may fail spuriously */
			__ATOMIC_RELEASE, /* memory order on success */
			__ATOMIC_ACQUIRE); /* memory order on fail */
	/* Store the claim record */
	claim_mgr_add(disclaims, *old_head, *old_head + *num_entries);
/* Input function that supports multiple threads */
static __rte_always_inline uint32_t
opdl_ring_input_multithread(struct opdl_ring *t, const void *entries,
		uint32_t num_entries, bool block)
	struct opdl_stage *s = input_stage(t);
	/* Reserve a range by advancing the shared head atomically */
	move_head_atomically(s, &num_entries, &old_head, block, false);
	if (num_entries == 0)
	copy_entries_in(t, old_head, entries, num_entries);
	/* If another thread started inputting before this one, but hasn't
	 * finished, we need to wait for it to complete to update the tail.
	 */
	while (unlikely(__atomic_load_n(&s->shared.tail, __ATOMIC_ACQUIRE) !=
	/* Release store: entries visible before tail moves */
	__atomic_store_n(&s->shared.tail, old_head + num_entries,
/* Return the offset of the first entry this lcore instance should process,
 * so entries of a multi-instance stage are dealt round-robin across the
 * nb_p_lcores participating cores based on the starting sequence number.
 */
static __rte_always_inline uint32_t
opdl_first_entry_id(uint32_t start_seq, uint8_t nb_p_lcores,
		uint8_t this_lcore)
{
	/* Single (or zero) participant: always start at offset 0 */
	return ((nb_p_lcores <= 1) ? 0 :
		(nb_p_lcores - (start_seq % nb_p_lcores) + this_lcore) %
		nb_p_lcores);
}
/* Claim slots to process, optimised for single-thread operation */
static __rte_always_inline uint32_t
opdl_stage_claim_singlethread(struct opdl_stage *s, void *entries,
		uint32_t num_entries, uint32_t *seq, bool block, bool atomic)
	uint32_t i = 0, j = 0, offset;
	struct rte_event *ev;
	struct opdl_ring *t = s->t;
	uint8_t *entries_offset = (uint8_t *)entries;
		/* NOTE(review): the branch selecting between the two copy
		 * loops below is not visible here; it appears to key off
		 * 'atomic' — confirm against the full source.
		 */
		/* Round-robin path: each instance takes every
		 * nb_instance-th slot starting from its own offset.
		 */
		offset = opdl_first_entry_id(s->seq, s->nb_instance,
		num_entries = s->nb_instance * num_entries;
		num_entries = num_to_process(s, num_entries, block);
		for (; offset < num_entries; offset += s->nb_instance) {
			get_slots = get_slot(t, s->head + offset);
			memcpy(entries_offset, get_slots, t->slot_size);
			entries_offset += t->slot_size;
		/* Flow-based path: copy only events whose flow_id hashes
		 * to this instance.
		 */
		num_entries = num_to_process(s, num_entries, block);
		for (j = 0; j < num_entries; j++) {
			ev = (struct rte_event *)get_slot(t, s->head+j);
			if ((ev->flow_id%s->nb_instance) == s->instance_id) {
				memcpy(entries_offset, ev, t->slot_size);
				entries_offset += t->slot_size;
	/* Record the claim: shadow_head marks where this claim started */
	s->shadow_head = s->head;
	s->head += num_entries;
	s->num_claimed = num_entries;
	/* automatically disclaim entries if number of rte_events is zero */
	if (unlikely(i == 0))
		opdl_stage_disclaim(s, 0, false);
/* Thread-safe version of function to claim slots for processing */
static __rte_always_inline uint32_t
opdl_stage_claim_multithread(struct opdl_stage *s, void *entries,
		uint32_t num_entries, uint32_t *seq, bool block)
	struct opdl_ring *t = s->t;
	uint32_t i = 0, offset;
	uint8_t *entries_offset = (uint8_t *)entries;
	/* seq is an output parameter and must be valid */
	PMD_DRV_LOG(ERR, "Invalid seq PTR");
	/* Scale the request so each instance gets its round-robin share */
	offset = opdl_first_entry_id(*seq, s->nb_instance, s->instance_id);
	num_entries = offset + (s->nb_instance * num_entries);
	move_head_atomically(s, &num_entries, &old_head, block, true);
	/* Copy out every nb_instance-th slot belonging to this instance */
	for (; offset < num_entries; offset += s->nb_instance) {
		memcpy(entries_offset, get_slot(t, s->head + offset),
		entries_offset += t->slot_size;
/* Claim and copy slot pointers, optimised for single-thread operation */
static __rte_always_inline uint32_t
opdl_stage_claim_copy_singlethread(struct opdl_stage *s, void *entries,
		uint32_t num_entries, uint32_t *seq, bool block)
	num_entries = num_to_process(s, num_entries, block);
	if (num_entries == 0)
	copy_entries_out(s->t, s->head, entries, num_entries);
	/* Advance the private head; tail is moved later at disclaim time */
	s->head += num_entries;
/* Thread-safe version of function to claim and copy pointers to slots */
static __rte_always_inline uint32_t
opdl_stage_claim_copy_multithread(struct opdl_stage *s, void *entries,
		uint32_t num_entries, uint32_t *seq, bool block)
	/* Reserve a range atomically, recording the claim for disclaim */
	move_head_atomically(s, &num_entries, &old_head, block, true);
	if (num_entries == 0)
	copy_entries_out(s->t, old_head, entries, num_entries);
/* Disclaim num_entries slots of a single-threaded stage by advancing
 * shared.tail, clamping to what was actually claimed.
 */
static __rte_always_inline void
opdl_stage_disclaim_singlethread_n(struct opdl_stage *s,
		uint32_t num_entries)
	uint32_t old_tail = s->shared.tail;
	if (unlikely(num_entries > (s->head - old_tail))) {
		/* Cannot disclaim more than is outstanding; clamp and warn */
		PMD_DRV_LOG(WARNING, "Attempt to disclaim (%u) more than claimed (%u)",
				num_entries, s->head - old_tail);
		num_entries = s->head - old_tail;
	/* Release store makes processed entries visible to dependents */
	__atomic_store_n(&s->shared.tail, num_entries + old_tail,
/* Public input entry point: dispatch on the input stage's threading mode */
opdl_ring_input(struct opdl_ring *t, const void *entries, uint32_t num_entries,
	if (input_stage(t)->threadsafe == false)
		return opdl_ring_input_singlethread(t, entries, num_entries,
	return opdl_ring_input_multithread(t, entries, num_entries,
/* Copy a burst of entries into the ring at the given stage (single-thread
 * style: private head, tail published with a release store).
 */
opdl_ring_copy_from_burst(struct opdl_ring *t, struct opdl_stage *s,
		const void *entries, uint32_t num_entries, bool block)
	uint32_t head = s->head;
	num_entries = num_to_process(s, num_entries, block);
	if (num_entries == 0)
	copy_entries_in(t, head, entries, num_entries);
	/* Publish entries before dependents can observe the new tail */
	s->head += num_entries;
	__atomic_store_n(&s->shared.tail, s->head, __ATOMIC_RELEASE);
/* Copy a burst of entries out of the ring at the given stage (single-thread
 * style: private head, tail published with a release store).
 */
opdl_ring_copy_to_burst(struct opdl_ring *t, struct opdl_stage *s,
		void *entries, uint32_t num_entries, bool block)
	uint32_t head = s->head;
	num_entries = num_to_process(s, num_entries, block);
	if (num_entries == 0)
	copy_entries_out(t, head, entries, num_entries);
	/* Consumed entries are released immediately */
	s->head += num_entries;
	__atomic_store_n(&s->shared.tail, s->head, __ATOMIC_RELEASE);
/* Non-blocking query: how many of num_entries could be processed now */
opdl_stage_find_num_available(struct opdl_stage *s, uint32_t num_entries)
	/* return (num_to_process(s, num_entries, false)); */
	/* Fast path: no need to re-read dependency tails */
	if (available(s) >= num_entries)
	update_available_seq(s);
	uint32_t avail = available(s);
	return (avail <= num_entries) ? avail : num_entries;
/* Public claim entry point: dispatch on the stage's threading mode */
opdl_stage_claim(struct opdl_stage *s, void *entries,
		uint32_t num_entries, uint32_t *seq, bool block, bool atomic)
	if (s->threadsafe == false)
		return opdl_stage_claim_singlethread(s, entries, num_entries,
	return opdl_stage_claim_multithread(s, entries, num_entries,
/* Public claim-and-copy entry point: dispatch on threading mode */
opdl_stage_claim_copy(struct opdl_stage *s, void *entries,
		uint32_t num_entries, uint32_t *seq, bool block)
	if (s->threadsafe == false)
		return opdl_stage_claim_copy_singlethread(s, entries,
				num_entries, seq, block);
	return opdl_stage_claim_copy_multithread(s, entries,
			num_entries, seq, block);
/* Disclaim up to num_entries previously claimed slots */
opdl_stage_disclaim_n(struct opdl_stage *s, uint32_t num_entries,
	if (s->threadsafe == false) {
		/* Single-thread stages always release the whole claim */
		opdl_stage_disclaim_singlethread_n(s, s->num_claimed);
		struct claim_manager *disclaims =
			&s->pending_disclaims[rte_lcore_id()];
		if (unlikely(num_entries > s->num_slots)) {
			/* Clamp oversized requests to what is outstanding */
			PMD_DRV_LOG(WARNING, "Attempt to disclaim (%u) more than claimed (%u)",
					num_entries, disclaims->num_claimed);
			num_entries = disclaims->num_claimed;
		/* Include anything left over from earlier partial disclaims */
		num_entries = RTE_MIN(num_entries + disclaims->num_to_disclaim,
				disclaims->num_claimed);
		opdl_stage_disclaim_multithread_n(s, num_entries, block);
/* Disclaim all entries claimed by the current claim of this stage */
opdl_stage_disclaim(struct opdl_stage *s, uint32_t num_entries, bool block)
	/* Sanity check: caller-reported count should match the claim */
	if (num_entries != s->num_event) {
	if (s->threadsafe == false) {
		/* Release the whole claim and advance the bookkeeping */
		__atomic_store_n(&s->shared.tail, s->head, __ATOMIC_RELEASE);
		s->seq += s->num_claimed;
		s->shadow_head = s->head;
		struct claim_manager *disclaims =
			&s->pending_disclaims[rte_lcore_id()];
		opdl_stage_disclaim_multithread_n(s, disclaims->num_claimed,
/* Available slots of the whole ring == those of the input stage (stage 0) */
opdl_ring_available(struct opdl_ring *t)
	return opdl_stage_available(&t->stages[0]);
/* Refresh dependency view, then report the stage's available slot count */
opdl_stage_available(struct opdl_stage *s)
	update_available_seq(s);
/* Block until all entries have drained: the input stage sees the whole
 * ring as available only when every downstream stage has consumed it.
 */
opdl_ring_flush(struct opdl_ring *t)
	struct opdl_stage *s = input_stage(t);
	wait_for_available(s, s->num_slots);
778 /******************** Non performance sensitive functions ********************/
/* Initial setup of a new stage's context */
init_stage(struct opdl_ring *t, struct opdl_stage *s, bool threadsafe,
	/* The input stage starts with the whole ring available */
	uint32_t available = (is_input) ? t->num_slots : 0;
	s->num_slots = t->num_slots;
	s->index = t->num_stages;
	s->threadsafe = threadsafe;
	/* Alloc memory for deps */
	s->dep_tracking = rte_zmalloc_socket(LIB_NAME,
			t->max_num_stages * sizeof(enum dep_type),
	if (s->dep_tracking == NULL)
	s->deps = rte_zmalloc_socket(LIB_NAME,
			t->max_num_stages * sizeof(struct shared_state *),
	if (s->deps == NULL) {
		/* Roll back the first allocation on failure */
		rte_free(s->dep_tracking);
	/* Every stage initially depends on itself (loop detection) */
	s->dep_tracking[s->index] = DEP_SELF;
	/* Seed the appropriate available counter for the threading mode */
	if (threadsafe == true)
		s->shared.available_seq = available;
	s->available_seq = available;
/* Add direct or indirect dependencies between stages */
add_dep(struct opdl_stage *dependent, const struct opdl_stage *dependency,
	struct opdl_ring *t = dependent->t;
	/* Add new direct dependency */
	if ((type == DEP_DIRECT) &&
			(dependent->dep_tracking[dependency->index] ==
		PMD_DRV_LOG(DEBUG, "%s:%u direct dependency on %u",
				t->name, dependent->index, dependency->index);
		dependent->dep_tracking[dependency->index] = DEP_DIRECT;
	/* Add new indirect dependency or change direct to indirect */
	if ((type == DEP_INDIRECT) &&
			((dependent->dep_tracking[dependency->index] ==
			(dependent->dep_tracking[dependency->index] ==
		PMD_DRV_LOG(DEBUG, "%s:%u indirect dependency on %u",
				t->name, dependent->index, dependency->index);
		dependent->dep_tracking[dependency->index] = DEP_INDIRECT;
	/* Shouldn't happen... DEP_SELF on a non-input stage means a cycle */
	if ((dependent->dep_tracking[dependency->index] == DEP_SELF) &&
			(dependent != input_stage(t))) {
		PMD_DRV_LOG(ERR, "Loop in dependency graph %s:%u",
				t->name, dependent->index);
	/* Keep going to dependencies of the dependency, until input stage */
	if (dependency != input_stage(t))
		for (i = 0; i < dependency->num_deps; i++) {
			/* Recurse: transitive deps become DEP_INDIRECT */
			int ret = add_dep(dependent, dependency->deps[i]->stage,
	/* Make list of sequence numbers for direct dependencies only */
	if (type == DEP_DIRECT)
		for (i = 0, dependent->num_deps = 0; i < t->num_stages; i++)
			if (dependent->dep_tracking[i] == DEP_DIRECT) {
				/* Stage 0 (input) must be the sole dep if present */
				if ((i == 0) && (dependent->num_deps > 1))
					rte_panic("%s:%u depends on > input",
				dependent->deps[dependent->num_deps++] =
						&t->stages[i].shared;
/* Create and initialise an opdl_ring: one memzone holds the ring header
 * plus the slot array; stage contexts are allocated separately.
 */
opdl_ring_create(const char *name, uint32_t num_slots, uint32_t slot_size,
		uint32_t max_num_stages, int socket)
	char mz_name[RTE_MEMZONE_NAMESIZE];
	struct opdl_stage *st = NULL;
	const struct rte_memzone *mz = NULL;
	/* Header + slot storage, rounded up to a whole cache line */
	size_t alloc_size = RTE_CACHE_LINE_ROUNDUP(sizeof(*t) +
			(num_slots * slot_size));
	/* Compile time checking */
	RTE_BUILD_BUG_ON((sizeof(struct shared_state) & RTE_CACHE_LINE_MASK) !=
	RTE_BUILD_BUG_ON((offsetof(struct opdl_stage, shared) &
			RTE_CACHE_LINE_MASK) != 0);
	RTE_BUILD_BUG_ON((offsetof(struct opdl_ring, slots) &
			RTE_CACHE_LINE_MASK) != 0);
	/* claim_mgr_index() relies on this being a power of 2 */
	RTE_BUILD_BUG_ON(!rte_is_power_of_2(OPDL_DISCLAIMS_PER_LCORE));
	/* Parameter checking */
	PMD_DRV_LOG(ERR, "name param is NULL");
	/* mask arithmetic below requires a power-of-2 slot count */
	if (!rte_is_power_of_2(num_slots)) {
		PMD_DRV_LOG(ERR, "num_slots (%u) for %s is not power of 2",
	/* Alloc memory for stages */
	st = rte_zmalloc_socket(LIB_NAME,
			max_num_stages * sizeof(struct opdl_stage),
			RTE_CACHE_LINE_SIZE, socket);
	snprintf(mz_name, sizeof(mz_name), "%s%s", LIB_NAME, name);
	/* Alloc memory for memzone */
	mz = rte_memzone_reserve(mz_name, alloc_size, socket, mz_flags);
	/* Initialise opdl_ring queue */
	memset(t, 0, sizeof(*t));
	snprintf(t->name, sizeof(t->name), "%s", name);
	t->num_slots = num_slots;
	t->mask = num_slots - 1;
	t->slot_size = slot_size;
	t->max_num_stages = max_num_stages;
	PMD_DRV_LOG(DEBUG, "Created %s at %p (num_slots=%u,socket=%i,slot_size=%u)",
			t->name, t, num_slots, socket, slot_size);
	/* Error path: free everything acquired so far */
	PMD_DRV_LOG(ERR, "Cannot reserve memory");
	rte_memzone_free(mz);
/* Public accessor: pointer to the slot for a given sequence number */
opdl_ring_get_slot(const struct opdl_ring *t, uint32_t index)
	return get_slot(t, index);
/* Compare-and-set style update of a claimed slot: write back the event only
 * for the fields that actually changed, returning whether an update happened.
 */
opdl_ring_cas_slot(const struct opdl_stage *s, const struct rte_event *ev,
		uint32_t index, bool atomic)
	uint32_t i = 0, j = 0, offset;
	struct opdl_ring *t = s->t;
	struct rte_event *ev_orig = NULL;
	bool ev_updated = false;
	uint64_t ev_temp = 0;
	/* index must lie within the current claim */
	if (index > s->num_event) {
		PMD_DRV_LOG(ERR, "index is overflow");
	/* Only the bits covered by OPDL_EVENT_MASK are compared */
	ev_temp = ev->event&OPDL_EVENT_MASK;
	/* Round-robin (non-atomic) layout: locate this instance's slot */
	offset = opdl_first_entry_id(s->seq, s->nb_instance,
	offset += index*s->nb_instance;
	ev_orig = get_slot(t, s->shadow_head+offset);
	if ((ev_orig->event&OPDL_EVENT_MASK) != ev_temp) {
		ev_orig->event = ev->event;
	if (ev_orig->u64 != ev->u64) {
		ev_orig->u64 = ev->u64;
	/* Atomic (flow-based) layout: scan the claim for this instance's
	 * flows and update the matching slot.
	 */
	for (i = 0; i < s->num_claimed; i++) {
		ev_orig = (struct rte_event *)
			get_slot(t, s->shadow_head+i);
		if ((ev_orig->flow_id%s->nb_instance) ==
		if ((ev_orig->event&OPDL_EVENT_MASK) !=
			ev_orig->event = ev->event;
		if (ev_orig->u64 != ev->u64) {
			ev_orig->u64 = ev->u64;
/* Simple accessor: NUMA socket the ring's memory was allocated on */
opdl_ring_get_socket(const struct opdl_ring *t)
/* Simple accessor: total number of slots in the ring */
opdl_ring_get_num_slots(const struct opdl_ring *t)
	return t->num_slots;
/* Simple accessor: the ring's instance name */
opdl_ring_get_name(const struct opdl_ring *t)
/* Check dependency list is valid for a given opdl_ring */
check_deps(struct opdl_ring *t, struct opdl_stage *deps[],
	for (i = 0; i < num_deps; ++i) {
		/* Every dependency must be non-NULL... */
		PMD_DRV_LOG(ERR, "deps[%u] is NULL", i);
		/* ...and belong to the same ring */
		if (t != deps[i]->t) {
			PMD_DRV_LOG(ERR, "deps[%u] is in opdl_ring %s, not %s",
					i, deps[i]->t->name, t->name);
	/* Cannot depend on more stages than have been added */
	if (num_deps > t->num_stages) {
		PMD_DRV_LOG(ERR, "num_deps (%u) > number stages (%u)",
				num_deps, t->num_stages);
/* Add a new stage to the ring; returns the stage context or NULL on error */
opdl_stage_add(struct opdl_ring *t, bool threadsafe, bool is_input)
	struct opdl_stage *s;
	/* Parameter checking */
	PMD_DRV_LOG(ERR, "opdl_ring is NULL");
	if (t->num_stages == t->max_num_stages) {
		PMD_DRV_LOG(ERR, "%s has max number of stages (%u)",
				t->name, t->max_num_stages);
	/* Next free stage slot */
	s = &t->stages[t->num_stages];
	/* shared state must sit on its own cache line (see shared_state) */
	if (((uintptr_t)&s->shared & RTE_CACHE_LINE_MASK) != 0)
		PMD_DRV_LOG(WARNING, "Tail seq num (%p) of %s stage not cache aligned",
				&s->shared, t->name);
	if (init_stage(t, s, threadsafe, is_input) < 0) {
		PMD_DRV_LOG(ERR, "Cannot reserve memory");
/* Wire up a stage's direct dependencies and instance parameters */
opdl_stage_deps_add(struct opdl_ring *t, struct opdl_stage *s,
		uint32_t nb_instance, uint32_t instance_id,
		struct opdl_stage *deps[],
	if ((num_deps > 0) && (!deps)) {
		PMD_DRV_LOG(ERR, "%s stage has NULL dependencies", t->name);
	ret = check_deps(t, deps, num_deps);
	for (i = 0; i < num_deps; i++) {
		/* Record each dependency as direct; add_dep() also pulls in
		 * transitive dependencies as indirect.
		 */
		ret = add_dep(s, deps[i], DEP_DIRECT);
	s->nb_instance = nb_instance;
	s->instance_id = instance_id;
/* Public accessor for the ring's input stage (stage 0) */
opdl_ring_get_input_stage(const struct opdl_ring *t)
	return input_stage(t);
/* Replace a stage's dependency list directly (no transitive tracking) */
opdl_stage_set_deps(struct opdl_stage *s, struct opdl_stage *deps[],
	if ((num_deps == 0) || (!deps)) {
		PMD_DRV_LOG(ERR, "cannot set NULL dependencies");
	ret = check_deps(s->t, deps, num_deps);
	/* Point straight at each dependency's shared (tail) state */
	for (i = 0; i < num_deps; i++)
		s->deps[i] = &deps[i]->shared;
	s->num_deps = num_deps;
/* Public accessor: the ring a stage belongs to (back pointer) */
opdl_stage_get_opdl_ring(const struct opdl_stage *s)
/* Dump ring and per-stage state to the given stream (debug aid) */
opdl_ring_dump(const struct opdl_ring *t, FILE *f)
	fprintf(f, "NULL OPDL!\n");
	fprintf(f, "OPDL \"%s\": num_slots=%u; mask=%#x; slot_size=%u; num_stages=%u; socket=%i\n",
			t->name, t->num_slots, t->mask, t->slot_size,
			t->num_stages, t->socket);
	for (i = 0; i < t->num_stages; i++) {
		const struct opdl_stage *s = &t->stages[i];
		/* threadsafe stages keep live values in s->shared */
		fprintf(f, "  %s[%u]: threadsafe=%s; head=%u; available_seq=%u; tail=%u; deps=%u",
				t->name, i, (s->threadsafe) ? "true" : "false",
				(s->threadsafe) ? s->shared.head : s->head,
				(s->threadsafe) ? s->shared.available_seq :
				s->shared.tail, (s->num_deps > 0) ?
				s->deps[0]->stage->index : 0);
		/* Remaining dependencies as a comma-separated list */
		for (j = 1; j < s->num_deps; j++)
			fprintf(f, ",%u", s->deps[j]->stage->index);
/* Free a ring: per-stage allocations first, then the stage array, then the
 * memzone holding the ring itself (looked up again by name).
 */
opdl_ring_free(struct opdl_ring *t)
	const struct rte_memzone *mz;
	char mz_name[RTE_MEMZONE_NAMESIZE];
	PMD_DRV_LOG(DEBUG, "Freeing NULL OPDL Ring!");
	PMD_DRV_LOG(DEBUG, "Freeing %s opdl_ring at %p", t->name, t);
	for (i = 0; i < t->num_stages; ++i) {
		rte_free(t->stages[i].deps);
		rte_free(t->stages[i].dep_tracking);
	rte_free(t->stages);
	/* The ring itself lives in a memzone named LIB_NAME + ring name */
	snprintf(mz_name, sizeof(mz_name), "%s%s", LIB_NAME, t->name);
	mz = rte_memzone_lookup(mz_name);
	if (rte_memzone_free(mz) != 0)
		PMD_DRV_LOG(ERR, "Cannot free memzone for %s", t->name);
/* search a opdl_ring from its name */
opdl_ring_lookup(const char *name)
	const struct rte_memzone *mz;
	char mz_name[RTE_MEMZONE_NAMESIZE];
	/* Memzone name mirrors the one used in opdl_ring_create() */
	snprintf(mz_name, sizeof(mz_name), "%s%s", LIB_NAME, name);
	mz = rte_memzone_lookup(mz_name);
/* Override a stage's threading mode after creation */
opdl_ring_set_stage_threadsafe(struct opdl_stage *s, bool threadsafe)
	s->threadsafe = threadsafe;