From: Harry van Haaren
Date: Wed, 1 Nov 2017 18:48:01 +0000 (+0000)
Subject: service: fix race in service on app lcore function
X-Git-Url: http://git.droids-corp.org/?a=commitdiff_plain;h=8d39d3e237c2;p=dpdk.git

service: fix race in service on app lcore function

This commit fixes a possible race condition if an application uses the
service-cores infrastructure and the function to run a service on an
application lcore at the same time.

The fix is to change the num_mapped_cores variable to an atomic variable.
This allows concurrent accesses by multiple threads calling
rte_service_run_iter_on_app_lcore() to detect whether another core is
currently mapped to the service, and to refuse to run the service if it is
not multi-thread safe.

The run-iteration-on-app-lcore function now takes two arguments: the id of
the service to run, and whether atomics should be used to serialize access
to multi-thread unsafe services. This allows applications to choose whether
they wish to use the service-cores feature, or whether they take
responsibility themselves for serializing the invocation of a service. See
the doxygen documentation for more details.

Two unit tests were added to verify the behaviour of the function to run a
service on an application core, testing both a multi-thread safe service
and a multi-thread unsafe service.

The doxygen API documentation for the function has been updated to reflect
the current and correct behaviour.

Fixes: e9139a32f6e8 ("service: add function to run on app lcore")

Signed-off-by: Harry van Haaren
Acked-by: Bruce Richardson
---
diff --git a/examples/eventdev_pipeline_sw_pmd/main.c b/examples/eventdev_pipeline_sw_pmd/main.c index 948be67b3b..5f431d87d4 100644 --- a/examples/eventdev_pipeline_sw_pmd/main.c +++ b/examples/eventdev_pipeline_sw_pmd/main.c @@ -245,7 +245,7 @@ schedule_devices(unsigned int lcore_id) if (fdata->sched_core[lcore_id] && (fdata->sched_single || rte_atomic32_cmpset(&(fdata->sched_lock), 0, 1))) { - rte_service_run_iter_on_app_lcore(fdata->evdev_service_id); + rte_service_run_iter_on_app_lcore(fdata->evdev_service_id, 1); if (cdata.dump_dev_signal) { rte_event_dev_dump(0, stdout); cdata.dump_dev_signal = 0; diff --git a/lib/librte_eal/common/include/rte_service.h b/lib/librte_eal/common/include/rte_service.h index d9de5ad74e..927244065b 100644 --- a/lib/librte_eal/common/include/rte_service.h +++ b/lib/librte_eal/common/include/rte_service.h @@ -232,11 +232,28 @@ int32_t rte_service_set_runstate_mapped_check(uint32_t id, int32_t enable); * @warning * @b EXPERIMENTAL: this API may change without prior notice * - * This function runs a service callback from a non-service lcore context. - * The *id* of the service to be run is passed in, and the service-callback - * is executed on the calling lcore immediately if possible. If the service is - * not multi-thread capable and another thread is currently executing it, this - * function returns without running the callback. + * This function runs a service callback from a non-service lcore. + * + * This function is designed to enable gradual porting to service cores, and + * to enable unit tests to verify a service behaves as expected. + * + * When called, this function ensures that the service identified by *id* is + * safe to run on this lcore. Multi-thread safe services are invoked even if + * other cores are simultaneously running them as they are multi-thread safe.
+ * + * Multi-thread unsafe services are handled depending on the variable + * *serialize_multithread_unsafe*: + * - When set, the function will check if a service is already being invoked + * on another lcore, refusing to run it and returning -EBUSY. + * - When zero, the application takes responsibility to ensure that the service + * indicated by *id* is not going to be invoked by another lcore. This setting + * avoids atomic operations, so is likely to be more performant. + * + * @param id The ID of the service to run + * @param serialize_multithread_unsafe This parameter indicates to the service + * cores library if it is required to use atomics to serialize access + * to mult-thread unsafe services. As there is an overhead in using + * atomics, applications can choose to enable or disable this feature * * Note that any thread calling this function MUST be a DPDK EAL thread, as * the *rte_lcore_id* function is used to access internal data structures. @@ -244,10 +261,11 @@ int32_t rte_service_set_runstate_mapped_check(uint32_t id, int32_t enable); * @retval 0 Service was run on the calling thread successfully * @retval -EBUSY Another lcore is executing the service, and it is not a * multi-thread safe service, so the service was not run on this lcore - * @retval -ENOEXEC Service is not in a runnable state + * @retval -ENOEXEC Service is not in a run-able state * @retval -EINVAL Invalid service id */ -int32_t rte_service_run_iter_on_app_lcore(uint32_t id); +int32_t rte_service_run_iter_on_app_lcore(uint32_t id, + uint32_t serialize_multithread_unsafe); /** * @warning diff --git a/lib/librte_eal/common/rte_service.c b/lib/librte_eal/common/rte_service.c index f17bf4bb04..09b758c940 100644 --- a/lib/librte_eal/common/rte_service.c +++ b/lib/librte_eal/common/rte_service.c @@ -77,7 +77,7 @@ struct rte_service_spec_impl { uint8_t internal_flags; /* per service statistics */ - uint32_t num_mapped_cores; + rte_atomic32_t num_mapped_cores; uint64_t calls; uint64_t cycles_spent; } __rte_cache_aligned; @@ -325,7 +325,7 @@ rte_service_runstate_get(uint32_t id) rte_smp_rmb(); int check_disabled = !(s->internal_flags & SERVICE_F_START_CHECK); - int lcore_mapped = (s->num_mapped_cores > 0); + int lcore_mapped = (rte_atomic32_read(&s->num_mapped_cores) > 0); return (s->app_runstate == RUNSTATE_RUNNING) && (s->comp_runstate == RUNSTATE_RUNNING) && @@ -365,7 +365,7 @@ service_run(uint32_t i, struct core_state *cs, uint64_t service_mask) * mapped, atomic ops are not required. */ const int use_atomics = (service_mt_safe(s) == 0) && - (s->num_mapped_cores > 1); + (rte_atomic32_read(&s->num_mapped_cores) > 1); if (use_atomics) { if (!rte_atomic32_cmpset((uint32_t *)&s->execute_lock, 0, 1)) return -EBUSY; @@ -378,11 +378,36 @@ service_run(uint32_t i, struct core_state *cs, uint64_t service_mask) return 0; } -int32_t rte_service_run_iter_on_app_lcore(uint32_t id) +int32_t rte_service_run_iter_on_app_lcore(uint32_t id, + uint32_t serialize_mt_unsafe) { /* run service on calling core, using all-ones as the service mask */ + if (!service_valid(id)) + return -EINVAL; + struct core_state *cs = &lcore_states[rte_lcore_id()]; - return service_run(id, cs, UINT64_MAX); + struct rte_service_spec_impl *s = &rte_services[id]; + + /* Atomically add this core to the mapped cores first, then examine if + * we can run the service. This avoids a race condition between + * checking the value, and atomically adding to the mapped count. 
+ */ + if (serialize_mt_unsafe) + rte_atomic32_inc(&s->num_mapped_cores); + + if (service_mt_safe(s) == 0 && + rte_atomic32_read(&s->num_mapped_cores) > 1) { + if (serialize_mt_unsafe) + rte_atomic32_dec(&s->num_mapped_cores); + return -EBUSY; + } + + int ret = service_run(id, cs, UINT64_MAX); + + if (serialize_mt_unsafe) + rte_atomic32_dec(&s->num_mapped_cores); + + return ret; } static int32_t @@ -522,10 +547,10 @@ service_update(struct rte_service_spec *service, uint32_t lcore, if (set) { if (*set) { lcore_states[lcore].service_mask |= sid_mask; - rte_services[sid].num_mapped_cores++; + rte_atomic32_inc(&rte_services[sid].num_mapped_cores); } else { lcore_states[lcore].service_mask &= ~(sid_mask); - rte_services[sid].num_mapped_cores--; + rte_atomic32_dec(&rte_services[sid].num_mapped_cores); } } @@ -568,7 +593,7 @@ int32_t rte_service_lcore_reset_all(void) lcore_states[i].runstate = RUNSTATE_STOPPED; } for (i = 0; i < RTE_SERVICE_NUM_MAX; i++) - rte_services[i].num_mapped_cores = 0; + rte_atomic32_set(&rte_services[i].num_mapped_cores, 0); rte_smp_wmb(); @@ -664,7 +689,8 @@ rte_service_lcore_stop(uint32_t lcore) for (i = 0; i < RTE_SERVICE_NUM_MAX; i++) { int32_t enabled = service_mask & (UINT64_C(1) << i); int32_t service_running = rte_service_runstate_get(i); - int32_t only_core = rte_services[i].num_mapped_cores == 1; + int32_t only_core = (1 == + rte_atomic32_read(&rte_services[i].num_mapped_cores)); /* if the core is mapped, and the service is running, and this * is the only core that is mapped, the service would cease to diff --git a/test/test/test_eventdev_sw.c b/test/test/test_eventdev_sw.c index b86b137ed8..01aa4d9888 100644 --- a/test/test/test_eventdev_sw.c +++ b/test/test/test_eventdev_sw.c @@ -417,7 +417,7 @@ run_prio_packet_test(struct test *t) } } - rte_service_run_iter_on_app_lcore(t->service_id); + rte_service_run_iter_on_app_lcore(t->service_id, 1); struct test_event_dev_stats stats; err = test_event_dev_stats_get(evdev, &stats); @@ -509,7 +509,7 @@ test_single_directed_packet(struct test *t) } /* Run schedule() as dir packets may need to be re-ordered */ - rte_service_run_iter_on_app_lcore(t->service_id); + rte_service_run_iter_on_app_lcore(t->service_id, 1); struct test_event_dev_stats stats; err = test_event_dev_stats_get(evdev, &stats); @@ -576,7 +576,7 @@ test_directed_forward_credits(struct test *t) printf("%d: error failed to enqueue\n", __LINE__); return -1; } - rte_service_run_iter_on_app_lcore(t->service_id); + rte_service_run_iter_on_app_lcore(t->service_id, 1); uint32_t deq_pkts; deq_pkts = rte_event_dequeue_burst(evdev, 0, &ev, 1, 0); @@ -738,7 +738,7 @@ burst_packets(struct test *t) return -1; } } - rte_service_run_iter_on_app_lcore(t->service_id); + rte_service_run_iter_on_app_lcore(t->service_id, 1); /* Check stats for all NUM_PKTS arrived to sched core */ struct test_event_dev_stats stats; @@ -827,7 +827,7 @@ abuse_inflights(struct test *t) } /* schedule */ - rte_service_run_iter_on_app_lcore(t->service_id); + rte_service_run_iter_on_app_lcore(t->service_id, 1); struct test_event_dev_stats stats; @@ -965,7 +965,7 @@ xstats_tests(struct test *t) } } - rte_service_run_iter_on_app_lcore(t->service_id); + rte_service_run_iter_on_app_lcore(t->service_id, 1); /* Device names / values */ int num_stats = rte_event_dev_xstats_names_get(evdev, @@ -1292,7 +1292,7 @@ port_reconfig_credits(struct test *t) } } - rte_service_run_iter_on_app_lcore(t->service_id); + rte_service_run_iter_on_app_lcore(t->service_id, 1); struct rte_event ev[NPKTS]; int deq = 
rte_event_dequeue_burst(evdev, t->port[0], ev, @@ -1518,7 +1518,7 @@ xstats_id_reset_tests(struct test *t) } } - rte_service_run_iter_on_app_lcore(t->service_id); + rte_service_run_iter_on_app_lcore(t->service_id, 1); static const char * const dev_names[] = { "dev_rx", "dev_tx", "dev_drop", "dev_sched_calls", @@ -1909,7 +1909,7 @@ qid_priorities(struct test *t) } } - rte_service_run_iter_on_app_lcore(t->service_id); + rte_service_run_iter_on_app_lcore(t->service_id, 1); /* dequeue packets, verify priority was upheld */ struct rte_event ev[32]; @@ -1990,7 +1990,7 @@ load_balancing(struct test *t) } } - rte_service_run_iter_on_app_lcore(t->service_id); + rte_service_run_iter_on_app_lcore(t->service_id, 1); struct test_event_dev_stats stats; err = test_event_dev_stats_get(evdev, &stats); @@ -2090,7 +2090,7 @@ load_balancing_history(struct test *t) } /* call the scheduler */ - rte_service_run_iter_on_app_lcore(t->service_id); + rte_service_run_iter_on_app_lcore(t->service_id, 1); /* Dequeue the flow 0 packet from port 1, so that we can then drop */ struct rte_event ev; @@ -2107,7 +2107,7 @@ load_balancing_history(struct test *t) rte_event_enqueue_burst(evdev, t->port[1], &release_ev, 1); /* call the scheduler */ - rte_service_run_iter_on_app_lcore(t->service_id); + rte_service_run_iter_on_app_lcore(t->service_id, 1); /* * Set up the next set of flows, first a new flow to fill up @@ -2140,7 +2140,7 @@ load_balancing_history(struct test *t) } /* schedule */ - rte_service_run_iter_on_app_lcore(t->service_id); + rte_service_run_iter_on_app_lcore(t->service_id, 1); err = test_event_dev_stats_get(evdev, &stats); if (err) { @@ -2184,7 +2184,7 @@ load_balancing_history(struct test *t) while (rte_event_dequeue_burst(evdev, i, &ev, 1, 0)) rte_event_enqueue_burst(evdev, i, &release_ev, 1); } - rte_service_run_iter_on_app_lcore(t->service_id); + rte_service_run_iter_on_app_lcore(t->service_id, 1); cleanup(t); return 0; @@ -2250,7 +2250,7 @@ invalid_qid(struct test *t) } /* call the scheduler */ - rte_service_run_iter_on_app_lcore(t->service_id); + rte_service_run_iter_on_app_lcore(t->service_id, 1); err = test_event_dev_stats_get(evdev, &stats); if (err) { @@ -2335,7 +2335,7 @@ single_packet(struct test *t) return -1; } - rte_service_run_iter_on_app_lcore(t->service_id); + rte_service_run_iter_on_app_lcore(t->service_id, 1); err = test_event_dev_stats_get(evdev, &stats); if (err) { @@ -2378,7 +2378,7 @@ single_packet(struct test *t) printf("%d: Failed to enqueue\n", __LINE__); return -1; } - rte_service_run_iter_on_app_lcore(t->service_id); + rte_service_run_iter_on_app_lcore(t->service_id, 1); err = test_event_dev_stats_get(evdev, &stats); if (stats.port_inflight[wrk_enq] != 0) { @@ -2466,7 +2466,7 @@ inflight_counts(struct test *t) } /* schedule */ - rte_service_run_iter_on_app_lcore(t->service_id); + rte_service_run_iter_on_app_lcore(t->service_id, 1); err = test_event_dev_stats_get(evdev, &stats); if (err) { @@ -2522,7 +2522,7 @@ inflight_counts(struct test *t) * As the scheduler core decrements inflights, it needs to run to * process packets to act on the drop messages */ - rte_service_run_iter_on_app_lcore(t->service_id); + rte_service_run_iter_on_app_lcore(t->service_id, 1); err = test_event_dev_stats_get(evdev, &stats); if (stats.port_inflight[p1] != 0) { @@ -2557,7 +2557,7 @@ inflight_counts(struct test *t) * As the scheduler core decrements inflights, it needs to run to * process packets to act on the drop messages */ - rte_service_run_iter_on_app_lcore(t->service_id); + 
rte_service_run_iter_on_app_lcore(t->service_id, 1); err = test_event_dev_stats_get(evdev, &stats); if (stats.port_inflight[p2] != 0) { @@ -2651,7 +2651,7 @@ parallel_basic(struct test *t, int check_order) } } - rte_service_run_iter_on_app_lcore(t->service_id); + rte_service_run_iter_on_app_lcore(t->service_id, 1); /* use extra slot to make logic in loops easier */ struct rte_event deq_ev[w3_port + 1]; @@ -2678,7 +2678,7 @@ parallel_basic(struct test *t, int check_order) return -1; } } - rte_service_run_iter_on_app_lcore(t->service_id); + rte_service_run_iter_on_app_lcore(t->service_id, 1); /* dequeue from the tx ports, we should get 3 packets */ deq_pkts = rte_event_dequeue_burst(evdev, t->port[tx_port], deq_ev, @@ -2756,7 +2756,7 @@ holb(struct test *t) /* test to check we avoid basic head-of-line blocking */ printf("%d: Error doing first enqueue\n", __LINE__); goto err; } - rte_service_run_iter_on_app_lcore(t->service_id); + rte_service_run_iter_on_app_lcore(t->service_id, 1); if (rte_event_dev_xstats_by_name_get(evdev, "port_0_cq_ring_used", NULL) != 1) @@ -2781,7 +2781,7 @@ holb(struct test *t) /* test to check we avoid basic head-of-line blocking */ printf("%d: Error with enqueue\n", __LINE__); goto err; } - rte_service_run_iter_on_app_lcore(t->service_id); + rte_service_run_iter_on_app_lcore(t->service_id, 1); } while (rte_event_dev_xstats_by_name_get(evdev, rx_port_free_stat, NULL) != 0); @@ -2791,7 +2791,7 @@ holb(struct test *t) /* test to check we avoid basic head-of-line blocking */ printf("%d: Error with enqueue\n", __LINE__); goto err; } - rte_service_run_iter_on_app_lcore(t->service_id); + rte_service_run_iter_on_app_lcore(t->service_id, 1); /* check that the other port still has an empty CQ */ if (rte_event_dev_xstats_by_name_get(evdev, other_port_used_stat, NULL) @@ -2814,7 +2814,7 @@ holb(struct test *t) /* test to check we avoid basic head-of-line blocking */ printf("%d: Error with enqueue\n", __LINE__); goto err; } - rte_service_run_iter_on_app_lcore(t->service_id); + rte_service_run_iter_on_app_lcore(t->service_id, 1); if (rte_event_dev_xstats_by_name_get(evdev, other_port_used_stat, NULL) != 1) { @@ -3004,7 +3004,7 @@ worker_loopback(struct test *t) while (rte_eal_get_lcore_state(p_lcore) != FINISHED || rte_eal_get_lcore_state(w_lcore) != FINISHED) { - rte_service_run_iter_on_app_lcore(t->service_id); + rte_service_run_iter_on_app_lcore(t->service_id, 1); uint64_t new_cycles = rte_get_timer_cycles(); @@ -3031,7 +3031,7 @@ worker_loopback(struct test *t) cycles = new_cycles; } } - rte_service_run_iter_on_app_lcore(t->service_id); + rte_service_run_iter_on_app_lcore(t->service_id, 1); /* ensure all completions are flushed */ rte_eal_mp_wait_lcore(); diff --git a/test/test/test_service_cores.c b/test/test/test_service_cores.c index ee8313ef69..311c704eb9 100644 --- a/test/test/test_service_cores.c +++ b/test/test/test_service_cores.c @@ -548,6 +548,113 @@ service_mt_unsafe_poll(void) return TEST_SUCCESS; } +static int32_t +delay_as_a_mt_safe_service(void *args) +{ + RTE_SET_USED(args); + uint32_t *params = args; + + /* retrieve done flag and atomic lock to inc/dec */ + uint32_t *done = ¶ms[0]; + rte_atomic32_t *lock = (rte_atomic32_t *)¶ms[1]; + + while (!*done) { + rte_atomic32_inc(lock); + rte_delay_us(500); + if (rte_atomic32_read(lock) > 1) + /* pass: second core has simultaneously incremented */ + *done = 1; + rte_atomic32_dec(lock); + } + + return 0; +} + +static int32_t +delay_as_a_service(void *args) +{ + uint32_t *done = (uint32_t *)args; + while (!*done) + 
rte_delay_ms(5); + return 0; +} + +static int +service_run_on_app_core_func(void *arg) +{ + uint32_t *delay_service_id = (uint32_t *)arg; + return rte_service_run_iter_on_app_lcore(*delay_service_id, 1); +} + +static int +service_app_lcore_poll_impl(const int mt_safe) +{ + uint32_t params[2] = {0}; + + struct rte_service_spec service; + memset(&service, 0, sizeof(struct rte_service_spec)); + snprintf(service.name, sizeof(service.name), MT_SAFE_SERVICE_NAME); + if (mt_safe) { + service.callback = delay_as_a_mt_safe_service; + service.callback_userdata = params; + service.capabilities |= RTE_SERVICE_CAP_MT_SAFE; + } else { + service.callback = delay_as_a_service; + service.callback_userdata = ¶ms; + } + + uint32_t id; + TEST_ASSERT_EQUAL(0, rte_service_component_register(&service, &id), + "Register of app lcore delay service failed"); + + rte_service_component_runstate_set(id, 1); + rte_service_runstate_set(id, 1); + + uint32_t app_core2 = rte_get_next_lcore(slcore_id, 1, 1); + int app_core2_ret = rte_eal_remote_launch(service_run_on_app_core_func, + &id, app_core2); + + rte_delay_ms(100); + + int app_core1_ret = service_run_on_app_core_func(&id); + + /* flag done, then wait for the spawned 2nd core to return */ + params[0] = 1; + rte_eal_mp_wait_lcore(); + + /* core two gets launched first - and should hold the service lock */ + TEST_ASSERT_EQUAL(0, app_core2_ret, + "App core2 : run service didn't return zero"); + + if (mt_safe) { + /* mt safe should have both cores return 0 for success */ + TEST_ASSERT_EQUAL(0, app_core1_ret, + "MT Safe: App core1 didn't return 0"); + } else { + /* core one attempts to run later - should be blocked */ + TEST_ASSERT_EQUAL(-EBUSY, app_core1_ret, + "MT Unsafe: App core1 didn't return -EBUSY"); + } + + unregister_all(); + + return TEST_SUCCESS; +} + +static int +service_app_lcore_mt_safe(void) +{ + const int mt_safe = 1; + return service_app_lcore_poll_impl(mt_safe); +} + +static int +service_app_lcore_mt_unsafe(void) +{ + const int mt_safe = 0; + return service_app_lcore_poll_impl(mt_safe); +} + /* start and stop a service core - ensuring it goes back to sleep */ static int service_lcore_start_stop(void) @@ -613,6 +720,8 @@ static struct unit_test_suite service_tests = { TEST_CASE_ST(dummy_register, NULL, service_lcore_en_dis_able), TEST_CASE_ST(dummy_register, NULL, service_mt_unsafe_poll), TEST_CASE_ST(dummy_register, NULL, service_mt_safe_poll), + TEST_CASE_ST(dummy_register, NULL, service_app_lcore_mt_safe), + TEST_CASE_ST(dummy_register, NULL, service_app_lcore_mt_unsafe), TEST_CASES_END() /**< NULL terminate unit test array */ } };
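
As a usage sketch (not part of the patch above): an application thread might drive a service through the new two-argument API as follows. The function name run_service_once and its parameters are illustrative only; the return-code handling follows the doxygen text added in the diff, and the caller is assumed to be a DPDK EAL thread as that documentation requires.

#include <errno.h>
#include <rte_service.h>

/* Run one iteration of a service on the calling (application) lcore.
 * serialize = 1: the library uses atomics and returns -EBUSY if a
 * multi-thread unsafe service is already running on another lcore.
 * serialize = 0: the application guarantees no other lcore invokes this
 * service, so the atomic overhead is skipped.
 */
static int
run_service_once(uint32_t service_id, uint32_t serialize)
{
	int ret = rte_service_run_iter_on_app_lcore(service_id, serialize);

	if (ret == -EBUSY)
		return 0; /* another lcore is executing the service; retry later */

	return ret; /* 0 on success, -EINVAL or -ENOEXEC otherwise */
}

Passing 1 matches what the eventdev example and the unit tests updated above do; passing 0 trades the -EBUSY safety check for lower overhead when the application already serializes invocation itself.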