event/opdl: fix atomic queue race condition
authorLiang Ma <liang.j.ma@intel.com>
Tue, 27 Mar 2018 14:18:12 +0000 (15:18 +0100)
committerThomas Monjalon <thomas@monjalon.net>
Mon, 16 Apr 2018 08:10:03 +0000 (10:10 +0200)
If an application links one atomic queue to multiple ports,
and each worker core updates the flow_id, there is a
chance of hitting a race condition that leads to double
processing of the same event. This fix solves the problem
and eliminates the race condition.

Fixes: 4236ce9bf5bf ("event/opdl: add OPDL ring infrastructure library")
Cc: stable@dpdk.org
Signed-off-by: Liang Ma <liang.j.ma@intel.com>
Signed-off-by: Peter Mccarthy <peter.mccarthy@intel.com>
Acked-by: Harry van Haaren <harry.van.haaren@intel.com>
drivers/event/opdl/opdl_evdev_init.c
drivers/event/opdl/opdl_ring.c
drivers/event/opdl/opdl_ring.h

index 1454de5..582ad69 100644 (file)
@@ -733,6 +733,9 @@ initialise_all_other_ports(struct rte_eventdev *dev)
                                queue->ports[queue->nb_ports] = port;
                                port->instance_id = queue->nb_ports;
                                queue->nb_ports++;
+                               opdl_stage_set_queue_id(stage_inst,
+                                               port->queue_id);
+
                        } else if (queue->q_pos == OPDL_Q_POS_END) {
 
                                /* tx port  */
index eca7712..8aca481 100644 (file)
 #define OPDL_NAME_SIZE 64
 
 
-#define OPDL_EVENT_MASK  (0xFFFF0000000FFFFFULL)
+#define OPDL_EVENT_MASK  (0x00000000000FFFFFULL)
+#define OPDL_FLOWID_MASK (0xFFFFF)
+#define OPDL_OPA_MASK    (0xFF)
+#define OPDL_OPA_OFFSET  (0x38)
 
 int opdl_logtype_driver;
 
@@ -86,7 +89,6 @@ struct opdl_stage {
         */
        uint32_t available_seq;
        uint32_t head;  /* Current head for single-thread operation */
-       uint32_t shadow_head;  /* Shadow head for single-thread operation */
        uint32_t nb_instance;  /* Number of instances */
        uint32_t instance_id;  /* ID of this stage instance */
        uint16_t num_claimed;  /* Number of slots claimed */
@@ -102,6 +104,9 @@ struct opdl_stage {
        /* For managing disclaims in multi-threaded processing stages */
        struct claim_manager pending_disclaims[RTE_MAX_LCORE]
                                               __rte_cache_aligned;
+       uint32_t shadow_head;  /* Shadow head for single-thread operation */
+       uint32_t queue_id;     /* ID of Queue which is assigned to this stage */
+       uint32_t pos;           /* Atomic scan position */
 } __rte_cache_aligned;
 
 /* Context for opdl_ring */
@@ -494,6 +499,9 @@ opdl_stage_claim_singlethread(struct opdl_stage *s, void *entries,
                uint32_t num_entries, uint32_t *seq, bool block, bool atomic)
 {
        uint32_t i = 0, j = 0,  offset;
+       uint32_t opa_id   = 0;
+       uint32_t flow_id  = 0;
+       uint64_t event    = 0;
        void *get_slots;
        struct rte_event *ev;
        RTE_SET_USED(seq);
@@ -520,7 +528,17 @@ opdl_stage_claim_singlethread(struct opdl_stage *s, void *entries,
 
                for (j = 0; j < num_entries; j++) {
                        ev = (struct rte_event *)get_slot(t, s->head+j);
-                       if ((ev->flow_id%s->nb_instance) == s->instance_id) {
+
+                       event  = __atomic_load_n(&(ev->event),
+                                       __ATOMIC_ACQUIRE);
+
+                       opa_id = OPDL_OPA_MASK & (event >> OPDL_OPA_OFFSET);
+                       flow_id  = OPDL_FLOWID_MASK & event;
+
+                       if (opa_id >= s->queue_id)
+                               continue;
+
+                       if ((flow_id % s->nb_instance) == s->instance_id) {
                                memcpy(entries_offset, ev, t->slot_size);
                                entries_offset += t->slot_size;
                                i++;
@@ -531,6 +549,7 @@ opdl_stage_claim_singlethread(struct opdl_stage *s, void *entries,
        s->head += num_entries;
        s->num_claimed = num_entries;
        s->num_event = i;
+       s->pos = 0;
 
        /* automatically disclaim entries if number of rte_events is zero */
        if (unlikely(i == 0))
@@ -953,21 +972,26 @@ opdl_ring_get_slot(const struct opdl_ring *t, uint32_t index)
 }
 
 bool
-opdl_ring_cas_slot(const struct opdl_stage *s, const struct rte_event *ev,
+opdl_ring_cas_slot(struct opdl_stage *s, const struct rte_event *ev,
                uint32_t index, bool atomic)
 {
-       uint32_t i = 0, j = 0, offset;
+       uint32_t i = 0, offset;
        struct opdl_ring *t = s->t;
        struct rte_event *ev_orig = NULL;
        bool ev_updated = false;
-       uint64_t  ev_temp = 0;
+       uint64_t ev_temp    = 0;
+       uint64_t ev_update  = 0;
+
+       uint32_t opa_id   = 0;
+       uint32_t flow_id  = 0;
+       uint64_t event    = 0;
 
        if (index > s->num_event) {
                PMD_DRV_LOG(ERR, "index is overflow");
                return ev_updated;
        }
 
-       ev_temp = ev->event&OPDL_EVENT_MASK;
+       ev_temp = ev->event & OPDL_EVENT_MASK;
 
        if (!atomic) {
                offset = opdl_first_entry_id(s->seq, s->nb_instance,
@@ -984,27 +1008,39 @@ opdl_ring_cas_slot(const struct opdl_stage *s, const struct rte_event *ev,
                }
 
        } else {
-               for (i = 0; i < s->num_claimed; i++) {
+               for (i = s->pos; i < s->num_claimed; i++) {
                        ev_orig = (struct rte_event *)
                                get_slot(t, s->shadow_head+i);
 
-                       if ((ev_orig->flow_id%s->nb_instance) ==
-                                       s->instance_id) {
-
-                               if (j == index) {
-                                       if ((ev_orig->event&OPDL_EVENT_MASK) !=
-                                                       ev_temp) {
-                                               ev_orig->event = ev->event;
-                                               ev_updated = true;
-                                       }
-                                       if (ev_orig->u64 != ev->u64) {
-                                               ev_orig->u64 = ev->u64;
-                                               ev_updated = true;
-                                       }
-
-                                       break;
+                       event  = __atomic_load_n(&(ev_orig->event),
+                                       __ATOMIC_ACQUIRE);
+
+                       opa_id = OPDL_OPA_MASK & (event >> OPDL_OPA_OFFSET);
+                       flow_id  = OPDL_FLOWID_MASK & event;
+
+                       if (opa_id >= s->queue_id)
+                               continue;
+
+                       if ((flow_id % s->nb_instance) == s->instance_id) {
+                               ev_update = s->queue_id;
+                               ev_update = (ev_update << OPDL_OPA_OFFSET)
+                                       | ev->event;
+
+                               s->pos = i + 1;
+
+                               if ((event & OPDL_EVENT_MASK) !=
+                                               ev_temp) {
+                                       __atomic_store_n(&(ev_orig->event),
+                                                       ev_update,
+                                                       __ATOMIC_RELEASE);
+                                       ev_updated = true;
                                }
-                               j++;
+                               if (ev_orig->u64 != ev->u64) {
+                                       ev_orig->u64 = ev->u64;
+                                       ev_updated = true;
+                               }
+
+                               break;
                        }
                }
 
@@ -1049,11 +1085,7 @@ check_deps(struct opdl_ring *t, struct opdl_stage *deps[],
                        return -EINVAL;
                }
        }
-       if (num_deps > t->num_stages) {
-               PMD_DRV_LOG(ERR, "num_deps (%u) > number stages (%u)",
-                               num_deps, t->num_stages);
-               return -EINVAL;
-       }
+
        return 0;
 }
 
@@ -1153,6 +1185,13 @@ opdl_stage_get_opdl_ring(const struct opdl_stage *s)
        return s->t;
 }
 
+void
+opdl_stage_set_queue_id(struct opdl_stage *s,
+               uint32_t queue_id)
+{
+       s->queue_id = queue_id;
+}
+
 void
 opdl_ring_dump(const struct opdl_ring *t, FILE *f)
 {
index 9e8c33e..751a59d 100644 (file)
@@ -518,6 +518,20 @@ opdl_stage_find_num_available(struct opdl_stage *s, uint32_t num_entries);
 struct opdl_stage *
 opdl_stage_create(struct opdl_ring *t,  bool threadsafe);
 
+
+/**
+ * Set the internal queue id for each stage instance.
+ *
+ * @param s
+ *   The pointer of  stage instance.
+ *
+ * @param queue_id
+ *    The value of internal queue id.
+ */
+void
+opdl_stage_set_queue_id(struct opdl_stage *s,
+               uint32_t queue_id);
+
 /**
  * Prints information on opdl_ring instance and all its stages
  *
@@ -590,7 +604,7 @@ opdl_ring_set_stage_threadsafe(struct opdl_stage *s, bool threadsafe);
  */
 
 bool
-opdl_ring_cas_slot(const struct opdl_stage *s, const struct rte_event *ev,
+opdl_ring_cas_slot(struct opdl_stage *s, const struct rte_event *ev,
                uint32_t index, bool atomic);
 
 #ifdef __cplusplus