service: relax barriers with C11 atomics
author  Phil Yang <phil.yang@arm.com>
Wed, 6 May 2020 15:28:04 +0000 (23:28 +0800)
committer  David Marchand <david.marchand@redhat.com>
Mon, 11 May 2020 11:21:54 +0000 (13:21 +0200)
The runstate, comp_runstate and app_runstate are used as guard variables
in the service core library. To guarantee inter-thread visibility of
these guard variables, the library uses rte_smp_r/wmb barriers. This
patch uses C11 atomic built-ins (store-release/load-acquire) to relax
these barriers.
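
For reference, a minimal sketch of the guard-variable pattern this patch
applies; the names (guard, payload, writer, reader) are illustrative
only and do not appear in rte_service.c:

    #include <stdint.h>

    #define RUNSTATE_STOPPED 0
    #define RUNSTATE_RUNNING 1

    static uint32_t guard = RUNSTATE_STOPPED;
    static uint64_t payload;

    static void
    writer(uint64_t value)
    {
            payload = value; /* plain store, published by the release below */
            /* store-release: all prior stores become visible to any
             * thread whose load-acquire observes RUNSTATE_RUNNING.
             */
            __atomic_store_n(&guard, RUNSTATE_RUNNING, __ATOMIC_RELEASE);
    }

    static uint64_t
    reader(void)
    {
            /* load-acquire pairs with the store-release above */
            if (__atomic_load_n(&guard, __ATOMIC_ACQUIRE) == RUNSTATE_RUNNING)
                    return payload; /* guaranteed to see writer's store */
            return 0;
    }

On strongly ordered architectures such as x86 these built-ins compile to
plain loads and stores plus a compiler barrier; on weakly ordered ones
such as armv8 they emit one-way barrier instructions (ldar/stlr) instead
of the DMB-based barriers behind rte_smp_r/wmb.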

Signed-off-by: Phil Yang <phil.yang@arm.com>
Reviewed-by: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>
Acked-by: Harry van Haaren <harry.van.haaren@intel.com>
lib/librte_eal/common/rte_service.c

index 9c1b3ab..6123a21 100644
@@ -265,7 +265,6 @@ rte_service_component_register(const struct rte_service_spec *spec,
        s->spec = *spec;
        s->internal_flags |= SERVICE_F_REGISTERED | SERVICE_F_START_CHECK;
 
-       rte_smp_wmb();
        rte_service_count++;
 
        if (id_ptr)
@@ -282,7 +281,6 @@ rte_service_component_unregister(uint32_t id)
        SERVICE_VALID_GET_OR_ERR_RET(id, s, -EINVAL);
 
        rte_service_count--;
-       rte_smp_wmb();
 
        s->internal_flags &= ~(SERVICE_F_REGISTERED);
 
@@ -301,12 +299,17 @@ rte_service_component_runstate_set(uint32_t id, uint32_t runstate)
        struct rte_service_spec_impl *s;
        SERVICE_VALID_GET_OR_ERR_RET(id, s, -EINVAL);
 
+       /* comp_runstate acts as the guard variable. Use store-release
+        * memory order. This synchronizes with the load-acquire in the
+        * service_run and rte_service_runstate_get functions.
+        */
        if (runstate)
-               s->comp_runstate = RUNSTATE_RUNNING;
+               __atomic_store_n(&s->comp_runstate, RUNSTATE_RUNNING,
+                       __ATOMIC_RELEASE);
        else
-               s->comp_runstate = RUNSTATE_STOPPED;
+               __atomic_store_n(&s->comp_runstate, RUNSTATE_STOPPED,
+                       __ATOMIC_RELEASE);
 
-       rte_smp_wmb();
        return 0;
 }
 
@@ -316,12 +319,17 @@ rte_service_runstate_set(uint32_t id, uint32_t runstate)
        struct rte_service_spec_impl *s;
        SERVICE_VALID_GET_OR_ERR_RET(id, s, -EINVAL);
 
+       /* app_runstate acts as the guard variable. Use store-release
+        * memory order. This synchronizes with the load-acquire in the
+        * service_run and rte_service_runstate_get functions.
+        */
        if (runstate)
-               s->app_runstate = RUNSTATE_RUNNING;
+               __atomic_store_n(&s->app_runstate, RUNSTATE_RUNNING,
+                       __ATOMIC_RELEASE);
        else
-               s->app_runstate = RUNSTATE_STOPPED;
+               __atomic_store_n(&s->app_runstate, RUNSTATE_STOPPED,
+                       __ATOMIC_RELEASE);
 
-       rte_smp_wmb();
        return 0;
 }
 
@@ -330,15 +338,24 @@ rte_service_runstate_get(uint32_t id)
 {
        struct rte_service_spec_impl *s;
        SERVICE_VALID_GET_OR_ERR_RET(id, s, -EINVAL);
-       rte_smp_rmb();
 
-       int check_disabled = !(s->internal_flags & SERVICE_F_START_CHECK);
-       int lcore_mapped = (__atomic_load_n(&s->num_mapped_cores,
-               __ATOMIC_RELAXED) > 0);
+       /* comp_runstate and app_runstate act as the guard variables.
+        * Use load-acquire memory order. This synchronizes with
+        * store-release in service state set functions.
+        */
+       if (__atomic_load_n(&s->comp_runstate, __ATOMIC_ACQUIRE) ==
+                       RUNSTATE_RUNNING &&
+           __atomic_load_n(&s->app_runstate, __ATOMIC_ACQUIRE) ==
+                       RUNSTATE_RUNNING) {
+               int check_disabled = !(s->internal_flags &
+                       SERVICE_F_START_CHECK);
+               int lcore_mapped = (__atomic_load_n(&s->num_mapped_cores,
+                       __ATOMIC_RELAXED) > 0);
+
+               return (check_disabled | lcore_mapped);
+       } else
+               return 0;
 
-       return (s->app_runstate == RUNSTATE_RUNNING) &&
-               (s->comp_runstate == RUNSTATE_RUNNING) &&
-               (check_disabled | lcore_mapped);
 }
 
 static inline void
@@ -367,9 +384,15 @@ service_run(uint32_t i, struct core_state *cs, uint64_t service_mask,
        if (!s)
                return -EINVAL;
 
-       if (s->comp_runstate != RUNSTATE_RUNNING ||
-                       s->app_runstate != RUNSTATE_RUNNING ||
-                       !(service_mask & (UINT64_C(1) << i))) {
+       /* comp_runstate and app_runstate act as the guard variables.
+        * Use load-acquire memory order. This synchronizes with
+        * store-release in service state set functions.
+        */
+       if (__atomic_load_n(&s->comp_runstate, __ATOMIC_ACQUIRE) !=
+                       RUNSTATE_RUNNING ||
+           __atomic_load_n(&s->app_runstate, __ATOMIC_ACQUIRE) !=
+                       RUNSTATE_RUNNING ||
+           !(service_mask & (UINT64_C(1) << i))) {
                cs->service_active_on_lcore[i] = 0;
                return -ENOEXEC;
        }
@@ -434,7 +457,12 @@ service_runner_func(void *arg)
        const int lcore = rte_lcore_id();
        struct core_state *cs = &lcore_states[lcore];
 
-       while (cs->runstate == RUNSTATE_RUNNING) {
+       /* runstate acts as the guard variable. Use load-acquire
+        * memory order here to synchronize with store-release
+        * in runstate update functions.
+        */
+       while (__atomic_load_n(&cs->runstate, __ATOMIC_ACQUIRE) ==
+                       RUNSTATE_RUNNING) {
                const uint64_t service_mask = cs->service_mask;
 
                for (i = 0; i < RTE_SERVICE_NUM_MAX; i++) {
@@ -445,8 +473,6 @@ service_runner_func(void *arg)
                }
 
                cs->loops++;
-
-               rte_smp_rmb();
        }
 
        lcore_config[lcore].state = WAIT;
@@ -613,15 +639,18 @@ rte_service_lcore_reset_all(void)
                if (lcore_states[i].is_service_core) {
                        lcore_states[i].service_mask = 0;
                        set_lcore_state(i, ROLE_RTE);
-                       lcore_states[i].runstate = RUNSTATE_STOPPED;
+                       /* runstate acts as the guard variable. Use
+                        * store-release memory order here to synchronize
+                        * with load-acquire in runstate read functions.
+                        */
+                       __atomic_store_n(&lcore_states[i].runstate,
+                               RUNSTATE_STOPPED, __ATOMIC_RELEASE);
                }
        }
        for (i = 0; i < RTE_SERVICE_NUM_MAX; i++)
                __atomic_store_n(&rte_services[i].num_mapped_cores, 0,
                        __ATOMIC_RELAXED);
 
-       rte_smp_wmb();
-
        return 0;
 }
 
@@ -637,9 +666,11 @@ rte_service_lcore_add(uint32_t lcore)
 
        /* ensure that after adding a core the mask and state are defaults */
        lcore_states[lcore].service_mask = 0;
-       lcore_states[lcore].runstate = RUNSTATE_STOPPED;
-
-       rte_smp_wmb();
+       /* Use store-release memory order here to synchronize with
+        * load-acquire in runstate read functions.
+        */
+       __atomic_store_n(&lcore_states[lcore].runstate, RUNSTATE_STOPPED,
+               __ATOMIC_RELEASE);
 
        return rte_eal_wait_lcore(lcore);
 }
@@ -654,7 +685,12 @@ rte_service_lcore_del(uint32_t lcore)
        if (!cs->is_service_core)
                return -EINVAL;
 
-       if (cs->runstate != RUNSTATE_STOPPED)
+       /* runstate acts as the guard variable. Use load-acquire
+        * memory order here to synchronize with store-release
+        * in runstate update functions.
+        */
+       if (__atomic_load_n(&cs->runstate, __ATOMIC_ACQUIRE) !=
+                       RUNSTATE_STOPPED)
                return -EBUSY;
 
        set_lcore_state(lcore, ROLE_RTE);
@@ -673,13 +709,21 @@ rte_service_lcore_start(uint32_t lcore)
        if (!cs->is_service_core)
                return -EINVAL;
 
-       if (cs->runstate == RUNSTATE_RUNNING)
+       /* runstate acts as the guard variable. Use load-acquire
+        * memory order here to synchronize with store-release
+        * in runstate update functions.
+        */
+       if (__atomic_load_n(&cs->runstate, __ATOMIC_ACQUIRE) ==
+                       RUNSTATE_RUNNING)
                return -EALREADY;
 
        /* set core to run state first, and then launch otherwise it will
         * return immediately as runstate keeps it in the service poll loop
         */
-       cs->runstate = RUNSTATE_RUNNING;
+       /* Use store-release memory order here to synchronize with
+        * load-acquire in runstate read functions.
+        */
+       __atomic_store_n(&cs->runstate, RUNSTATE_RUNNING, __ATOMIC_RELEASE);
 
        int ret = rte_eal_remote_launch(service_runner_func, 0, lcore);
        /* returns -EBUSY if the core is already launched, 0 on success */
@@ -692,7 +736,12 @@ rte_service_lcore_stop(uint32_t lcore)
        if (lcore >= RTE_MAX_LCORE)
                return -EINVAL;
 
-       if (lcore_states[lcore].runstate == RUNSTATE_STOPPED)
+       /* runstate acts as the guard variable. Use load-acquire
+        * memory order here to synchronize with store-release
+        * in runstate update functions.
+        */
+       if (__atomic_load_n(&lcore_states[lcore].runstate, __ATOMIC_ACQUIRE) ==
+                       RUNSTATE_STOPPED)
                return -EALREADY;
 
        uint32_t i;
@@ -712,7 +761,11 @@ rte_service_lcore_stop(uint32_t lcore)
                        return -EBUSY;
        }
 
-       lcore_states[lcore].runstate = RUNSTATE_STOPPED;
+       /* Use store-release memory order here to synchronize with
+        * load-acquire in runstate read functions.
+        */
+       __atomic_store_n(&lcore_states[lcore].runstate, RUNSTATE_STOPPED,
+               __ATOMIC_RELEASE);
 
        return 0;
 }