eventdev: relax SMP barriers with C11 atomics
authorPhil Yang <phil.yang@arm.com>
Tue, 7 Jul 2020 15:54:53 +0000 (23:54 +0800)
committerThomas Monjalon <thomas@monjalon.net>
Wed, 8 Jul 2020 16:16:41 +0000 (18:16 +0200)
The impl_opaque field is shared between the timer arm and cancel
operations. Meanwhile, the state flag acts as a guard variable to
make sure the update of impl_opaque is synchronized. The original
code uses rte_smp barriers to achieve that. This patch uses C11
atomics with an explicit one-way memory barrier instead of full
barriers rte_smp_w/rmb() to avoid the unnecessary barrier on aarch64.

Since compilers can generate the same instructions for volatile and
non-volatile variable in C11 __atomics built-ins, so remain the volatile
keyword in front of state enum to avoid the ABI break issue.

Cc: stable@dpdk.org
Signed-off-by: Phil Yang <phil.yang@arm.com>
Reviewed-by: Dharmik Thakkar <dharmik.thakkar@arm.com>
Reviewed-by: Ruifeng Wang <ruifeng.wang@arm.com>
Acked-by: Erik Gabriel Carrillo <erik.g.carrillo@intel.com>
lib/librte_eventdev/rte_event_timer_adapter.c

index aa01b4d..4c5e49e 100644 (file)
@@ -629,7 +629,8 @@ swtim_callback(struct rte_timer *tim)
                sw->expired_timers[sw->n_expired_timers++] = tim;
                sw->stats.evtim_exp_count++;
 
-               evtim->state = RTE_EVENT_TIMER_NOT_ARMED;
+               __atomic_store_n(&evtim->state, RTE_EVENT_TIMER_NOT_ARMED,
+                               __ATOMIC_RELEASE);
        }
 
        if (event_buffer_batch_ready(&sw->buffer)) {
@@ -1020,6 +1021,7 @@ __swtim_arm_burst(const struct rte_event_timer_adapter *adapter,
        int n_lcores;
        /* Timer list for this lcore is not in use. */
        uint16_t exp_state = 0;
+       enum rte_event_timer_state n_state;
 
 #ifdef RTE_LIBRTE_EVENTDEV_DEBUG
        /* Check that the service is running. */
@@ -1060,30 +1062,36 @@ __swtim_arm_burst(const struct rte_event_timer_adapter *adapter,
        }
 
        for (i = 0; i < nb_evtims; i++) {
-               /* Don't modify the event timer state in these cases */
-               if (evtims[i]->state == RTE_EVENT_TIMER_ARMED) {
+               n_state = __atomic_load_n(&evtims[i]->state, __ATOMIC_ACQUIRE);
+               if (n_state == RTE_EVENT_TIMER_ARMED) {
                        rte_errno = EALREADY;
                        break;
-               } else if (!(evtims[i]->state == RTE_EVENT_TIMER_NOT_ARMED ||
-                            evtims[i]->state == RTE_EVENT_TIMER_CANCELED)) {
+               } else if (!(n_state == RTE_EVENT_TIMER_NOT_ARMED ||
+                            n_state == RTE_EVENT_TIMER_CANCELED)) {
                        rte_errno = EINVAL;
                        break;
                }
 
                ret = check_timeout(evtims[i], adapter);
                if (unlikely(ret == -1)) {
-                       evtims[i]->state = RTE_EVENT_TIMER_ERROR_TOOLATE;
+                       __atomic_store_n(&evtims[i]->state,
+                                       RTE_EVENT_TIMER_ERROR_TOOLATE,
+                                       __ATOMIC_RELAXED);
                        rte_errno = EINVAL;
                        break;
                } else if (unlikely(ret == -2)) {
-                       evtims[i]->state = RTE_EVENT_TIMER_ERROR_TOOEARLY;
+                       __atomic_store_n(&evtims[i]->state,
+                                       RTE_EVENT_TIMER_ERROR_TOOEARLY,
+                                       __ATOMIC_RELAXED);
                        rte_errno = EINVAL;
                        break;
                }
 
                if (unlikely(check_destination_event_queue(evtims[i],
                                                           adapter) < 0)) {
-                       evtims[i]->state = RTE_EVENT_TIMER_ERROR;
+                       __atomic_store_n(&evtims[i]->state,
+                                       RTE_EVENT_TIMER_ERROR,
+                                       __ATOMIC_RELAXED);
                        rte_errno = EINVAL;
                        break;
                }
@@ -1099,13 +1107,18 @@ __swtim_arm_burst(const struct rte_event_timer_adapter *adapter,
                                          SINGLE, lcore_id, NULL, evtims[i]);
                if (ret < 0) {
                        /* tim was in RUNNING or CONFIG state */
-                       evtims[i]->state = RTE_EVENT_TIMER_ERROR;
+                       __atomic_store_n(&evtims[i]->state,
+                                       RTE_EVENT_TIMER_ERROR,
+                                       __ATOMIC_RELEASE);
                        break;
                }
 
-               rte_smp_wmb();
                EVTIM_LOG_DBG("armed an event timer");
-               evtims[i]->state = RTE_EVENT_TIMER_ARMED;
+               /* RELEASE ordering guarantees the adapter specific value
+                * changes observed before the update of state.
+                */
+               __atomic_store_n(&evtims[i]->state, RTE_EVENT_TIMER_ARMED,
+                               __ATOMIC_RELEASE);
        }
 
        if (i < nb_evtims)
@@ -1132,6 +1145,7 @@ swtim_cancel_burst(const struct rte_event_timer_adapter *adapter,
        struct rte_timer *timp;
        uint64_t opaque;
        struct swtim *sw = swtim_pmd_priv(adapter);
+       enum rte_event_timer_state n_state;
 
 #ifdef RTE_LIBRTE_EVENTDEV_DEBUG
        /* Check that the service is running. */
@@ -1143,16 +1157,18 @@ swtim_cancel_burst(const struct rte_event_timer_adapter *adapter,
 
        for (i = 0; i < nb_evtims; i++) {
                /* Don't modify the event timer state in these cases */
-               if (evtims[i]->state == RTE_EVENT_TIMER_CANCELED) {
+               /* ACQUIRE ordering guarantees the access of implementation
+                * specific opaque data under the correct state.
+                */
+               n_state = __atomic_load_n(&evtims[i]->state, __ATOMIC_ACQUIRE);
+               if (n_state == RTE_EVENT_TIMER_CANCELED) {
                        rte_errno = EALREADY;
                        break;
-               } else if (evtims[i]->state != RTE_EVENT_TIMER_ARMED) {
+               } else if (n_state != RTE_EVENT_TIMER_ARMED) {
                        rte_errno = EINVAL;
                        break;
                }
 
-               rte_smp_rmb();
-
                opaque = evtims[i]->impl_opaque[0];
                timp = (struct rte_timer *)(uintptr_t)opaque;
                RTE_ASSERT(timp != NULL);
@@ -1166,9 +1182,12 @@ swtim_cancel_burst(const struct rte_event_timer_adapter *adapter,
 
                rte_mempool_put(sw->tim_pool, (void **)timp);
 
-               evtims[i]->state = RTE_EVENT_TIMER_CANCELED;
-
-               rte_smp_wmb();
+               /* The RELEASE ordering here pairs with atomic ordering
+                * to make sure the state update data observed between
+                * threads.
+                */
+               __atomic_store_n(&evtims[i]->state, RTE_EVENT_TIMER_CANCELED,
+                               __ATOMIC_RELEASE);
        }
 
        return i;