X-Git-Url: http://git.droids-corp.org/?a=blobdiff_plain;f=drivers%2Fevent%2Fopdl%2Fopdl_ring.c;h=06fb5b33760d1b5ebb435e19a73c8137f6dfa38e;hb=7b6b3886defc3549190d4fc963705b88d219820c;hp=eca7712bdf2cba4c46a80e9678eb2f1b5c285e0f;hpb=e07a3ed786c50e31cbb6e01f9f26cada0c2a4d70;p=dpdk.git diff --git a/drivers/event/opdl/opdl_ring.c b/drivers/event/opdl/opdl_ring.c index eca7712bdf..06fb5b3376 100644 --- a/drivers/event/opdl/opdl_ring.c +++ b/drivers/event/opdl/opdl_ring.c @@ -7,6 +7,7 @@ #include #include +#include #include #include #include @@ -15,7 +16,6 @@ #include #include #include -#include #include "opdl_ring.h" #include "opdl_log.h" @@ -25,7 +25,10 @@ #define OPDL_NAME_SIZE 64 -#define OPDL_EVENT_MASK (0xFFFF0000000FFFFFULL) +#define OPDL_EVENT_MASK (0x00000000000FFFFFULL) +#define OPDL_FLOWID_MASK (0xFFFFF) +#define OPDL_OPA_MASK (0xFF) +#define OPDL_OPA_OFFSET (0x38) int opdl_logtype_driver; @@ -86,7 +89,6 @@ struct opdl_stage { */ uint32_t available_seq; uint32_t head; /* Current head for single-thread operation */ - uint32_t shadow_head; /* Shadow head for single-thread operation */ uint32_t nb_instance; /* Number of instances */ uint32_t instance_id; /* ID of this stage instance */ uint16_t num_claimed; /* Number of slots claimed */ @@ -102,6 +104,9 @@ struct opdl_stage { /* For managing disclaims in multi-threaded processing stages */ struct claim_manager pending_disclaims[RTE_MAX_LCORE] __rte_cache_aligned; + uint32_t shadow_head; /* Shadow head for single-thread operation */ + uint32_t queue_id; /* ID of Queue which is assigned to this stage */ + uint32_t pos; /* Atomic scan position */ } __rte_cache_aligned; /* Context for opdl_ring */ @@ -494,6 +499,9 @@ opdl_stage_claim_singlethread(struct opdl_stage *s, void *entries, uint32_t num_entries, uint32_t *seq, bool block, bool atomic) { uint32_t i = 0, j = 0, offset; + uint32_t opa_id = 0; + uint32_t flow_id = 0; + uint64_t event = 0; void *get_slots; struct rte_event *ev; RTE_SET_USED(seq); @@ -520,7 +528,17 @@ opdl_stage_claim_singlethread(struct opdl_stage *s, void *entries, for (j = 0; j < num_entries; j++) { ev = (struct rte_event *)get_slot(t, s->head+j); - if ((ev->flow_id%s->nb_instance) == s->instance_id) { + + event = __atomic_load_n(&(ev->event), + __ATOMIC_ACQUIRE); + + opa_id = OPDL_OPA_MASK & (event >> OPDL_OPA_OFFSET); + flow_id = OPDL_FLOWID_MASK & event; + + if (opa_id >= s->queue_id) + continue; + + if ((flow_id % s->nb_instance) == s->instance_id) { memcpy(entries_offset, ev, t->slot_size); entries_offset += t->slot_size; i++; @@ -531,6 +549,7 @@ opdl_stage_claim_singlethread(struct opdl_stage *s, void *entries, s->head += num_entries; s->num_claimed = num_entries; s->num_event = i; + s->pos = 0; /* automatically disclaim entries if number of rte_events is zero */ if (unlikely(i == 0)) @@ -736,7 +755,7 @@ int opdl_stage_disclaim(struct opdl_stage *s, uint32_t num_entries, bool block) { if (num_entries != s->num_event) { - rte_errno = -EINVAL; + rte_errno = EINVAL; return 0; } if (s->threadsafe == false) { @@ -925,7 +944,7 @@ opdl_ring_create(const char *name, uint32_t num_slots, uint32_t slot_size, /* Initialise opdl_ring queue */ memset(t, 0, sizeof(*t)); - snprintf(t->name, sizeof(t->name), "%s", name); + strlcpy(t->name, name, sizeof(t->name)); t->socket = socket; t->num_slots = num_slots; t->mask = num_slots - 1; @@ -953,21 +972,26 @@ opdl_ring_get_slot(const struct opdl_ring *t, uint32_t index) } bool -opdl_ring_cas_slot(const struct opdl_stage *s, const struct rte_event *ev, +opdl_ring_cas_slot(struct opdl_stage *s, const struct rte_event *ev, uint32_t index, bool atomic) { - uint32_t i = 0, j = 0, offset; + uint32_t i = 0, offset; struct opdl_ring *t = s->t; struct rte_event *ev_orig = NULL; bool ev_updated = false; - uint64_t ev_temp = 0; + uint64_t ev_temp = 0; + uint64_t ev_update = 0; + + uint32_t opa_id = 0; + uint32_t flow_id = 0; + uint64_t event = 0; if (index > s->num_event) { PMD_DRV_LOG(ERR, "index is overflow"); return ev_updated; } - ev_temp = ev->event&OPDL_EVENT_MASK; + ev_temp = ev->event & OPDL_EVENT_MASK; if (!atomic) { offset = opdl_first_entry_id(s->seq, s->nb_instance, @@ -984,27 +1008,39 @@ opdl_ring_cas_slot(const struct opdl_stage *s, const struct rte_event *ev, } } else { - for (i = 0; i < s->num_claimed; i++) { + for (i = s->pos; i < s->num_claimed; i++) { ev_orig = (struct rte_event *) get_slot(t, s->shadow_head+i); - if ((ev_orig->flow_id%s->nb_instance) == - s->instance_id) { - - if (j == index) { - if ((ev_orig->event&OPDL_EVENT_MASK) != - ev_temp) { - ev_orig->event = ev->event; - ev_updated = true; - } - if (ev_orig->u64 != ev->u64) { - ev_orig->u64 = ev->u64; - ev_updated = true; - } - - break; + event = __atomic_load_n(&(ev_orig->event), + __ATOMIC_ACQUIRE); + + opa_id = OPDL_OPA_MASK & (event >> OPDL_OPA_OFFSET); + flow_id = OPDL_FLOWID_MASK & event; + + if (opa_id >= s->queue_id) + continue; + + if ((flow_id % s->nb_instance) == s->instance_id) { + ev_update = s->queue_id; + ev_update = (ev_update << OPDL_OPA_OFFSET) + | ev->event; + + s->pos = i + 1; + + if ((event & OPDL_EVENT_MASK) != + ev_temp) { + __atomic_store_n(&(ev_orig->event), + ev_update, + __ATOMIC_RELEASE); + ev_updated = true; } - j++; + if (ev_orig->u64 != ev->u64) { + ev_orig->u64 = ev->u64; + ev_updated = true; + } + + break; } } @@ -1049,11 +1085,7 @@ check_deps(struct opdl_ring *t, struct opdl_stage *deps[], return -EINVAL; } } - if (num_deps > t->num_stages) { - PMD_DRV_LOG(ERR, "num_deps (%u) > number stages (%u)", - num_deps, t->num_stages); - return -EINVAL; - } + return 0; } @@ -1153,6 +1185,13 @@ opdl_stage_get_opdl_ring(const struct opdl_stage *s) return s->t; } +void +opdl_stage_set_queue_id(struct opdl_stage *s, + uint32_t queue_id) +{ + s->queue_id = queue_id; +} + void opdl_ring_dump(const struct opdl_ring *t, FILE *f) {