From: Gage Eads Date: Mon, 11 Dec 2017 17:56:31 +0000 (-0600) Subject: eventdev: add implicit release disable capability X-Git-Tag: spdx-start~216 X-Git-Url: http://git.droids-corp.org/?a=commitdiff_plain;h=ec36d881f56d;p=dpdk.git eventdev: add implicit release disable capability This commit introduces a capability for disabling the "implicit" release functionality for a port, which prevents the eventdev PMD from issuing outstanding releases for previously dequeued events when dequeuing a new batch of events. If a PMD does not support this capability, the application will receive an error if it attempts to setup a port with implicit releases disabled. Otherwise, if the port is configured with implicit releases disabled, the application must release each dequeued event by invoking rte_event_enqueue_burst() with RTE_EVENT_OP_RELEASE or RTE_EVENT_OP_FORWARD. Signed-off-by: Gage Eads Acked-by: Harry van Haaren --- diff --git a/drivers/event/dpaa2/dpaa2_eventdev.c b/drivers/event/dpaa2/dpaa2_eventdev.c index ba7e5145ec..467e98c7cf 100644 --- a/drivers/event/dpaa2/dpaa2_eventdev.c +++ b/drivers/event/dpaa2/dpaa2_eventdev.c @@ -414,6 +414,7 @@ dpaa2_eventdev_port_def_conf(struct rte_eventdev *dev, uint8_t port_id, DPAA2_EVENT_MAX_PORT_DEQUEUE_DEPTH; port_conf->enqueue_depth = DPAA2_EVENT_MAX_PORT_ENQUEUE_DEPTH; + port_conf->disable_implicit_release = 0; } static void diff --git a/drivers/event/octeontx/ssovf_evdev.c b/drivers/event/octeontx/ssovf_evdev.c index 6bff72ac79..54512217de 100644 --- a/drivers/event/octeontx/ssovf_evdev.c +++ b/drivers/event/octeontx/ssovf_evdev.c @@ -235,6 +235,7 @@ ssovf_port_def_conf(struct rte_eventdev *dev, uint8_t port_id, port_conf->new_event_threshold = edev->max_num_events; port_conf->dequeue_depth = 1; port_conf->enqueue_depth = 1; + port_conf->disable_implicit_release = 0; } static void diff --git a/drivers/event/skeleton/skeleton_eventdev.c b/drivers/event/skeleton/skeleton_eventdev.c index bd8c5e7725..7f4675686d 100644 --- a/drivers/event/skeleton/skeleton_eventdev.c +++ b/drivers/event/skeleton/skeleton_eventdev.c @@ -209,6 +209,7 @@ skeleton_eventdev_port_def_conf(struct rte_eventdev *dev, uint8_t port_id, port_conf->new_event_threshold = 32 * 1024; port_conf->dequeue_depth = 16; port_conf->enqueue_depth = 16; + port_conf->disable_implicit_release = 0; } static void diff --git a/drivers/event/sw/sw_evdev.c b/drivers/event/sw/sw_evdev.c index 18963e4100..bcd1ce9d10 100644 --- a/drivers/event/sw/sw_evdev.c +++ b/drivers/event/sw/sw_evdev.c @@ -163,6 +163,7 @@ sw_port_setup(struct rte_eventdev *dev, uint8_t port_id, } p->inflight_max = conf->new_event_threshold; + p->implicit_release = !conf->disable_implicit_release; /* check if ring exists, same as rx_worker above */ snprintf(buf, sizeof(buf), "sw%d_p%u, %s", dev->data->dev_id, @@ -385,6 +386,7 @@ sw_port_def_conf(struct rte_eventdev *dev, uint8_t port_id, port_conf->new_event_threshold = 1024; port_conf->dequeue_depth = 16; port_conf->enqueue_depth = 16; + port_conf->disable_implicit_release = 0; } static int @@ -454,9 +456,11 @@ sw_info_get(struct rte_eventdev *dev, struct rte_event_dev_info *info) .max_event_port_dequeue_depth = MAX_SW_CONS_Q_DEPTH, .max_event_port_enqueue_depth = MAX_SW_PROD_Q_DEPTH, .max_num_events = SW_INFLIGHT_EVENTS_TOTAL, - .event_dev_cap = (RTE_EVENT_DEV_CAP_QUEUE_QOS | - RTE_EVENT_DEV_CAP_BURST_MODE | - RTE_EVENT_DEV_CAP_EVENT_QOS), + .event_dev_cap = ( + RTE_EVENT_DEV_CAP_QUEUE_QOS | + RTE_EVENT_DEV_CAP_BURST_MODE | + RTE_EVENT_DEV_CAP_EVENT_QOS | + RTE_EVENT_DEV_CAP_IMPLICIT_RELEASE_DISABLE), }; *info = evdev_sw_info; diff --git a/drivers/event/sw/sw_evdev.h b/drivers/event/sw/sw_evdev.h index 34f91eaabe..436b8f3f28 100644 --- a/drivers/event/sw/sw_evdev.h +++ b/drivers/event/sw/sw_evdev.h @@ -185,6 +185,7 @@ struct sw_port { uint16_t outstanding_releases __rte_cache_aligned; uint16_t inflight_max; /* app requested max inflights for this port */ uint16_t inflight_credits; /* num credits this port has right now */ + uint8_t implicit_release; /* release events before dequeueing */ uint16_t last_dequeue_burst_sz; /* how big the burst was */ uint64_t last_dequeue_ticks; /* used to track burst processing time */ diff --git a/drivers/event/sw/sw_evdev_worker.c b/drivers/event/sw/sw_evdev_worker.c index 9df4ac9e03..5854be5b2d 100644 --- a/drivers/event/sw/sw_evdev_worker.c +++ b/drivers/event/sw/sw_evdev_worker.c @@ -81,7 +81,7 @@ sw_event_enqueue_burst(void *port, const struct rte_event ev[], uint16_t num) return 0; } - uint32_t forwards = 0; + uint32_t completions = 0; for (i = 0; i < num; i++) { int op = ev[i].op; int outstanding = p->outstanding_releases > 0; @@ -90,7 +90,6 @@ sw_event_enqueue_burst(void *port, const struct rte_event ev[], uint16_t num) p->inflight_credits -= (op == RTE_EVENT_OP_NEW); p->inflight_credits += (op == RTE_EVENT_OP_RELEASE) * outstanding; - forwards += (op == RTE_EVENT_OP_FORWARD); new_ops[i] = sw_qe_flag_map[op]; new_ops[i] &= ~(invalid_qid << QE_FLAG_VALID_SHIFT); @@ -99,8 +98,10 @@ sw_event_enqueue_burst(void *port, const struct rte_event ev[], uint16_t num) * correct usage of the API), providing very high correct * prediction rate. */ - if ((new_ops[i] & QE_FLAG_COMPLETE) && outstanding) + if ((new_ops[i] & QE_FLAG_COMPLETE) && outstanding) { p->outstanding_releases--; + completions++; + } /* error case: branch to avoid touching p->stats */ if (unlikely(invalid_qid)) { @@ -109,8 +110,8 @@ sw_event_enqueue_burst(void *port, const struct rte_event ev[], uint16_t num) } } - /* handle directed port forward credits */ - p->inflight_credits -= forwards * p->is_directed; + /* handle directed port forward and release credits */ + p->inflight_credits -= completions * p->is_directed; /* returns number of events actually enqueued */ uint32_t enq = enqueue_burst_with_ops(p->rx_worker_ring, ev, i, @@ -144,7 +145,7 @@ sw_event_dequeue_burst(void *port, struct rte_event *ev, uint16_t num, uint32_t credit_update_quanta = sw->credit_update_quanta; /* check that all previous dequeues have been released */ - if (!p->is_directed) { + if (p->implicit_release && !p->is_directed) { uint16_t out_rels = p->outstanding_releases; uint16_t i; for (i = 0; i < out_rels; i++) @@ -154,7 +155,6 @@ sw_event_dequeue_burst(void *port, struct rte_event *ev, uint16_t num, /* returns number of events actually dequeued */ uint16_t ndeq = rte_event_ring_dequeue_burst(ring, ev, num, NULL); if (unlikely(ndeq == 0)) { - p->outstanding_releases = 0; p->zero_polls++; p->total_polls++; goto end; @@ -162,7 +162,7 @@ sw_event_dequeue_burst(void *port, struct rte_event *ev, uint16_t num, /* only add credits for directed ports - LB ports send RELEASEs */ p->inflight_credits += ndeq * p->is_directed; - p->outstanding_releases = ndeq; + p->outstanding_releases += ndeq; p->last_dequeue_burst_sz = ndeq; p->last_dequeue_ticks = rte_get_timer_cycles(); p->poll_buckets[(ndeq - 1) >> SW_DEQ_STAT_BUCKET_SHIFT]++; diff --git a/examples/eventdev_pipeline_sw_pmd/main.c b/examples/eventdev_pipeline_sw_pmd/main.c index 915df79c6c..fa832ccd19 100644 --- a/examples/eventdev_pipeline_sw_pmd/main.c +++ b/examples/eventdev_pipeline_sw_pmd/main.c @@ -34,6 +34,7 @@ struct prod_data { struct cons_data { uint8_t dev_id; uint8_t port_id; + uint8_t release; } __rte_cache_aligned; static struct prod_data prod_data; @@ -139,6 +140,18 @@ consumer(void) uint8_t outport = packets[i].mbuf->port; rte_eth_tx_buffer(outport, 0, fdata->tx_buf[outport], packets[i].mbuf); + + packets[i].op = RTE_EVENT_OP_RELEASE; + } + + if (cons_data.release) { + uint16_t nb_tx; + + nb_tx = rte_event_enqueue_burst(dev_id, port_id, packets, n); + while (nb_tx < n) + nb_tx += rte_event_enqueue_burst(dev_id, port_id, + packets + nb_tx, + n - nb_tx); } /* Print out mpps every 1<22 packets */ @@ -685,6 +698,7 @@ setup_eventdev(struct prod_data *prod_data, }; struct port_link worker_queues[MAX_NUM_STAGES]; + uint8_t disable_implicit_release; struct port_link tx_queue; unsigned int i; @@ -698,6 +712,12 @@ setup_eventdev(struct prod_data *prod_data, ret = rte_event_dev_info_get(dev_id, &dev_info); printf("\tEventdev %d: %s\n", dev_id, dev_info.driver_name); + disable_implicit_release = (dev_info.event_dev_cap & + RTE_EVENT_DEV_CAP_IMPLICIT_RELEASE_DISABLE); + + wkr_p_conf.disable_implicit_release = disable_implicit_release; + tx_p_conf.disable_implicit_release = disable_implicit_release; + if (dev_info.max_event_port_dequeue_depth < config.nb_event_port_dequeue_depth) config.nb_event_port_dequeue_depth = @@ -806,6 +826,7 @@ setup_eventdev(struct prod_data *prod_data, .dequeue_depth = 8, .enqueue_depth = 8, .new_event_threshold = 1200, + .disable_implicit_release = disable_implicit_release, }; if (rx_p_conf.dequeue_depth > config.nb_event_port_dequeue_depth) @@ -822,7 +843,8 @@ setup_eventdev(struct prod_data *prod_data, .port_id = i + 1, .qid = cdata.qid[0] }; *cons_data = (struct cons_data){.dev_id = dev_id, - .port_id = i }; + .port_id = i, + .release = disable_implicit_release }; ret = rte_event_dev_service_id_get(dev_id, &fdata->evdev_service_id); diff --git a/lib/librte_eventdev/rte_eventdev.c b/lib/librte_eventdev/rte_eventdev.c index 5e2c4b1a0e..08a8fa6a70 100644 --- a/lib/librte_eventdev/rte_eventdev.c +++ b/lib/librte_eventdev/rte_eventdev.c @@ -658,6 +658,15 @@ rte_event_port_setup(uint8_t dev_id, uint8_t port_id, return -EINVAL; } + if (port_conf && port_conf->disable_implicit_release && + !(dev->data->event_dev_cap & + RTE_EVENT_DEV_CAP_IMPLICIT_RELEASE_DISABLE)) { + RTE_EDEV_LOG_ERR( + "dev%d port%d Implicit release disable not supported", + dev_id, port_id); + return -EINVAL; + } + if (dev->data->dev_started) { RTE_EDEV_LOG_ERR( "device %d must be stopped to allow port setup", dev_id); diff --git a/lib/librte_eventdev/rte_eventdev.h b/lib/librte_eventdev/rte_eventdev.h index 9134b49848..84ac47d543 100644 --- a/lib/librte_eventdev/rte_eventdev.h +++ b/lib/librte_eventdev/rte_eventdev.h @@ -283,6 +283,16 @@ struct rte_mbuf; /* we just use mbuf pointers; no need to include rte_mbuf.h */ * * @see rte_event_dequeue_burst() rte_event_enqueue_burst() */ +#define RTE_EVENT_DEV_CAP_IMPLICIT_RELEASE_DISABLE (1ULL << 5) +/**< Event device ports support disabling the implicit release feature, in + * which the port will release all unreleased events in its dequeue operation. + * If this capability is set and the port is configured with implicit release + * disabled, the application is responsible for explicitly releasing events + * using either the RTE_EVENT_OP_FORWARD or the RTE_EVENT_OP_RELEASE event + * enqueue operations. + * + * @see rte_event_dequeue_burst() rte_event_enqueue_burst() + */ /* Event device priority levels */ #define RTE_EVENT_DEV_PRIORITY_HIGHEST 0 @@ -687,6 +697,13 @@ struct rte_event_port_conf { * which previously supplied to rte_event_dev_configure(). * Ignored when device is not RTE_EVENT_DEV_CAP_BURST_MODE capable. */ + uint8_t disable_implicit_release; + /**< Configure the port not to release outstanding events in + * rte_event_dev_dequeue_burst(). If true, all events received through + * the port must be explicitly released with RTE_EVENT_OP_RELEASE or + * RTE_EVENT_OP_FORWARD. Must be false when the device is not + * RTE_EVENT_DEV_CAP_IMPLICIT_RELEASE_DISABLE capable. + */ }; /** @@ -1382,9 +1399,9 @@ rte_event_dequeue_timeout_ticks(uint8_t dev_id, uint64_t ns, * * The number of events dequeued is the number of scheduler contexts held by * this port. These contexts are automatically released in the next - * rte_event_dequeue_burst() invocation, or invoking rte_event_enqueue_burst() - * with RTE_EVENT_OP_RELEASE operation can be used to release the - * contexts early. + * rte_event_dequeue_burst() invocation if the port supports implicit + * releases, or invoking rte_event_enqueue_burst() with RTE_EVENT_OP_RELEASE + * operation can be used to release the contexts early. * * Event operations RTE_EVENT_OP_FORWARD and RTE_EVENT_OP_RELEASE must only be * enqueued to the same port that their associated events were dequeued from. diff --git a/test/test/test_eventdev.c b/test/test/test_eventdev.c index 615aeb0f4d..71129de239 100644 --- a/test/test/test_eventdev.c +++ b/test/test/test_eventdev.c @@ -553,6 +553,15 @@ test_eventdev_port_setup(void) ret = rte_event_port_setup(TEST_DEV_ID, 0, &pconf); TEST_ASSERT(ret == -EINVAL, "Expected -EINVAL, %d", ret); + if (!(info.event_dev_cap & + RTE_EVENT_DEV_CAP_IMPLICIT_RELEASE_DISABLE)) { + pconf.enqueue_depth = info.max_event_port_enqueue_depth; + pconf.disable_implicit_release = 1; + ret = rte_event_port_setup(TEST_DEV_ID, 0, &pconf); + TEST_ASSERT(ret == -EINVAL, "Expected -EINVAL, %d", ret); + pconf.disable_implicit_release = 0; + } + ret = rte_event_port_setup(TEST_DEV_ID, info.max_event_ports, &pconf); TEST_ASSERT(ret == -EINVAL, "Expected -EINVAL, %d", ret); diff --git a/test/test/test_eventdev_sw.c b/test/test/test_eventdev_sw.c index 7d121a7cf1..c89f7de8c1 100644 --- a/test/test/test_eventdev_sw.c +++ b/test/test/test_eventdev_sw.c @@ -171,6 +171,7 @@ create_ports(struct test *t, int num_ports) .new_event_threshold = 1024, .dequeue_depth = 32, .enqueue_depth = 64, + .disable_implicit_release = 0, }; if (num_ports > MAX_PORTS) return -1; @@ -1225,6 +1226,7 @@ port_reconfig_credits(struct test *t) .new_event_threshold = 128, .dequeue_depth = 32, .enqueue_depth = 64, + .disable_implicit_release = 0, }; if (rte_event_port_setup(evdev, 0, &port_conf) < 0) { printf("%d Error setting up port\n", __LINE__); @@ -1314,6 +1316,7 @@ port_single_lb_reconfig(struct test *t) .new_event_threshold = 128, .dequeue_depth = 32, .enqueue_depth = 64, + .disable_implicit_release = 0, }; if (rte_event_port_setup(evdev, 0, &port_conf) < 0) { printf("%d Error setting up port\n", __LINE__); @@ -2906,7 +2909,7 @@ worker_loopback_producer_fn(void *arg) } static int -worker_loopback(struct test *t) +worker_loopback(struct test *t, uint8_t disable_implicit_release) { /* use a single producer core, and a worker core to see what happens * if the worker loops packets back multiple times @@ -2932,6 +2935,7 @@ worker_loopback(struct test *t) * only be initialized once - and this needs to be set for multiple runs */ conf.new_event_threshold = 512; + conf.disable_implicit_release = disable_implicit_release; if (rte_event_port_setup(evdev, 0, &conf) < 0) { printf("Error setting up RX port\n"); @@ -3206,15 +3210,23 @@ test_sw_eventdev(void) } if (rte_lcore_count() >= 3) { printf("*** Running Worker loopback test...\n"); - ret = worker_loopback(t); + ret = worker_loopback(t, 0); + if (ret != 0) { + printf("ERROR - Worker loopback test FAILED.\n"); + return ret; + } + + printf("*** Running Worker loopback test (implicit release disabled)...\n"); + ret = worker_loopback(t, 1); if (ret != 0) { printf("ERROR - Worker loopback test FAILED.\n"); return ret; } } else { - printf("### Not enough cores for worker loopback test.\n"); - printf("### Need at least 3 cores for test.\n"); + printf("### Not enough cores for worker loopback tests.\n"); + printf("### Need at least 3 cores for the tests.\n"); } + /* * Free test instance, leaving mempool initialized, and a pointer to it * in static eventdev_func_mempool, as it is re-used on re-runs