app/eventdev: add Tx service setup
[dpdk.git] / app / test-eventdev / test_pipeline_common.c
1 /*
2  * SPDX-License-Identifier: BSD-3-Clause
3  * Copyright 2017 Cavium, Inc.
4  */
5
6 #include "test_pipeline_common.h"
7
8 static int32_t
9 pipeline_event_tx_burst_service_func(void *args)
10 {
11
12         int i;
13         struct tx_service_data *tx = args;
14         const uint8_t dev = tx->dev_id;
15         const uint8_t port = tx->port_id;
16         struct rte_event ev[BURST_SIZE + 1];
17
18         uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
19
20         if (!nb_rx) {
21                 for (i = 0; i < tx->nb_ethports; i++)
22                         rte_eth_tx_buffer_flush(i, 0, tx->tx_buf[i]);
23                 return 0;
24         }
25
26         for (i = 0; i < nb_rx; i++) {
27                 struct rte_mbuf *m = ev[i].mbuf;
28                 rte_eth_tx_buffer(m->port, 0, tx->tx_buf[m->port], m);
29         }
30         tx->processed_pkts += nb_rx;
31
32         return 0;
33 }
34
35 static int32_t
36 pipeline_event_tx_service_func(void *args)
37 {
38
39         int i;
40         struct tx_service_data *tx = args;
41         const uint8_t dev = tx->dev_id;
42         const uint8_t port = tx->port_id;
43         struct rte_event ev;
44
45         uint16_t nb_rx = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
46
47         if (!nb_rx) {
48                 for (i = 0; i < tx->nb_ethports; i++)
49                         rte_eth_tx_buffer_flush(i, 0, tx->tx_buf[i]);
50                 return 0;
51         }
52
53         struct rte_mbuf *m = ev.mbuf;
54         rte_eth_tx_buffer(m->port, 0, tx->tx_buf[m->port], m);
55         tx->processed_pkts++;
56
57         return 0;
58 }
59
60 int
61 pipeline_test_result(struct evt_test *test, struct evt_options *opt)
62 {
63         RTE_SET_USED(opt);
64         int i;
65         uint64_t total = 0;
66         struct test_pipeline *t = evt_test_priv(test);
67
68         printf("Packet distribution across worker cores :\n");
69         for (i = 0; i < t->nb_workers; i++)
70                 total += t->worker[i].processed_pkts;
71         for (i = 0; i < t->nb_workers; i++)
72                 printf("Worker %d packets: "CLGRN"%"PRIx64" "CLNRM"percentage:"
73                                 CLGRN" %3.2f\n"CLNRM, i,
74                                 t->worker[i].processed_pkts,
75                                 (((double)t->worker[i].processed_pkts)/total)
76                                 * 100);
77         return t->result;
78 }
79
80 void
81 pipeline_opt_dump(struct evt_options *opt, uint8_t nb_queues)
82 {
83         evt_dump("nb_worker_lcores", "%d", evt_nr_active_lcores(opt->wlcores));
84         evt_dump_worker_lcores(opt);
85         evt_dump_nb_stages(opt);
86         evt_dump("nb_evdev_ports", "%d", pipeline_nb_event_ports(opt));
87         evt_dump("nb_evdev_queues", "%d", nb_queues);
88         evt_dump_queue_priority(opt);
89         evt_dump_sched_type_list(opt);
90         evt_dump_producer_type(opt);
91 }
92
93 int
94 pipeline_opt_check(struct evt_options *opt, uint64_t nb_queues)
95 {
96         unsigned int lcores;
97         /*
98          * N worker + 1 master
99          */
100         lcores = 2;
101
102         if (!rte_eth_dev_count()) {
103                 evt_err("test needs minimum 1 ethernet dev");
104                 return -1;
105         }
106
107         if (rte_lcore_count() < lcores) {
108                 evt_err("test need minimum %d lcores", lcores);
109                 return -1;
110         }
111
112         /* Validate worker lcores */
113         if (evt_lcores_has_overlap(opt->wlcores, rte_get_master_lcore())) {
114                 evt_err("worker lcores overlaps with master lcore");
115                 return -1;
116         }
117         if (evt_has_disabled_lcore(opt->wlcores)) {
118                 evt_err("one or more workers lcores are not enabled");
119                 return -1;
120         }
121         if (!evt_has_active_lcore(opt->wlcores)) {
122                 evt_err("minimum one worker is required");
123                 return -1;
124         }
125
126         if (nb_queues > EVT_MAX_QUEUES) {
127                 evt_err("number of queues exceeds %d", EVT_MAX_QUEUES);
128                 return -1;
129         }
130         if (pipeline_nb_event_ports(opt) > EVT_MAX_PORTS) {
131                 evt_err("number of ports exceeds %d", EVT_MAX_PORTS);
132                 return -1;
133         }
134
135         if (evt_has_invalid_stage(opt))
136                 return -1;
137
138         if (evt_has_invalid_sched_type(opt))
139                 return -1;
140
141         return 0;
142 }
143
144 #define NB_RX_DESC                      128
145 #define NB_TX_DESC                      512
146 int
147 pipeline_ethdev_setup(struct evt_test *test, struct evt_options *opt)
148 {
149         int i;
150         uint8_t nb_queues = 1;
151         uint8_t mt_state = 0;
152         struct test_pipeline *t = evt_test_priv(test);
153         struct rte_eth_rxconf rx_conf;
154         struct rte_eth_conf port_conf = {
155                 .rxmode = {
156                         .mq_mode = ETH_MQ_RX_RSS,
157                         .max_rx_pkt_len = ETHER_MAX_LEN,
158                         .offloads = DEV_RX_OFFLOAD_CRC_STRIP,
159                         .ignore_offload_bitfield = 1,
160                 },
161                 .rx_adv_conf = {
162                         .rss_conf = {
163                                 .rss_key = NULL,
164                                 .rss_hf = ETH_RSS_IP,
165                         },
166                 },
167         };
168
169         RTE_SET_USED(opt);
170         if (!rte_eth_dev_count()) {
171                 evt_err("No ethernet ports found.\n");
172                 return -ENODEV;
173         }
174
175         for (i = 0; i < rte_eth_dev_count(); i++) {
176                 struct rte_eth_dev_info dev_info;
177
178                 memset(&dev_info, 0, sizeof(struct rte_eth_dev_info));
179                 rte_eth_dev_info_get(i, &dev_info);
180                 mt_state = !(dev_info.tx_offload_capa &
181                                 DEV_TX_OFFLOAD_MT_LOCKFREE);
182                 rx_conf = dev_info.default_rxconf;
183                 rx_conf.offloads = port_conf.rxmode.offloads;
184
185                 if (rte_eth_dev_configure(i, nb_queues, nb_queues,
186                                         &port_conf)
187                                 < 0) {
188                         evt_err("Failed to configure eth port [%d]\n", i);
189                         return -EINVAL;
190                 }
191
192                 if (rte_eth_rx_queue_setup(i, 0, NB_RX_DESC,
193                                 rte_socket_id(), &rx_conf, t->pool) < 0) {
194                         evt_err("Failed to setup eth port [%d] rx_queue: %d.\n",
195                                         i, 0);
196                         return -EINVAL;
197                 }
198                 if (rte_eth_tx_queue_setup(i, 0, NB_TX_DESC,
199                                         rte_socket_id(), NULL) < 0) {
200                         evt_err("Failed to setup eth port [%d] tx_queue: %d.\n",
201                                         i, 0);
202                         return -EINVAL;
203                 }
204
205                 t->mt_unsafe |= mt_state;
206                 t->tx_service.tx_buf[i] =
207                         rte_malloc(NULL, RTE_ETH_TX_BUFFER_SIZE(BURST_SIZE), 0);
208                 if (t->tx_service.tx_buf[i] == NULL)
209                         rte_panic("Unable to allocate Tx buffer memory.");
210                 rte_eth_promiscuous_enable(i);
211         }
212
213         return 0;
214 }
215
216 int
217 pipeline_event_port_setup(struct evt_test *test, struct evt_options *opt,
218                 uint8_t *queue_arr, uint8_t nb_queues,
219                 const struct rte_event_port_conf p_conf)
220 {
221         int i;
222         int ret;
223         uint8_t port;
224         struct test_pipeline *t = evt_test_priv(test);
225
226
227         /* setup one port per worker, linking to all queues */
228         for (port = 0; port < evt_nr_active_lcores(opt->wlcores); port++) {
229                 struct worker_data *w = &t->worker[port];
230
231                 w->dev_id = opt->dev_id;
232                 w->port_id = port;
233                 w->t = t;
234                 w->processed_pkts = 0;
235
236                 ret = rte_event_port_setup(opt->dev_id, port, &p_conf);
237                 if (ret) {
238                         evt_err("failed to setup port %d", port);
239                         return ret;
240                 }
241
242                 if (queue_arr == NULL) {
243                         if (rte_event_port_link(opt->dev_id, port, NULL, NULL,
244                                                 0) != nb_queues)
245                                 goto link_fail;
246                 } else {
247                         for (i = 0; i < nb_queues; i++) {
248                                 if (rte_event_port_link(opt->dev_id, port,
249                                                 &queue_arr[i], NULL, 1) != 1)
250                                         goto link_fail;
251                         }
252                 }
253         }
254
255         return 0;
256
257 link_fail:
258         evt_err("failed to link all queues to port %d", port);
259         return -EINVAL;
260 }
261
262 int
263 pipeline_event_rx_adapter_setup(struct evt_options *opt, uint8_t stride,
264                 struct rte_event_port_conf prod_conf)
265 {
266         int ret = 0;
267         uint16_t prod;
268         struct rte_event_eth_rx_adapter_queue_conf queue_conf;
269
270         memset(&queue_conf, 0,
271                         sizeof(struct rte_event_eth_rx_adapter_queue_conf));
272         queue_conf.ev.sched_type = opt->sched_type_list[0];
273         for (prod = 0; prod < rte_eth_dev_count(); prod++) {
274                 uint32_t cap;
275
276                 ret = rte_event_eth_rx_adapter_caps_get(opt->dev_id,
277                                 prod, &cap);
278                 if (ret) {
279                         evt_err("failed to get event rx adapter[%d]"
280                                         " capabilities",
281                                         opt->dev_id);
282                         return ret;
283                 }
284                 queue_conf.ev.queue_id = prod * stride;
285                 ret = rte_event_eth_rx_adapter_create(prod, opt->dev_id,
286                                 &prod_conf);
287                 if (ret) {
288                         evt_err("failed to create rx adapter[%d]", prod);
289                         return ret;
290                 }
291                 ret = rte_event_eth_rx_adapter_queue_add(prod, prod, -1,
292                                 &queue_conf);
293                 if (ret) {
294                         evt_err("failed to add rx queues to adapter[%d]", prod);
295                         return ret;
296                 }
297
298                 if (!(cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT)) {
299                         uint32_t service_id;
300
301                         rte_event_eth_rx_adapter_service_id_get(prod,
302                                         &service_id);
303                         ret = evt_service_setup(service_id);
304                         if (ret) {
305                                 evt_err("Failed to setup service core"
306                                                 " for Rx adapter\n");
307                                 return ret;
308                         }
309                 }
310
311                 ret = rte_eth_dev_start(prod);
312                 if (ret) {
313                         evt_err("Ethernet dev [%d] failed to start."
314                                         " Using synthetic producer", prod);
315                         return ret;
316                 }
317
318                 ret = rte_event_eth_rx_adapter_start(prod);
319                 if (ret) {
320                         evt_err("Rx adapter[%d] start failed", prod);
321                         return ret;
322                 }
323                 printf("%s: Port[%d] using Rx adapter[%d] started\n", __func__,
324                                 prod, prod);
325         }
326
327         return ret;
328 }
329
330 int
331 pipeline_event_tx_service_setup(struct evt_test *test, struct evt_options *opt,
332                 uint8_t tx_queue_id, uint8_t tx_port_id,
333                 const struct rte_event_port_conf p_conf)
334 {
335         int ret;
336         struct rte_service_spec serv;
337         struct test_pipeline *t = evt_test_priv(test);
338         struct tx_service_data *tx = &t->tx_service;
339
340         ret = rte_event_port_setup(opt->dev_id, tx_port_id, &p_conf);
341         if (ret) {
342                 evt_err("failed to setup port %d", tx_port_id);
343                 return ret;
344         }
345
346         if (rte_event_port_link(opt->dev_id, tx_port_id, &tx_queue_id,
347                                 NULL, 1) != 1) {
348                 evt_err("failed to link queues to port %d", tx_port_id);
349                 return -EINVAL;
350         }
351
352         tx->dev_id = opt->dev_id;
353         tx->queue_id = tx_queue_id;
354         tx->port_id = tx_port_id;
355         tx->nb_ethports = rte_eth_dev_count();
356         tx->t = t;
357
358         /* Register Tx service */
359         memset(&serv, 0, sizeof(struct rte_service_spec));
360         snprintf(serv.name, sizeof(serv.name), "Tx_service");
361
362         if (evt_has_burst_mode(opt->dev_id))
363                 serv.callback = pipeline_event_tx_burst_service_func;
364         else
365                 serv.callback = pipeline_event_tx_service_func;
366
367         serv.callback_userdata = (void *)tx;
368         ret = rte_service_component_register(&serv, &tx->service_id);
369         if (ret) {
370                 evt_err("failed to register Tx service");
371                 return ret;
372         }
373
374         ret = evt_service_setup(tx->service_id);
375         if (ret) {
376                 evt_err("Failed to setup service core for Tx service\n");
377                 return ret;
378         }
379
380         rte_service_runstate_set(tx->service_id, 1);
381
382         return 0;
383 }
384
385
386 void
387 pipeline_ethdev_destroy(struct evt_test *test, struct evt_options *opt)
388 {
389         int i;
390         RTE_SET_USED(test);
391         RTE_SET_USED(opt);
392         struct test_pipeline *t = evt_test_priv(test);
393
394         if (t->mt_unsafe) {
395                 rte_service_component_runstate_set(t->tx_service.service_id, 0);
396                 rte_service_runstate_set(t->tx_service.service_id, 0);
397                 rte_service_component_unregister(t->tx_service.service_id);
398         }
399
400         for (i = 0; i < rte_eth_dev_count(); i++) {
401                 rte_event_eth_rx_adapter_stop(i);
402                 rte_eth_dev_stop(i);
403                 rte_eth_dev_close(i);
404         }
405 }
406
407 void
408 pipeline_eventdev_destroy(struct evt_test *test, struct evt_options *opt)
409 {
410         RTE_SET_USED(test);
411
412         rte_event_dev_stop(opt->dev_id);
413         rte_event_dev_close(opt->dev_id);
414 }
415
int
pipeline_mempool_setup(struct evt_test *test, struct evt_options *opt)
{
	/* Create the packet mbuf pool used by all ethdev Rx queues.
	 * Returns 0 on success, -ENOMEM when pool creation fails.
	 */
	struct test_pipeline *t = evt_test_priv(test);

	t->pool = rte_pktmbuf_pool_create(test->name, /* mempool name */
			opt->pool_sz, /* number of elements*/
			512, /* cache size*/
			0, /* private data size */
			RTE_MBUF_DEFAULT_BUF_SIZE, /* data room size */
			opt->socket_id); /* NUMA socket to allocate from */

	if (t->pool == NULL) {
		evt_err("failed to create mempool");
		return -ENOMEM;
	}

	return 0;
}
435
436 void
437 pipeline_mempool_destroy(struct evt_test *test, struct evt_options *opt)
438 {
439         RTE_SET_USED(opt);
440         struct test_pipeline *t = evt_test_priv(test);
441
442         rte_mempool_free(t->pool);
443 }
444
445 int
446 pipeline_test_setup(struct evt_test *test, struct evt_options *opt)
447 {
448         void *test_pipeline;
449
450         test_pipeline = rte_zmalloc_socket(test->name,
451                         sizeof(struct test_pipeline), RTE_CACHE_LINE_SIZE,
452                         opt->socket_id);
453         if (test_pipeline  == NULL) {
454                 evt_err("failed to allocate test_pipeline memory");
455                 goto nomem;
456         }
457         test->test_priv = test_pipeline;
458
459         struct test_pipeline *t = evt_test_priv(test);
460
461         t->nb_workers = evt_nr_active_lcores(opt->wlcores);
462         t->outstand_pkts = opt->nb_pkts * evt_nr_active_lcores(opt->wlcores);
463         t->done = false;
464         t->nb_flows = opt->nb_flows;
465         t->result = EVT_TEST_FAILED;
466         t->opt = opt;
467         opt->prod_type = EVT_PROD_TYPE_ETH_RX_ADPTR;
468         memcpy(t->sched_type_list, opt->sched_type_list,
469                         sizeof(opt->sched_type_list));
470         return 0;
471 nomem:
472         return -ENOMEM;
473 }
474
475 void
476 pipeline_test_destroy(struct evt_test *test, struct evt_options *opt)
477 {
478         RTE_SET_USED(opt);
479
480         rte_free(test->test_priv);
481 }