From b770f952de678b4964d05ce49b0518919f152b83 Mon Sep 17 00:00:00 2001 From: Liang Ma Date: Tue, 27 Mar 2018 15:18:12 +0100 Subject: [PATCH] event/opdl: fix atomic queue race condition If application link one atomic queue to multiple ports, and each worker core update flow_id, there will have a chance to hit race condition issue and lead to double processing same event. This fix solve the problem and eliminate the race condition issue. Fixes: 4236ce9bf5bf ("event/opdl: add OPDL ring infrastructure library") Cc: stable@dpdk.org Signed-off-by: Liang Ma Signed-off-by: Peter Mccarthy Acked-by: Harry van Haaren --- drivers/event/opdl/opdl_evdev_init.c | 3 + drivers/event/opdl/opdl_ring.c | 97 +++++++++++++++++++--------- drivers/event/opdl/opdl_ring.h | 16 ++++- 3 files changed, 86 insertions(+), 30 deletions(-) diff --git a/drivers/event/opdl/opdl_evdev_init.c b/drivers/event/opdl/opdl_evdev_init.c index 1454de533b..582ad698f0 100644 --- a/drivers/event/opdl/opdl_evdev_init.c +++ b/drivers/event/opdl/opdl_evdev_init.c @@ -733,6 +733,9 @@ initialise_all_other_ports(struct rte_eventdev *dev) queue->ports[queue->nb_ports] = port; port->instance_id = queue->nb_ports; queue->nb_ports++; + opdl_stage_set_queue_id(stage_inst, + port->queue_id); + } else if (queue->q_pos == OPDL_Q_POS_END) { /* tx port */ diff --git a/drivers/event/opdl/opdl_ring.c b/drivers/event/opdl/opdl_ring.c index eca7712bdf..8aca481c97 100644 --- a/drivers/event/opdl/opdl_ring.c +++ b/drivers/event/opdl/opdl_ring.c @@ -25,7 +25,10 @@ #define OPDL_NAME_SIZE 64 -#define OPDL_EVENT_MASK (0xFFFF0000000FFFFFULL) +#define OPDL_EVENT_MASK (0x00000000000FFFFFULL) +#define OPDL_FLOWID_MASK (0xFFFFF) +#define OPDL_OPA_MASK (0xFF) +#define OPDL_OPA_OFFSET (0x38) int opdl_logtype_driver; @@ -86,7 +89,6 @@ struct opdl_stage { */ uint32_t available_seq; uint32_t head; /* Current head for single-thread operation */ - uint32_t shadow_head; /* Shadow head for single-thread operation */ uint32_t nb_instance; /* Number of instances */ uint32_t instance_id; /* ID of this stage instance */ uint16_t num_claimed; /* Number of slots claimed */ @@ -102,6 +104,9 @@ struct opdl_stage { /* For managing disclaims in multi-threaded processing stages */ struct claim_manager pending_disclaims[RTE_MAX_LCORE] __rte_cache_aligned; + uint32_t shadow_head; /* Shadow head for single-thread operation */ + uint32_t queue_id; /* ID of Queue which is assigned to this stage */ + uint32_t pos; /* Atomic scan position */ } __rte_cache_aligned; /* Context for opdl_ring */ @@ -494,6 +499,9 @@ opdl_stage_claim_singlethread(struct opdl_stage *s, void *entries, uint32_t num_entries, uint32_t *seq, bool block, bool atomic) { uint32_t i = 0, j = 0, offset; + uint32_t opa_id = 0; + uint32_t flow_id = 0; + uint64_t event = 0; void *get_slots; struct rte_event *ev; RTE_SET_USED(seq); @@ -520,7 +528,17 @@ opdl_stage_claim_singlethread(struct opdl_stage *s, void *entries, for (j = 0; j < num_entries; j++) { ev = (struct rte_event *)get_slot(t, s->head+j); - if ((ev->flow_id%s->nb_instance) == s->instance_id) { + + event = __atomic_load_n(&(ev->event), + __ATOMIC_ACQUIRE); + + opa_id = OPDL_OPA_MASK & (event >> OPDL_OPA_OFFSET); + flow_id = OPDL_FLOWID_MASK & event; + + if (opa_id >= s->queue_id) + continue; + + if ((flow_id % s->nb_instance) == s->instance_id) { memcpy(entries_offset, ev, t->slot_size); entries_offset += t->slot_size; i++; @@ -531,6 +549,7 @@ opdl_stage_claim_singlethread(struct opdl_stage *s, void *entries, s->head += num_entries; s->num_claimed = num_entries; s->num_event = i; + s->pos = 0; /* automatically disclaim entries if number of rte_events is zero */ if (unlikely(i == 0)) @@ -953,21 +972,26 @@ opdl_ring_get_slot(const struct opdl_ring *t, uint32_t index) } bool -opdl_ring_cas_slot(const struct opdl_stage *s, const struct rte_event *ev, +opdl_ring_cas_slot(struct opdl_stage *s, const struct rte_event *ev, uint32_t index, bool atomic) { - uint32_t i = 0, j = 0, offset; + uint32_t i = 0, offset; struct opdl_ring *t = s->t; struct rte_event *ev_orig = NULL; bool ev_updated = false; - uint64_t ev_temp = 0; + uint64_t ev_temp = 0; + uint64_t ev_update = 0; + + uint32_t opa_id = 0; + uint32_t flow_id = 0; + uint64_t event = 0; if (index > s->num_event) { PMD_DRV_LOG(ERR, "index is overflow"); return ev_updated; } - ev_temp = ev->event&OPDL_EVENT_MASK; + ev_temp = ev->event & OPDL_EVENT_MASK; if (!atomic) { offset = opdl_first_entry_id(s->seq, s->nb_instance, @@ -984,27 +1008,39 @@ opdl_ring_cas_slot(const struct opdl_stage *s, const struct rte_event *ev, } } else { - for (i = 0; i < s->num_claimed; i++) { + for (i = s->pos; i < s->num_claimed; i++) { ev_orig = (struct rte_event *) get_slot(t, s->shadow_head+i); - if ((ev_orig->flow_id%s->nb_instance) == - s->instance_id) { - - if (j == index) { - if ((ev_orig->event&OPDL_EVENT_MASK) != - ev_temp) { - ev_orig->event = ev->event; - ev_updated = true; - } - if (ev_orig->u64 != ev->u64) { - ev_orig->u64 = ev->u64; - ev_updated = true; - } - - break; + event = __atomic_load_n(&(ev_orig->event), + __ATOMIC_ACQUIRE); + + opa_id = OPDL_OPA_MASK & (event >> OPDL_OPA_OFFSET); + flow_id = OPDL_FLOWID_MASK & event; + + if (opa_id >= s->queue_id) + continue; + + if ((flow_id % s->nb_instance) == s->instance_id) { + ev_update = s->queue_id; + ev_update = (ev_update << OPDL_OPA_OFFSET) + | ev->event; + + s->pos = i + 1; + + if ((event & OPDL_EVENT_MASK) != + ev_temp) { + __atomic_store_n(&(ev_orig->event), + ev_update, + __ATOMIC_RELEASE); + ev_updated = true; } - j++; + if (ev_orig->u64 != ev->u64) { + ev_orig->u64 = ev->u64; + ev_updated = true; + } + + break; } } @@ -1049,11 +1085,7 @@ check_deps(struct opdl_ring *t, struct opdl_stage *deps[], return -EINVAL; } } - if (num_deps > t->num_stages) { - PMD_DRV_LOG(ERR, "num_deps (%u) > number stages (%u)", - num_deps, t->num_stages); - return -EINVAL; - } + return 0; } @@ -1153,6 +1185,13 @@ opdl_stage_get_opdl_ring(const struct opdl_stage *s) return s->t; } +void +opdl_stage_set_queue_id(struct opdl_stage *s, + uint32_t queue_id) +{ + s->queue_id = queue_id; +} + void opdl_ring_dump(const struct opdl_ring *t, FILE *f) { diff --git a/drivers/event/opdl/opdl_ring.h b/drivers/event/opdl/opdl_ring.h index 9e8c33e681..751a59dbce 100644 --- a/drivers/event/opdl/opdl_ring.h +++ b/drivers/event/opdl/opdl_ring.h @@ -518,6 +518,20 @@ opdl_stage_find_num_available(struct opdl_stage *s, uint32_t num_entries); struct opdl_stage * opdl_stage_create(struct opdl_ring *t, bool threadsafe); + +/** + * Set the internal queue id for each stage instance. + * + * @param s + * The pointer of stage instance. + * + * @param queue_id + * The value of internal queue id. + */ +void +opdl_stage_set_queue_id(struct opdl_stage *s, + uint32_t queue_id); + /** * Prints information on opdl_ring instance and all its stages * @@ -590,7 +604,7 @@ opdl_ring_set_stage_threadsafe(struct opdl_stage *s, bool threadsafe); */ bool -opdl_ring_cas_slot(const struct opdl_stage *s, const struct rte_event *ev, +opdl_ring_cas_slot(struct opdl_stage *s, const struct rte_event *ev, uint32_t index, bool atomic); #ifdef __cplusplus -- 2.20.1