if (fdata->sched_core[lcore_id] && (fdata->sched_single ||
rte_atomic32_cmpset(&(fdata->sched_lock), 0, 1))) {
- rte_service_run_iter_on_app_lcore(fdata->evdev_service_id);
+ rte_service_run_iter_on_app_lcore(fdata->evdev_service_id, 1);
if (cdata.dump_dev_signal) {
rte_event_dev_dump(0, stdout);
cdata.dump_dev_signal = 0;
* @warning
* @b EXPERIMENTAL: this API may change without prior notice
*
- * This function runs a service callback from a non-service lcore context.
- * The *id* of the service to be run is passed in, and the service-callback
- * is executed on the calling lcore immediately if possible. If the service is
- * not multi-thread capable and another thread is currently executing it, this
- * function returns without running the callback.
+ * This function runs a service callback from a non-service lcore.
+ *
+ * This function is designed to enable gradual porting to service cores, and
+ * to enable unit tests to verify a service behaves as expected.
+ *
+ * When called, this function ensures that the service identified by *id* is
+ * safe to run on this lcore. Multi-thread safe services are invoked even if
+ * other lcores are running them simultaneously.
+ *
+ * Multi-thread unsafe services are handled depending on the parameter
+ * *serialize_multithread_unsafe*:
+ * - When set, the function checks whether the service is already being invoked
+ * on another lcore, and returns -EBUSY without running it if so.
+ * - When zero, the application takes responsibility for ensuring that the
+ * service indicated by *id* is not invoked by another lcore. This setting
+ * avoids atomic operations, so it is likely to be more performant.
+ *
+ * @param id The ID of the service to run
+ * @param serialize_multithread_unsafe This parameter indicates to the service
+ * cores library whether it is required to use atomics to serialize access
+ * to multi-thread unsafe services. As there is an overhead in using
+ * atomics, applications can choose to enable or disable this feature.
*
* Note that any thread calling this function MUST be a DPDK EAL thread, as
* the *rte_lcore_id* function is used to access internal data structures.
* @retval 0 Service was run on the calling thread successfully
* @retval -EBUSY Another lcore is executing the service, and it is not a
* multi-thread safe service, so the service was not run on this lcore
- * @retval -ENOEXEC Service is not in a runnable state
+ * @retval -ENOEXEC Service is not in a run-able state
* @retval -EINVAL Invalid service id
*/
-int32_t rte_service_run_iter_on_app_lcore(uint32_t id);
+int32_t rte_service_run_iter_on_app_lcore(uint32_t id,
+ uint32_t serialize_multithread_unsafe);
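For illustration only (not part of the patch), a minimal caller might look as follows. It assumes the application has already obtained the service id, for example via rte_service_get_by_name(), and passes 1 so the library serializes access to multi-thread unsafe services; run_one_iteration() is a hypothetical helper name.

#include <rte_service.h>

static void
run_one_iteration(uint32_t service_id)
{
	/* Request serialization of MT-unsafe services (second argument = 1).
	 * -EBUSY means another lcore is currently running this MT-unsafe
	 * service, so nothing was executed on this lcore.
	 */
	int32_t ret = rte_service_run_iter_on_app_lcore(service_id, 1);
	if (ret == -EBUSY)
		return; /* try again later */
}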
/**
* @warning
uint8_t internal_flags;
/* per service statistics */
- uint32_t num_mapped_cores;
+ rte_atomic32_t num_mapped_cores;
uint64_t calls;
uint64_t cycles_spent;
} __rte_cache_aligned;
rte_smp_rmb();
int check_disabled = !(s->internal_flags & SERVICE_F_START_CHECK);
- int lcore_mapped = (s->num_mapped_cores > 0);
+ int lcore_mapped = (rte_atomic32_read(&s->num_mapped_cores) > 0);
return (s->app_runstate == RUNSTATE_RUNNING) &&
(s->comp_runstate == RUNSTATE_RUNNING) &&
* mapped, atomic ops are not required.
*/
const int use_atomics = (service_mt_safe(s) == 0) &&
- (s->num_mapped_cores > 1);
+ (rte_atomic32_read(&s->num_mapped_cores) > 1);
if (use_atomics) {
if (!rte_atomic32_cmpset((uint32_t *)&s->execute_lock, 0, 1))
return -EBUSY;
return 0;
}
-int32_t rte_service_run_iter_on_app_lcore(uint32_t id)
+int32_t rte_service_run_iter_on_app_lcore(uint32_t id,
+ uint32_t serialize_mt_unsafe)
{
/* run service on calling core, using all-ones as the service mask */
+ if (!service_valid(id))
+ return -EINVAL;
+
struct core_state *cs = &lcore_states[rte_lcore_id()];
- return service_run(id, cs, UINT64_MAX);
+ struct rte_service_spec_impl *s = &rte_services[id];
+
+ /* Atomically add this core to the mapped cores first, then examine if
+ * we can run the service. This avoids a race condition between
+ * checking the value and atomically adding to the mapped count.
+ */
+ if (serialize_mt_unsafe)
+ rte_atomic32_inc(&s->num_mapped_cores);
+
+ if (service_mt_safe(s) == 0 &&
+ rte_atomic32_read(&s->num_mapped_cores) > 1) {
+ if (serialize_mt_unsafe)
+ rte_atomic32_dec(&s->num_mapped_cores);
+ return -EBUSY;
+ }
+
+ int ret = service_run(id, cs, UINT64_MAX);
+
+ if (serialize_mt_unsafe)
+ rte_atomic32_dec(&s->num_mapped_cores);
+
+ return ret;
}
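As a sketch of the zero-serialization mode described in the header comment above (again, not part of the patch): an application that guarantees a single dispatcher lcore is the only caller can pass 0 and skip the mapped-core accounting. The app_quit flag and dispatcher_loop() name are hypothetical application code.

#include <stdint.h>
#include <rte_service.h>

static volatile int app_quit; /* hypothetical application quit flag */

static int
dispatcher_loop(void *arg)
{
	uint32_t service_id = *(uint32_t *)arg;

	/* The application guarantees this lcore is the only caller, so
	 * serialize_mt_unsafe can be 0 and the atomic accounting is skipped.
	 */
	while (!app_quit)
		rte_service_run_iter_on_app_lcore(service_id, 0);

	return 0;
}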
static int32_t
if (set) {
if (*set) {
lcore_states[lcore].service_mask |= sid_mask;
- rte_services[sid].num_mapped_cores++;
+ rte_atomic32_inc(&rte_services[sid].num_mapped_cores);
} else {
lcore_states[lcore].service_mask &= ~(sid_mask);
- rte_services[sid].num_mapped_cores--;
+ rte_atomic32_dec(&rte_services[sid].num_mapped_cores);
}
}
lcore_states[i].runstate = RUNSTATE_STOPPED;
}
for (i = 0; i < RTE_SERVICE_NUM_MAX; i++)
- rte_services[i].num_mapped_cores = 0;
+ rte_atomic32_set(&rte_services[i].num_mapped_cores, 0);
rte_smp_wmb();
for (i = 0; i < RTE_SERVICE_NUM_MAX; i++) {
int32_t enabled = service_mask & (UINT64_C(1) << i);
int32_t service_running = rte_service_runstate_get(i);
- int32_t only_core = rte_services[i].num_mapped_cores == 1;
+ int32_t only_core = (1 ==
+ rte_atomic32_read(&rte_services[i].num_mapped_cores));
/* if the core is mapped, and the service is running, and this
* is the only core that is mapped, the service would cease to
}
}
- rte_service_run_iter_on_app_lcore(t->service_id);
+ rte_service_run_iter_on_app_lcore(t->service_id, 1);
struct test_event_dev_stats stats;
err = test_event_dev_stats_get(evdev, &stats);
}
/* Run schedule() as dir packets may need to be re-ordered */
- rte_service_run_iter_on_app_lcore(t->service_id);
+ rte_service_run_iter_on_app_lcore(t->service_id, 1);
struct test_event_dev_stats stats;
err = test_event_dev_stats_get(evdev, &stats);
printf("%d: error failed to enqueue\n", __LINE__);
return -1;
}
- rte_service_run_iter_on_app_lcore(t->service_id);
+ rte_service_run_iter_on_app_lcore(t->service_id, 1);
uint32_t deq_pkts;
deq_pkts = rte_event_dequeue_burst(evdev, 0, &ev, 1, 0);
return -1;
}
}
- rte_service_run_iter_on_app_lcore(t->service_id);
+ rte_service_run_iter_on_app_lcore(t->service_id, 1);
/* Check stats for all NUM_PKTS arrived to sched core */
struct test_event_dev_stats stats;
}
/* schedule */
- rte_service_run_iter_on_app_lcore(t->service_id);
+ rte_service_run_iter_on_app_lcore(t->service_id, 1);
struct test_event_dev_stats stats;
}
}
- rte_service_run_iter_on_app_lcore(t->service_id);
+ rte_service_run_iter_on_app_lcore(t->service_id, 1);
/* Device names / values */
int num_stats = rte_event_dev_xstats_names_get(evdev,
}
}
- rte_service_run_iter_on_app_lcore(t->service_id);
+ rte_service_run_iter_on_app_lcore(t->service_id, 1);
struct rte_event ev[NPKTS];
int deq = rte_event_dequeue_burst(evdev, t->port[0], ev,
}
}
- rte_service_run_iter_on_app_lcore(t->service_id);
+ rte_service_run_iter_on_app_lcore(t->service_id, 1);
static const char * const dev_names[] = {
"dev_rx", "dev_tx", "dev_drop", "dev_sched_calls",
}
}
- rte_service_run_iter_on_app_lcore(t->service_id);
+ rte_service_run_iter_on_app_lcore(t->service_id, 1);
/* dequeue packets, verify priority was upheld */
struct rte_event ev[32];
}
}
- rte_service_run_iter_on_app_lcore(t->service_id);
+ rte_service_run_iter_on_app_lcore(t->service_id, 1);
struct test_event_dev_stats stats;
err = test_event_dev_stats_get(evdev, &stats);
}
/* call the scheduler */
- rte_service_run_iter_on_app_lcore(t->service_id);
+ rte_service_run_iter_on_app_lcore(t->service_id, 1);
/* Dequeue the flow 0 packet from port 1, so that we can then drop */
struct rte_event ev;
rte_event_enqueue_burst(evdev, t->port[1], &release_ev, 1);
/* call the scheduler */
- rte_service_run_iter_on_app_lcore(t->service_id);
+ rte_service_run_iter_on_app_lcore(t->service_id, 1);
/*
* Set up the next set of flows, first a new flow to fill up
}
/* schedule */
- rte_service_run_iter_on_app_lcore(t->service_id);
+ rte_service_run_iter_on_app_lcore(t->service_id, 1);
err = test_event_dev_stats_get(evdev, &stats);
if (err) {
while (rte_event_dequeue_burst(evdev, i, &ev, 1, 0))
rte_event_enqueue_burst(evdev, i, &release_ev, 1);
}
- rte_service_run_iter_on_app_lcore(t->service_id);
+ rte_service_run_iter_on_app_lcore(t->service_id, 1);
cleanup(t);
return 0;
}
/* call the scheduler */
- rte_service_run_iter_on_app_lcore(t->service_id);
+ rte_service_run_iter_on_app_lcore(t->service_id, 1);
err = test_event_dev_stats_get(evdev, &stats);
if (err) {
return -1;
}
- rte_service_run_iter_on_app_lcore(t->service_id);
+ rte_service_run_iter_on_app_lcore(t->service_id, 1);
err = test_event_dev_stats_get(evdev, &stats);
if (err) {
printf("%d: Failed to enqueue\n", __LINE__);
return -1;
}
- rte_service_run_iter_on_app_lcore(t->service_id);
+ rte_service_run_iter_on_app_lcore(t->service_id, 1);
err = test_event_dev_stats_get(evdev, &stats);
if (stats.port_inflight[wrk_enq] != 0) {
}
/* schedule */
- rte_service_run_iter_on_app_lcore(t->service_id);
+ rte_service_run_iter_on_app_lcore(t->service_id, 1);
err = test_event_dev_stats_get(evdev, &stats);
if (err) {
* As the scheduler core decrements inflights, it needs to run to
* process packets to act on the drop messages
*/
- rte_service_run_iter_on_app_lcore(t->service_id);
+ rte_service_run_iter_on_app_lcore(t->service_id, 1);
err = test_event_dev_stats_get(evdev, &stats);
if (stats.port_inflight[p1] != 0) {
* As the scheduler core decrements inflights, it needs to run to
* process packets to act on the drop messages
*/
- rte_service_run_iter_on_app_lcore(t->service_id);
+ rte_service_run_iter_on_app_lcore(t->service_id, 1);
err = test_event_dev_stats_get(evdev, &stats);
if (stats.port_inflight[p2] != 0) {
}
}
- rte_service_run_iter_on_app_lcore(t->service_id);
+ rte_service_run_iter_on_app_lcore(t->service_id, 1);
/* use extra slot to make logic in loops easier */
struct rte_event deq_ev[w3_port + 1];
return -1;
}
}
- rte_service_run_iter_on_app_lcore(t->service_id);
+ rte_service_run_iter_on_app_lcore(t->service_id, 1);
/* dequeue from the tx ports, we should get 3 packets */
deq_pkts = rte_event_dequeue_burst(evdev, t->port[tx_port], deq_ev,
printf("%d: Error doing first enqueue\n", __LINE__);
goto err;
}
- rte_service_run_iter_on_app_lcore(t->service_id);
+ rte_service_run_iter_on_app_lcore(t->service_id, 1);
if (rte_event_dev_xstats_by_name_get(evdev, "port_0_cq_ring_used", NULL)
!= 1)
printf("%d: Error with enqueue\n", __LINE__);
goto err;
}
- rte_service_run_iter_on_app_lcore(t->service_id);
+ rte_service_run_iter_on_app_lcore(t->service_id, 1);
} while (rte_event_dev_xstats_by_name_get(evdev,
rx_port_free_stat, NULL) != 0);
printf("%d: Error with enqueue\n", __LINE__);
goto err;
}
- rte_service_run_iter_on_app_lcore(t->service_id);
+ rte_service_run_iter_on_app_lcore(t->service_id, 1);
/* check that the other port still has an empty CQ */
if (rte_event_dev_xstats_by_name_get(evdev, other_port_used_stat, NULL)
printf("%d: Error with enqueue\n", __LINE__);
goto err;
}
- rte_service_run_iter_on_app_lcore(t->service_id);
+ rte_service_run_iter_on_app_lcore(t->service_id, 1);
if (rte_event_dev_xstats_by_name_get(evdev, other_port_used_stat, NULL)
!= 1) {
while (rte_eal_get_lcore_state(p_lcore) != FINISHED ||
rte_eal_get_lcore_state(w_lcore) != FINISHED) {
- rte_service_run_iter_on_app_lcore(t->service_id);
+ rte_service_run_iter_on_app_lcore(t->service_id, 1);
uint64_t new_cycles = rte_get_timer_cycles();
cycles = new_cycles;
}
}
- rte_service_run_iter_on_app_lcore(t->service_id);
+ rte_service_run_iter_on_app_lcore(t->service_id, 1);
/* ensure all completions are flushed */
rte_eal_mp_wait_lcore();
return TEST_SUCCESS;
}
+static int32_t
+delay_as_a_mt_safe_service(void *args)
+{
+ RTE_SET_USED(args);
+ uint32_t *params = args;
+
+ /* retrieve done flag and atomic lock to inc/dec */
+ uint32_t *done = &params[0];
+ rte_atomic32_t *lock = (rte_atomic32_t *)&params[1];
+
+ while (!*done) {
+ rte_atomic32_inc(lock);
+ rte_delay_us(500);
+ if (rte_atomic32_read(lock) > 1)
+ /* pass: second core has simultaneously incremented */
+ *done = 1;
+ rte_atomic32_dec(lock);
+ }
+
+ return 0;
+}
+
+static int32_t
+delay_as_a_service(void *args)
+{
+ uint32_t *done = (uint32_t *)args;
+ while (!*done)
+ rte_delay_ms(5);
+ return 0;
+}
+
+static int
+service_run_on_app_core_func(void *arg)
+{
+ uint32_t *delay_service_id = (uint32_t *)arg;
+ return rte_service_run_iter_on_app_lcore(*delay_service_id, 1);
+}
+
+static int
+service_app_lcore_poll_impl(const int mt_safe)
+{
+ uint32_t params[2] = {0};
+
+ struct rte_service_spec service;
+ memset(&service, 0, sizeof(struct rte_service_spec));
+ snprintf(service.name, sizeof(service.name), MT_SAFE_SERVICE_NAME);
+ if (mt_safe) {
+ service.callback = delay_as_a_mt_safe_service;
+ service.callback_userdata = params;
+ service.capabilities |= RTE_SERVICE_CAP_MT_SAFE;
+ } else {
+ service.callback = delay_as_a_service;
+ service.callback_userdata = &params;
+ }
+
+ uint32_t id;
+ TEST_ASSERT_EQUAL(0, rte_service_component_register(&service, &id),
+ "Register of app lcore delay service failed");
+
+ rte_service_component_runstate_set(id, 1);
+ rte_service_runstate_set(id, 1);
+
+ uint32_t app_core2 = rte_get_next_lcore(slcore_id, 1, 1);
+ int app_core2_ret = rte_eal_remote_launch(service_run_on_app_core_func,
+ &id, app_core2);
+
+ rte_delay_ms(100);
+
+ int app_core1_ret = service_run_on_app_core_func(&id);
+
+ /* flag done, then wait for the spawned 2nd core to return */
+ params[0] = 1;
+ rte_eal_mp_wait_lcore();
+
+ /* core two gets launched first - and should hold the service lock */
+ TEST_ASSERT_EQUAL(0, app_core2_ret,
+ "App core2 : run service didn't return zero");
+
+ if (mt_safe) {
+ /* mt safe should have both cores return 0 for success */
+ TEST_ASSERT_EQUAL(0, app_core1_ret,
+ "MT Safe: App core1 didn't return 0");
+ } else {
+ /* core one attempts to run later - should be blocked */
+ TEST_ASSERT_EQUAL(-EBUSY, app_core1_ret,
+ "MT Unsafe: App core1 didn't return -EBUSY");
+ }
+
+ unregister_all();
+
+ return TEST_SUCCESS;
+}
+
+static int
+service_app_lcore_mt_safe(void)
+{
+ const int mt_safe = 1;
+ return service_app_lcore_poll_impl(mt_safe);
+}
+
+static int
+service_app_lcore_mt_unsafe(void)
+{
+ const int mt_safe = 0;
+ return service_app_lcore_poll_impl(mt_safe);
+}
+
/* start and stop a service core - ensuring it goes back to sleep */
static int
service_lcore_start_stop(void)
TEST_CASE_ST(dummy_register, NULL, service_lcore_en_dis_able),
TEST_CASE_ST(dummy_register, NULL, service_mt_unsafe_poll),
TEST_CASE_ST(dummy_register, NULL, service_mt_safe_poll),
+ TEST_CASE_ST(dummy_register, NULL, service_app_lcore_mt_safe),
+ TEST_CASE_ST(dummy_register, NULL, service_app_lcore_mt_unsafe),
TEST_CASES_END() /**< NULL terminate unit test array */
}
};