X-Git-Url: http://git.droids-corp.org/?a=blobdiff_plain;f=lib%2Flibrte_eventdev%2Frte_event_timer_adapter.c;h=4c5e49ea3ba2bdc1991cc70b97c526d0263f56f0;hb=47dcca067f4ab978df3414741be8fa00ddbac4f0;hp=232180326c2a720e55b81e3837b94a861a7a6408;hpb=9c99878aa1b16de26fcce82c112b401766dd910e;p=dpdk.git diff --git a/lib/librte_eventdev/rte_event_timer_adapter.c b/lib/librte_eventdev/rte_event_timer_adapter.c index 232180326c..4c5e49ea3b 100644 --- a/lib/librte_eventdev/rte_event_timer_adapter.c +++ b/lib/librte_eventdev/rte_event_timer_adapter.c @@ -554,7 +554,7 @@ struct swtim { uint32_t timer_data_id; /* Track which cores have actually armed a timer */ struct { - rte_atomic16_t v; + uint16_t v; } __rte_cache_aligned in_use[RTE_MAX_LCORE]; /* Track which cores' timer lists should be polled */ unsigned int poll_lcores[RTE_MAX_LCORE]; @@ -583,6 +583,7 @@ swtim_callback(struct rte_timer *tim) uint16_t nb_evs_invalid = 0; uint64_t opaque; int ret; + int n_lcores; opaque = evtim->impl_opaque[1]; adapter = (struct rte_event_timer_adapter *)(uintptr_t)opaque; @@ -605,8 +606,13 @@ swtim_callback(struct rte_timer *tim) "with immediate expiry value"); } - if (unlikely(rte_atomic16_test_and_set(&sw->in_use[lcore].v))) - sw->poll_lcores[sw->n_poll_lcores++] = lcore; + if (unlikely(sw->in_use[lcore].v == 0)) { + sw->in_use[lcore].v = 1; + n_lcores = __atomic_fetch_add(&sw->n_poll_lcores, 1, + __ATOMIC_RELAXED); + __atomic_store_n(&sw->poll_lcores[n_lcores], lcore, + __ATOMIC_RELAXED); + } } else { EVTIM_BUF_LOG_DBG("buffered an event timer expiry event"); @@ -623,7 +629,8 @@ swtim_callback(struct rte_timer *tim) sw->expired_timers[sw->n_expired_timers++] = tim; sw->stats.evtim_exp_count++; - evtim->state = RTE_EVENT_TIMER_NOT_ARMED; + __atomic_store_n(&evtim->state, RTE_EVENT_TIMER_NOT_ARMED, + __ATOMIC_RELEASE); } if (event_buffer_batch_ready(&sw->buffer)) { @@ -829,7 +836,7 @@ swtim_init(struct rte_event_timer_adapter *adapter) /* Initialize the variables that track in-use timer lists */ for (i = 0; i < RTE_MAX_LCORE; i++) - rte_atomic16_init(&sw->in_use[i].v); + sw->in_use[i].v = 0; /* Initialize the timer subsystem and allocate timer data instance */ ret = rte_timer_subsystem_init(); @@ -1011,6 +1018,10 @@ __swtim_arm_burst(const struct rte_event_timer_adapter *adapter, uint32_t lcore_id = rte_lcore_id(); struct rte_timer *tim, *tims[nb_evtims]; uint64_t cycles; + int n_lcores; + /* Timer list for this lcore is not in use. */ + uint16_t exp_state = 0; + enum rte_event_timer_state n_state; #ifdef RTE_LIBRTE_EVENTDEV_DEBUG /* Check that the service is running. */ @@ -1029,12 +1040,18 @@ __swtim_arm_burst(const struct rte_event_timer_adapter *adapter, /* If this is the first time we're arming an event timer on this lcore, * mark this lcore as "in use"; this will cause the service * function to process the timer list that corresponds to this lcore. + * The atomic compare-and-swap operation can prevent the race condition + * on in_use flag between multiple non-EAL threads. */ - if (unlikely(rte_atomic16_test_and_set(&sw->in_use[lcore_id].v))) { + if (unlikely(__atomic_compare_exchange_n(&sw->in_use[lcore_id].v, + &exp_state, 1, 0, + __ATOMIC_RELAXED, __ATOMIC_RELAXED))) { EVTIM_LOG_DBG("Adding lcore id = %u to list of lcores to poll", lcore_id); - sw->poll_lcores[sw->n_poll_lcores] = lcore_id; - ++sw->n_poll_lcores; + n_lcores = __atomic_fetch_add(&sw->n_poll_lcores, 1, + __ATOMIC_RELAXED); + __atomic_store_n(&sw->poll_lcores[n_lcores], lcore_id, + __ATOMIC_RELAXED); } ret = rte_mempool_get_bulk(sw->tim_pool, (void **)tims, @@ -1045,30 +1062,36 @@ __swtim_arm_burst(const struct rte_event_timer_adapter *adapter, } for (i = 0; i < nb_evtims; i++) { - /* Don't modify the event timer state in these cases */ - if (evtims[i]->state == RTE_EVENT_TIMER_ARMED) { + n_state = __atomic_load_n(&evtims[i]->state, __ATOMIC_ACQUIRE); + if (n_state == RTE_EVENT_TIMER_ARMED) { rte_errno = EALREADY; break; - } else if (!(evtims[i]->state == RTE_EVENT_TIMER_NOT_ARMED || - evtims[i]->state == RTE_EVENT_TIMER_CANCELED)) { + } else if (!(n_state == RTE_EVENT_TIMER_NOT_ARMED || + n_state == RTE_EVENT_TIMER_CANCELED)) { rte_errno = EINVAL; break; } ret = check_timeout(evtims[i], adapter); if (unlikely(ret == -1)) { - evtims[i]->state = RTE_EVENT_TIMER_ERROR_TOOLATE; + __atomic_store_n(&evtims[i]->state, + RTE_EVENT_TIMER_ERROR_TOOLATE, + __ATOMIC_RELAXED); rte_errno = EINVAL; break; } else if (unlikely(ret == -2)) { - evtims[i]->state = RTE_EVENT_TIMER_ERROR_TOOEARLY; + __atomic_store_n(&evtims[i]->state, + RTE_EVENT_TIMER_ERROR_TOOEARLY, + __ATOMIC_RELAXED); rte_errno = EINVAL; break; } if (unlikely(check_destination_event_queue(evtims[i], adapter) < 0)) { - evtims[i]->state = RTE_EVENT_TIMER_ERROR; + __atomic_store_n(&evtims[i]->state, + RTE_EVENT_TIMER_ERROR, + __ATOMIC_RELAXED); rte_errno = EINVAL; break; } @@ -1084,13 +1107,18 @@ __swtim_arm_burst(const struct rte_event_timer_adapter *adapter, SINGLE, lcore_id, NULL, evtims[i]); if (ret < 0) { /* tim was in RUNNING or CONFIG state */ - evtims[i]->state = RTE_EVENT_TIMER_ERROR; + __atomic_store_n(&evtims[i]->state, + RTE_EVENT_TIMER_ERROR, + __ATOMIC_RELEASE); break; } - rte_smp_wmb(); EVTIM_LOG_DBG("armed an event timer"); - evtims[i]->state = RTE_EVENT_TIMER_ARMED; + /* RELEASE ordering guarantees the adapter specific value + * changes observed before the update of state. + */ + __atomic_store_n(&evtims[i]->state, RTE_EVENT_TIMER_ARMED, + __ATOMIC_RELEASE); } if (i < nb_evtims) @@ -1117,6 +1145,7 @@ swtim_cancel_burst(const struct rte_event_timer_adapter *adapter, struct rte_timer *timp; uint64_t opaque; struct swtim *sw = swtim_pmd_priv(adapter); + enum rte_event_timer_state n_state; #ifdef RTE_LIBRTE_EVENTDEV_DEBUG /* Check that the service is running. */ @@ -1128,16 +1157,18 @@ swtim_cancel_burst(const struct rte_event_timer_adapter *adapter, for (i = 0; i < nb_evtims; i++) { /* Don't modify the event timer state in these cases */ - if (evtims[i]->state == RTE_EVENT_TIMER_CANCELED) { + /* ACQUIRE ordering guarantees the access of implementation + * specific opaque data under the correct state. + */ + n_state = __atomic_load_n(&evtims[i]->state, __ATOMIC_ACQUIRE); + if (n_state == RTE_EVENT_TIMER_CANCELED) { rte_errno = EALREADY; break; - } else if (evtims[i]->state != RTE_EVENT_TIMER_ARMED) { + } else if (n_state != RTE_EVENT_TIMER_ARMED) { rte_errno = EINVAL; break; } - rte_smp_rmb(); - opaque = evtims[i]->impl_opaque[0]; timp = (struct rte_timer *)(uintptr_t)opaque; RTE_ASSERT(timp != NULL); @@ -1151,11 +1182,12 @@ swtim_cancel_burst(const struct rte_event_timer_adapter *adapter, rte_mempool_put(sw->tim_pool, (void **)timp); - evtims[i]->state = RTE_EVENT_TIMER_CANCELED; - evtims[i]->impl_opaque[0] = 0; - evtims[i]->impl_opaque[1] = 0; - - rte_smp_wmb(); + /* The RELEASE ordering here pairs with atomic ordering + * to make sure the state update data observed between + * threads. + */ + __atomic_store_n(&evtims[i]->state, RTE_EVENT_TIMER_CANCELED, + __ATOMIC_RELEASE); } return i;