vdpa/mlx5: add task ring for multi-thread management
[dpdk.git] / lib / eventdev / rte_event_eth_tx_adapter.c
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2018 Intel Corporation.
3  */
4 #include <rte_spinlock.h>
5 #include <rte_service_component.h>
6 #include <ethdev_driver.h>
7
8 #include "eventdev_pmd.h"
9 #include "eventdev_trace.h"
10 #include "rte_event_eth_tx_adapter.h"
11
12 #define TXA_BATCH_SIZE          32
13 #define TXA_SERVICE_NAME_LEN    32
14 #define TXA_MEM_NAME_LEN        32
15 #define TXA_FLUSH_THRESHOLD     1024
16 #define TXA_RETRY_CNT           100
17 #define TXA_MAX_NB_TX           128
18 #define TXA_INVALID_DEV_ID      INT32_C(-1)
19 #define TXA_INVALID_SERVICE_ID  INT64_C(-1)
20
21 #define txa_evdev(id) (&rte_eventdevs[txa_dev_id_array[(id)]])
22
23 #define txa_dev_caps_get(id) txa_evdev((id))->dev_ops->eth_tx_adapter_caps_get
24
25 #define txa_dev_adapter_create(t) txa_evdev(t)->dev_ops->eth_tx_adapter_create
26
27 #define txa_dev_adapter_create_ext(t) \
28                                 txa_evdev(t)->dev_ops->eth_tx_adapter_create
29
30 #define txa_dev_adapter_free(t) txa_evdev(t)->dev_ops->eth_tx_adapter_free
31
32 #define txa_dev_queue_add(id) txa_evdev(id)->dev_ops->eth_tx_adapter_queue_add
33
34 #define txa_dev_queue_del(t) txa_evdev(t)->dev_ops->eth_tx_adapter_queue_del
35
36 #define txa_dev_start(t) txa_evdev(t)->dev_ops->eth_tx_adapter_start
37
38 #define txa_dev_stop(t) txa_evdev(t)->dev_ops->eth_tx_adapter_stop
39
40 #define txa_dev_stats_reset(t) txa_evdev(t)->dev_ops->eth_tx_adapter_stats_reset
41
42 #define txa_dev_stats_get(t) txa_evdev(t)->dev_ops->eth_tx_adapter_stats_get
43
44 #define RTE_EVENT_ETH_TX_ADAPTER_ID_VALID_OR_ERR_RET(id, retval) \
45 do { \
46         if (!txa_valid_id(id)) { \
47                 RTE_EDEV_LOG_ERR("Invalid eth Rx adapter id = %d", id); \
48                 return retval; \
49         } \
50 } while (0)
51
52 #define TXA_CHECK_OR_ERR_RET(id) \
53 do {\
54         int ret; \
55         RTE_EVENT_ETH_TX_ADAPTER_ID_VALID_OR_ERR_RET((id), -EINVAL); \
56         ret = txa_init(); \
57         if (ret != 0) \
58                 return ret; \
59         if (!txa_adapter_exist((id))) \
60                 return -EINVAL; \
61 } while (0)
62
63 #define TXA_CHECK_TXQ(dev, queue) \
64 do {\
65         if ((dev)->data->nb_tx_queues == 0) { \
66                 RTE_EDEV_LOG_ERR("No tx queues configured"); \
67                 return -EINVAL; \
68         } \
69         if ((queue) != -1 && \
70                 (uint16_t)(queue) >= (dev)->data->nb_tx_queues) { \
71                 RTE_EDEV_LOG_ERR("Invalid tx queue_id %" PRIu16, \
72                                 (uint16_t)(queue)); \
73                 return -EINVAL; \
74         } \
75 } while (0)
76
77 /* Tx retry callback structure */
78 struct txa_retry {
79         /* Ethernet port id */
80         uint16_t port_id;
81         /* Tx queue */
82         uint16_t tx_queue;
83         /* Adapter ID */
84         uint8_t id;
85 };
86
87 /* Per queue structure */
88 struct txa_service_queue_info {
89         /* Queue has been added */
90         uint8_t added;
91         /* Retry callback argument */
92         struct txa_retry txa_retry;
93         /* Tx buffer */
94         struct rte_eth_dev_tx_buffer *tx_buf;
95 };
96
97 /* PMD private structure */
98 struct txa_service_data {
99         /* Max mbufs processed in any service function invocation */
100         uint32_t max_nb_tx;
101         /* Number of Tx queues in adapter */
102         uint32_t nb_queues;
103         /*  Synchronization with data path */
104         rte_spinlock_t tx_lock;
105         /* Event port ID */
106         uint8_t port_id;
107         /* Event device identifier */
108         uint8_t eventdev_id;
109         /* Highest port id supported + 1 */
110         uint16_t dev_count;
111         /* Loop count to flush Tx buffers */
112         int loop_cnt;
113         /* Per ethernet device structure */
114         struct txa_service_ethdev *txa_ethdev;
115         /* Statistics */
116         struct rte_event_eth_tx_adapter_stats stats;
117         /* Adapter Identifier */
118         uint8_t id;
119         /* Conf arg must be freed */
120         uint8_t conf_free;
121         /* Configuration callback */
122         rte_event_eth_tx_adapter_conf_cb conf_cb;
123         /* Configuration callback argument */
124         void *conf_arg;
125         /* socket id */
126         int socket_id;
127         /* Per adapter EAL service */
128         int64_t service_id;
129         /* Memory allocation name */
130         char mem_name[TXA_MEM_NAME_LEN];
131 } __rte_cache_aligned;
132
133 /* Per eth device structure */
134 struct txa_service_ethdev {
135         /* Pointer to ethernet device */
136         struct rte_eth_dev *dev;
137         /* Number of queues added */
138         uint16_t nb_queues;
139         /* PMD specific queue data */
140         void *queues;
141 };
142
143 /* Array of adapter instances, initialized with event device id
144  * when adapter is created
145  */
146 static int *txa_dev_id_array;
147
148 /* Array of pointers to service implementation data */
149 static struct txa_service_data **txa_service_data_array;
150
151 static int32_t txa_service_func(void *args);
152 static int txa_service_adapter_create_ext(uint8_t id,
153                         struct rte_eventdev *dev,
154                         rte_event_eth_tx_adapter_conf_cb conf_cb,
155                         void *conf_arg);
156 static int txa_service_queue_del(uint8_t id,
157                                 const struct rte_eth_dev *dev,
158                                 int32_t tx_queue_id);
159
160 static int
161 txa_adapter_exist(uint8_t id)
162 {
163         return txa_dev_id_array[id] != TXA_INVALID_DEV_ID;
164 }
165
166 static inline int
167 txa_valid_id(uint8_t id)
168 {
169         return id < RTE_EVENT_ETH_TX_ADAPTER_MAX_INSTANCE;
170 }
171
172 static void *
173 txa_memzone_array_get(const char *name, unsigned int elt_size, int nb_elems)
174 {
175         const struct rte_memzone *mz;
176         unsigned int sz;
177
178         sz = elt_size * nb_elems;
179         sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
180
181         mz = rte_memzone_lookup(name);
182         if (mz == NULL) {
183                 mz = rte_memzone_reserve_aligned(name, sz, rte_socket_id(), 0,
184                                                  RTE_CACHE_LINE_SIZE);
185                 if (mz == NULL) {
186                         RTE_EDEV_LOG_ERR("failed to reserve memzone"
187                                         " name = %s err = %"
188                                         PRId32, name, rte_errno);
189                         return NULL;
190                 }
191         }
192
193         return  mz->addr;
194 }
195
196 static int
197 txa_dev_id_array_init(void)
198 {
199         if (txa_dev_id_array == NULL) {
200                 int i;
201
202                 txa_dev_id_array = txa_memzone_array_get("txa_adapter_array",
203                                         sizeof(int),
204                                         RTE_EVENT_ETH_TX_ADAPTER_MAX_INSTANCE);
205                 if (txa_dev_id_array == NULL)
206                         return -ENOMEM;
207
208                 for (i = 0; i < RTE_EVENT_ETH_TX_ADAPTER_MAX_INSTANCE; i++)
209                         txa_dev_id_array[i] = TXA_INVALID_DEV_ID;
210         }
211
212         return 0;
213 }
214
215 static int
216 txa_init(void)
217 {
218         return txa_dev_id_array_init();
219 }
220
221 static int
222 txa_service_data_init(void)
223 {
224         if (txa_service_data_array == NULL) {
225                 txa_service_data_array =
226                                 txa_memzone_array_get("txa_service_data_array",
227                                         sizeof(*txa_service_data_array),
228                                         RTE_EVENT_ETH_TX_ADAPTER_MAX_INSTANCE);
229                 if (txa_service_data_array == NULL)
230                         return -ENOMEM;
231         }
232
233         return 0;
234 }
235
236 static inline struct txa_service_data *
237 txa_service_id_to_data(uint8_t id)
238 {
239         return txa_service_data_array[id];
240 }
241
242 static inline struct txa_service_queue_info *
243 txa_service_queue(struct txa_service_data *txa, uint16_t port_id,
244                 uint16_t tx_queue_id)
245 {
246         struct txa_service_queue_info *tqi;
247
248         if (unlikely(txa->txa_ethdev == NULL || txa->dev_count < port_id + 1))
249                 return NULL;
250
251         tqi = txa->txa_ethdev[port_id].queues;
252
253         return likely(tqi != NULL) ? tqi + tx_queue_id : NULL;
254 }
255
256 static int
257 txa_service_conf_cb(uint8_t __rte_unused id, uint8_t dev_id,
258                 struct rte_event_eth_tx_adapter_conf *conf, void *arg)
259 {
260         int ret;
261         struct rte_eventdev *dev;
262         struct rte_event_port_conf *pc;
263         struct rte_event_dev_config dev_conf;
264         int started;
265         uint8_t port_id;
266
267         pc = arg;
268         dev = &rte_eventdevs[dev_id];
269         dev_conf = dev->data->dev_conf;
270
271         started = dev->data->dev_started;
272         if (started)
273                 rte_event_dev_stop(dev_id);
274
275         port_id = dev_conf.nb_event_ports;
276         dev_conf.nb_event_ports += 1;
277
278         ret = rte_event_dev_configure(dev_id, &dev_conf);
279         if (ret) {
280                 RTE_EDEV_LOG_ERR("failed to configure event dev %u",
281                                                 dev_id);
282                 if (started) {
283                         if (rte_event_dev_start(dev_id))
284                                 return -EIO;
285                 }
286                 return ret;
287         }
288
289         ret = rte_event_port_setup(dev_id, port_id, pc);
290         if (ret) {
291                 RTE_EDEV_LOG_ERR("failed to setup event port %u\n",
292                                         port_id);
293                 if (started) {
294                         if (rte_event_dev_start(dev_id))
295                                 return -EIO;
296                 }
297                 return ret;
298         }
299
300         conf->event_port_id = port_id;
301         conf->max_nb_tx = TXA_MAX_NB_TX;
302         if (started)
303                 ret = rte_event_dev_start(dev_id);
304         return ret;
305 }
306
307 static int
308 txa_service_ethdev_alloc(struct txa_service_data *txa)
309 {
310         struct txa_service_ethdev *txa_ethdev;
311         uint16_t i, dev_count;
312
313         dev_count = rte_eth_dev_count_avail();
314         if (txa->txa_ethdev && dev_count == txa->dev_count)
315                 return 0;
316
317         txa_ethdev = rte_zmalloc_socket(txa->mem_name,
318                                         dev_count * sizeof(*txa_ethdev),
319                                         0,
320                                         txa->socket_id);
321         if (txa_ethdev == NULL) {
322                 RTE_EDEV_LOG_ERR("Failed to alloc txa::txa_ethdev ");
323                 return -ENOMEM;
324         }
325
326         if (txa->dev_count)
327                 memcpy(txa_ethdev, txa->txa_ethdev,
328                         txa->dev_count * sizeof(*txa_ethdev));
329
330         RTE_ETH_FOREACH_DEV(i) {
331                 if (i == dev_count)
332                         break;
333                 txa_ethdev[i].dev = &rte_eth_devices[i];
334         }
335
336         txa->txa_ethdev = txa_ethdev;
337         txa->dev_count = dev_count;
338         return 0;
339 }
340
341 static int
342 txa_service_queue_array_alloc(struct txa_service_data *txa,
343                         uint16_t port_id)
344 {
345         struct txa_service_queue_info *tqi;
346         uint16_t nb_queue;
347         int ret;
348
349         ret = txa_service_ethdev_alloc(txa);
350         if (ret != 0)
351                 return ret;
352
353         if (txa->txa_ethdev[port_id].queues)
354                 return 0;
355
356         nb_queue = txa->txa_ethdev[port_id].dev->data->nb_tx_queues;
357         tqi = rte_zmalloc_socket(txa->mem_name,
358                                 nb_queue *
359                                 sizeof(struct txa_service_queue_info), 0,
360                                 txa->socket_id);
361         if (tqi == NULL)
362                 return -ENOMEM;
363         txa->txa_ethdev[port_id].queues = tqi;
364         return 0;
365 }
366
367 static void
368 txa_service_queue_array_free(struct txa_service_data *txa,
369                         uint16_t port_id)
370 {
371         struct txa_service_ethdev *txa_ethdev;
372         struct txa_service_queue_info *tqi;
373
374         txa_ethdev = &txa->txa_ethdev[port_id];
375         if (txa->txa_ethdev == NULL || txa_ethdev->nb_queues != 0)
376                 return;
377
378         tqi = txa_ethdev->queues;
379         txa_ethdev->queues = NULL;
380         rte_free(tqi);
381
382         if (txa->nb_queues == 0) {
383                 rte_free(txa->txa_ethdev);
384                 txa->txa_ethdev = NULL;
385         }
386 }
387
388 static void
389 txa_service_unregister(struct txa_service_data *txa)
390 {
391         if (txa->service_id != TXA_INVALID_SERVICE_ID) {
392                 rte_service_component_runstate_set(txa->service_id, 0);
393                 while (rte_service_may_be_active(txa->service_id))
394                         rte_pause();
395                 rte_service_component_unregister(txa->service_id);
396         }
397         txa->service_id = TXA_INVALID_SERVICE_ID;
398 }
399
400 static int
401 txa_service_register(struct txa_service_data *txa)
402 {
403         int ret;
404         struct rte_service_spec service;
405         struct rte_event_eth_tx_adapter_conf conf;
406
407         if (txa->service_id != TXA_INVALID_SERVICE_ID)
408                 return 0;
409
410         memset(&service, 0, sizeof(service));
411         snprintf(service.name, TXA_SERVICE_NAME_LEN, "txa_%d", txa->id);
412         service.socket_id = txa->socket_id;
413         service.callback = txa_service_func;
414         service.callback_userdata = txa;
415         service.capabilities = RTE_SERVICE_CAP_MT_SAFE;
416         ret = rte_service_component_register(&service,
417                                         (uint32_t *)&txa->service_id);
418         if (ret) {
419                 RTE_EDEV_LOG_ERR("failed to register service %s err = %"
420                                  PRId32, service.name, ret);
421                 return ret;
422         }
423
424         ret = txa->conf_cb(txa->id, txa->eventdev_id, &conf, txa->conf_arg);
425         if (ret) {
426                 txa_service_unregister(txa);
427                 return ret;
428         }
429
430         rte_service_component_runstate_set(txa->service_id, 1);
431         txa->port_id = conf.event_port_id;
432         txa->max_nb_tx = conf.max_nb_tx;
433         return 0;
434 }
435
436 static struct rte_eth_dev_tx_buffer *
437 txa_service_tx_buf_alloc(struct txa_service_data *txa,
438                         const struct rte_eth_dev *dev)
439 {
440         struct rte_eth_dev_tx_buffer *tb;
441         uint16_t port_id;
442
443         port_id = dev->data->port_id;
444         tb = rte_zmalloc_socket(txa->mem_name,
445                                 RTE_ETH_TX_BUFFER_SIZE(TXA_BATCH_SIZE),
446                                 0,
447                                 rte_eth_dev_socket_id(port_id));
448         if (tb == NULL)
449                 RTE_EDEV_LOG_ERR("Failed to allocate memory for tx buffer");
450         return tb;
451 }
452
453 static int
454 txa_service_is_queue_added(struct txa_service_data *txa,
455                         const struct rte_eth_dev *dev,
456                         uint16_t tx_queue_id)
457 {
458         struct txa_service_queue_info *tqi;
459
460         tqi = txa_service_queue(txa, dev->data->port_id, tx_queue_id);
461         return tqi && tqi->added;
462 }
463
464 static int
465 txa_service_ctrl(uint8_t id, int start)
466 {
467         int ret;
468         struct txa_service_data *txa;
469
470         txa = txa_service_id_to_data(id);
471         if (txa->service_id == TXA_INVALID_SERVICE_ID)
472                 return 0;
473
474         ret = rte_service_runstate_set(txa->service_id, start);
475         if (ret == 0 && !start) {
476                 while (rte_service_may_be_active(txa->service_id))
477                         rte_pause();
478         }
479         return ret;
480 }
481
482 static void
483 txa_service_buffer_retry(struct rte_mbuf **pkts, uint16_t unsent,
484                         void *userdata)
485 {
486         struct txa_retry *tr;
487         struct txa_service_data *data;
488         struct rte_event_eth_tx_adapter_stats *stats;
489         uint16_t sent = 0;
490         unsigned int retry = 0;
491         uint16_t i, n;
492
493         tr = (struct txa_retry *)(uintptr_t)userdata;
494         data = txa_service_id_to_data(tr->id);
495         stats = &data->stats;
496
497         do {
498                 n = rte_eth_tx_burst(tr->port_id, tr->tx_queue,
499                                &pkts[sent], unsent - sent);
500
501                 sent += n;
502         } while (sent != unsent && retry++ < TXA_RETRY_CNT);
503
504         for (i = sent; i < unsent; i++)
505                 rte_pktmbuf_free(pkts[i]);
506
507         stats->tx_retry += retry;
508         stats->tx_packets += sent;
509         stats->tx_dropped += unsent - sent;
510 }
511
512 static uint16_t
513 txa_process_event_vector(struct txa_service_data *txa,
514                          struct rte_event_vector *vec)
515 {
516         struct txa_service_queue_info *tqi;
517         uint16_t port, queue, nb_tx = 0;
518         struct rte_mbuf **mbufs;
519         int i;
520
521         mbufs = (struct rte_mbuf **)vec->mbufs;
522         if (vec->attr_valid) {
523                 port = vec->port;
524                 queue = vec->queue;
525                 tqi = txa_service_queue(txa, port, queue);
526                 if (unlikely(tqi == NULL || !tqi->added)) {
527                         rte_pktmbuf_free_bulk(mbufs, vec->nb_elem);
528                         rte_mempool_put(rte_mempool_from_obj(vec), vec);
529                         return 0;
530                 }
531                 for (i = 0; i < vec->nb_elem; i++) {
532                         nb_tx += rte_eth_tx_buffer(port, queue, tqi->tx_buf,
533                                                    mbufs[i]);
534                 }
535         } else {
536                 for (i = 0; i < vec->nb_elem; i++) {
537                         port = mbufs[i]->port;
538                         queue = rte_event_eth_tx_adapter_txq_get(mbufs[i]);
539                         tqi = txa_service_queue(txa, port, queue);
540                         if (unlikely(tqi == NULL || !tqi->added)) {
541                                 rte_pktmbuf_free(mbufs[i]);
542                                 continue;
543                         }
544                         nb_tx += rte_eth_tx_buffer(port, queue, tqi->tx_buf,
545                                                    mbufs[i]);
546                 }
547         }
548         rte_mempool_put(rte_mempool_from_obj(vec), vec);
549
550         return nb_tx;
551 }
552
553 static void
554 txa_service_tx(struct txa_service_data *txa, struct rte_event *ev,
555         uint32_t n)
556 {
557         uint32_t i;
558         uint16_t nb_tx;
559         struct rte_event_eth_tx_adapter_stats *stats;
560
561         stats = &txa->stats;
562
563         nb_tx = 0;
564         for (i = 0; i < n; i++) {
565                 uint16_t port;
566                 uint16_t queue;
567                 struct txa_service_queue_info *tqi;
568
569                 if (!(ev[i].event_type & RTE_EVENT_TYPE_VECTOR)) {
570                         struct rte_mbuf *m;
571
572                         m = ev[i].mbuf;
573                         port = m->port;
574                         queue = rte_event_eth_tx_adapter_txq_get(m);
575
576                         tqi = txa_service_queue(txa, port, queue);
577                         if (unlikely(tqi == NULL || !tqi->added)) {
578                                 rte_pktmbuf_free(m);
579                                 continue;
580                         }
581
582                         nb_tx += rte_eth_tx_buffer(port, queue, tqi->tx_buf, m);
583                 } else {
584                         nb_tx += txa_process_event_vector(txa, ev[i].vec);
585                 }
586         }
587
588         stats->tx_packets += nb_tx;
589 }
590
591 static int32_t
592 txa_service_func(void *args)
593 {
594         struct txa_service_data *txa = args;
595         uint8_t dev_id;
596         uint8_t port;
597         uint16_t n;
598         uint32_t nb_tx, max_nb_tx;
599         struct rte_event ev[TXA_BATCH_SIZE];
600
601         dev_id = txa->eventdev_id;
602         max_nb_tx = txa->max_nb_tx;
603         port = txa->port_id;
604
605         if (txa->nb_queues == 0)
606                 return 0;
607
608         if (!rte_spinlock_trylock(&txa->tx_lock))
609                 return 0;
610
611         for (nb_tx = 0; nb_tx < max_nb_tx; nb_tx += n) {
612
613                 n = rte_event_dequeue_burst(dev_id, port, ev, RTE_DIM(ev), 0);
614                 if (!n)
615                         break;
616                 txa_service_tx(txa, ev, n);
617         }
618
619         if ((txa->loop_cnt++ & (TXA_FLUSH_THRESHOLD - 1)) == 0) {
620
621                 struct txa_service_ethdev *tdi;
622                 struct txa_service_queue_info *tqi;
623                 struct rte_eth_dev *dev;
624                 uint16_t i;
625
626                 tdi = txa->txa_ethdev;
627                 nb_tx = 0;
628
629                 RTE_ETH_FOREACH_DEV(i) {
630                         uint16_t q;
631
632                         if (i == txa->dev_count)
633                                 break;
634
635                         dev = tdi[i].dev;
636                         if (tdi[i].nb_queues == 0)
637                                 continue;
638                         for (q = 0; q < dev->data->nb_tx_queues; q++) {
639
640                                 tqi = txa_service_queue(txa, i, q);
641                                 if (unlikely(tqi == NULL || !tqi->added))
642                                         continue;
643
644                                 nb_tx += rte_eth_tx_buffer_flush(i, q,
645                                                         tqi->tx_buf);
646                         }
647                 }
648
649                 txa->stats.tx_packets += nb_tx;
650         }
651         rte_spinlock_unlock(&txa->tx_lock);
652         return 0;
653 }
654
655 static int
656 txa_service_adapter_create(uint8_t id, struct rte_eventdev *dev,
657                         struct rte_event_port_conf *port_conf)
658 {
659         struct txa_service_data *txa;
660         struct rte_event_port_conf *cb_conf;
661         int ret;
662
663         cb_conf = rte_malloc(NULL, sizeof(*cb_conf), 0);
664         if (cb_conf == NULL)
665                 return -ENOMEM;
666
667         *cb_conf = *port_conf;
668         ret = txa_service_adapter_create_ext(id, dev, txa_service_conf_cb,
669                                         cb_conf);
670         if (ret) {
671                 rte_free(cb_conf);
672                 return ret;
673         }
674
675         txa = txa_service_id_to_data(id);
676         txa->conf_free = 1;
677         return ret;
678 }
679
680 static int
681 txa_service_adapter_create_ext(uint8_t id, struct rte_eventdev *dev,
682                         rte_event_eth_tx_adapter_conf_cb conf_cb,
683                         void *conf_arg)
684 {
685         struct txa_service_data *txa;
686         int socket_id;
687         char mem_name[TXA_SERVICE_NAME_LEN];
688         int ret;
689
690         if (conf_cb == NULL)
691                 return -EINVAL;
692
693         socket_id = dev->data->socket_id;
694         snprintf(mem_name, TXA_MEM_NAME_LEN,
695                 "rte_event_eth_txa_%d",
696                 id);
697
698         ret = txa_service_data_init();
699         if (ret != 0)
700                 return ret;
701
702         txa = rte_zmalloc_socket(mem_name,
703                                 sizeof(*txa),
704                                 RTE_CACHE_LINE_SIZE, socket_id);
705         if (txa == NULL) {
706                 RTE_EDEV_LOG_ERR("failed to get mem for tx adapter");
707                 return -ENOMEM;
708         }
709
710         txa->id = id;
711         txa->eventdev_id = dev->data->dev_id;
712         txa->socket_id = socket_id;
713         strncpy(txa->mem_name, mem_name, TXA_SERVICE_NAME_LEN);
714         txa->conf_cb = conf_cb;
715         txa->conf_arg = conf_arg;
716         txa->service_id = TXA_INVALID_SERVICE_ID;
717         rte_spinlock_init(&txa->tx_lock);
718         txa_service_data_array[id] = txa;
719
720         return 0;
721 }
722
723 static int
724 txa_service_event_port_get(uint8_t id, uint8_t *port)
725 {
726         struct txa_service_data *txa;
727
728         txa = txa_service_id_to_data(id);
729         if (txa->service_id == TXA_INVALID_SERVICE_ID)
730                 return -ENODEV;
731
732         *port = txa->port_id;
733         return 0;
734 }
735
736 static int
737 txa_service_adapter_free(uint8_t id)
738 {
739         struct txa_service_data *txa;
740
741         txa = txa_service_id_to_data(id);
742         if (txa->nb_queues) {
743                 RTE_EDEV_LOG_ERR("%" PRIu16 " Tx queues not deleted",
744                                 txa->nb_queues);
745                 return -EBUSY;
746         }
747
748         if (txa->conf_free)
749                 rte_free(txa->conf_arg);
750         rte_free(txa);
751         return 0;
752 }
753
754 static int
755 txa_service_queue_add(uint8_t id,
756                 __rte_unused struct rte_eventdev *dev,
757                 const struct rte_eth_dev *eth_dev,
758                 int32_t tx_queue_id)
759 {
760         struct txa_service_data *txa;
761         struct txa_service_ethdev *tdi;
762         struct txa_service_queue_info *tqi;
763         struct rte_eth_dev_tx_buffer *tb;
764         struct txa_retry *txa_retry;
765         int ret = 0;
766
767         txa = txa_service_id_to_data(id);
768
769         if (tx_queue_id == -1) {
770                 int nb_queues;
771                 uint16_t i, j;
772                 uint16_t *qdone;
773
774                 nb_queues = eth_dev->data->nb_tx_queues;
775                 if (txa->dev_count > eth_dev->data->port_id) {
776                         tdi = &txa->txa_ethdev[eth_dev->data->port_id];
777                         nb_queues -= tdi->nb_queues;
778                 }
779
780                 qdone = rte_zmalloc(txa->mem_name,
781                                 nb_queues * sizeof(*qdone), 0);
782                 if (qdone == NULL)
783                         return -ENOMEM;
784                 j = 0;
785                 for (i = 0; i < nb_queues; i++) {
786                         if (txa_service_is_queue_added(txa, eth_dev, i))
787                                 continue;
788                         ret = txa_service_queue_add(id, dev, eth_dev, i);
789                         if (ret == 0)
790                                 qdone[j++] = i;
791                         else
792                                 break;
793                 }
794
795                 if (i != nb_queues) {
796                         for (i = 0; i < j; i++)
797                                 txa_service_queue_del(id, eth_dev, qdone[i]);
798                 }
799                 rte_free(qdone);
800                 return ret;
801         }
802
803         ret = txa_service_register(txa);
804         if (ret)
805                 return ret;
806
807         rte_spinlock_lock(&txa->tx_lock);
808
809         if (txa_service_is_queue_added(txa, eth_dev, tx_queue_id))
810                 goto ret_unlock;
811
812         ret = txa_service_queue_array_alloc(txa, eth_dev->data->port_id);
813         if (ret)
814                 goto err_unlock;
815
816         tb = txa_service_tx_buf_alloc(txa, eth_dev);
817         if (tb == NULL)
818                 goto err_unlock;
819
820         tdi = &txa->txa_ethdev[eth_dev->data->port_id];
821         tqi = txa_service_queue(txa, eth_dev->data->port_id, tx_queue_id);
822         if (tqi == NULL)
823                 goto err_unlock;
824
825         txa_retry = &tqi->txa_retry;
826         txa_retry->id = txa->id;
827         txa_retry->port_id = eth_dev->data->port_id;
828         txa_retry->tx_queue = tx_queue_id;
829
830         rte_eth_tx_buffer_init(tb, TXA_BATCH_SIZE);
831         rte_eth_tx_buffer_set_err_callback(tb,
832                 txa_service_buffer_retry, txa_retry);
833
834         tqi->tx_buf = tb;
835         tqi->added = 1;
836         tdi->nb_queues++;
837         txa->nb_queues++;
838
839 ret_unlock:
840         rte_spinlock_unlock(&txa->tx_lock);
841         return 0;
842
843 err_unlock:
844         if (txa->nb_queues == 0) {
845                 txa_service_queue_array_free(txa,
846                                         eth_dev->data->port_id);
847                 txa_service_unregister(txa);
848         }
849
850         rte_spinlock_unlock(&txa->tx_lock);
851         return -1;
852 }
853
854 static int
855 txa_service_queue_del(uint8_t id,
856                 const struct rte_eth_dev *dev,
857                 int32_t tx_queue_id)
858 {
859         struct txa_service_data *txa;
860         struct txa_service_queue_info *tqi;
861         struct rte_eth_dev_tx_buffer *tb;
862         uint16_t port_id;
863
864         txa = txa_service_id_to_data(id);
865         port_id = dev->data->port_id;
866
867         if (tx_queue_id == -1) {
868                 uint16_t i, q, nb_queues;
869                 int ret = 0;
870
871                 nb_queues = txa->txa_ethdev[port_id].nb_queues;
872                 if (nb_queues == 0)
873                         return 0;
874
875                 i = 0;
876                 q = 0;
877                 tqi = txa->txa_ethdev[port_id].queues;
878
879                 while (i < nb_queues) {
880
881                         if (tqi[q].added) {
882                                 ret = txa_service_queue_del(id, dev, q);
883                                 if (ret != 0)
884                                         break;
885                         }
886                         i++;
887                         q++;
888                 }
889                 return ret;
890         }
891
892         txa = txa_service_id_to_data(id);
893
894         tqi = txa_service_queue(txa, port_id, tx_queue_id);
895         if (tqi == NULL || !tqi->added)
896                 return 0;
897
898         tb = tqi->tx_buf;
899         tqi->added = 0;
900         tqi->tx_buf = NULL;
901         rte_free(tb);
902         txa->nb_queues--;
903         txa->txa_ethdev[port_id].nb_queues--;
904
905         txa_service_queue_array_free(txa, port_id);
906         return 0;
907 }
908
909 static int
910 txa_service_id_get(uint8_t id, uint32_t *service_id)
911 {
912         struct txa_service_data *txa;
913
914         txa = txa_service_id_to_data(id);
915         if (txa->service_id == TXA_INVALID_SERVICE_ID)
916                 return -ESRCH;
917
918         if (service_id == NULL)
919                 return -EINVAL;
920
921         *service_id = txa->service_id;
922         return 0;
923 }
924
925 static int
926 txa_service_start(uint8_t id)
927 {
928         return txa_service_ctrl(id, 1);
929 }
930
931 static int
932 txa_service_stats_get(uint8_t id,
933                 struct rte_event_eth_tx_adapter_stats *stats)
934 {
935         struct txa_service_data *txa;
936
937         txa = txa_service_id_to_data(id);
938         *stats = txa->stats;
939         return 0;
940 }
941
942 static int
943 txa_service_stats_reset(uint8_t id)
944 {
945         struct txa_service_data *txa;
946
947         txa = txa_service_id_to_data(id);
948         memset(&txa->stats, 0, sizeof(txa->stats));
949         return 0;
950 }
951
952 static int
953 txa_service_stop(uint8_t id)
954 {
955         return txa_service_ctrl(id, 0);
956 }
957
958
959 int
960 rte_event_eth_tx_adapter_create(uint8_t id, uint8_t dev_id,
961                                 struct rte_event_port_conf *port_conf)
962 {
963         struct rte_eventdev *dev;
964         int ret;
965
966         if (port_conf == NULL)
967                 return -EINVAL;
968
969         RTE_EVENT_ETH_TX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
970         RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, -EINVAL);
971
972         dev = &rte_eventdevs[dev_id];
973
974         ret = txa_init();
975         if (ret != 0)
976                 return ret;
977
978         if (txa_adapter_exist(id))
979                 return -EEXIST;
980
981         txa_dev_id_array[id] = dev_id;
982         if (txa_dev_adapter_create(id))
983                 ret = txa_dev_adapter_create(id)(id, dev);
984
985         if (ret != 0) {
986                 txa_dev_id_array[id] = TXA_INVALID_DEV_ID;
987                 return ret;
988         }
989
990         ret = txa_service_adapter_create(id, dev, port_conf);
991         if (ret != 0) {
992                 if (txa_dev_adapter_free(id))
993                         txa_dev_adapter_free(id)(id, dev);
994                 txa_dev_id_array[id] = TXA_INVALID_DEV_ID;
995                 return ret;
996         }
997         rte_eventdev_trace_eth_tx_adapter_create(id, dev_id, NULL, port_conf,
998                 ret);
999         txa_dev_id_array[id] = dev_id;
1000         return 0;
1001 }
1002
1003 int
1004 rte_event_eth_tx_adapter_create_ext(uint8_t id, uint8_t dev_id,
1005                                 rte_event_eth_tx_adapter_conf_cb conf_cb,
1006                                 void *conf_arg)
1007 {
1008         struct rte_eventdev *dev;
1009         int ret;
1010
1011         RTE_EVENT_ETH_TX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
1012         RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, -EINVAL);
1013
1014         ret = txa_init();
1015         if (ret != 0)
1016                 return ret;
1017
1018         if (txa_adapter_exist(id))
1019                 return -EINVAL;
1020
1021         dev = &rte_eventdevs[dev_id];
1022
1023         txa_dev_id_array[id] = dev_id;
1024         if (txa_dev_adapter_create_ext(id))
1025                 ret = txa_dev_adapter_create_ext(id)(id, dev);
1026
1027         if (ret != 0) {
1028                 txa_dev_id_array[id] = TXA_INVALID_DEV_ID;
1029                 return ret;
1030         }
1031
1032         ret = txa_service_adapter_create_ext(id, dev, conf_cb, conf_arg);
1033         if (ret != 0) {
1034                 if (txa_dev_adapter_free(id))
1035                         txa_dev_adapter_free(id)(id, dev);
1036                 txa_dev_id_array[id] = TXA_INVALID_DEV_ID;
1037                 return ret;
1038         }
1039
1040         rte_eventdev_trace_eth_tx_adapter_create(id, dev_id, conf_cb, conf_arg,
1041                 ret);
1042         txa_dev_id_array[id] = dev_id;
1043         return 0;
1044 }
1045
1046
1047 int
1048 rte_event_eth_tx_adapter_event_port_get(uint8_t id, uint8_t *event_port_id)
1049 {
1050         TXA_CHECK_OR_ERR_RET(id);
1051
1052         return txa_service_event_port_get(id, event_port_id);
1053 }
1054
1055 int
1056 rte_event_eth_tx_adapter_free(uint8_t id)
1057 {
1058         int ret;
1059
1060         TXA_CHECK_OR_ERR_RET(id);
1061
1062         ret = txa_dev_adapter_free(id) ?
1063                 txa_dev_adapter_free(id)(id, txa_evdev(id)) :
1064                 0;
1065
1066         if (ret == 0)
1067                 ret = txa_service_adapter_free(id);
1068         txa_dev_id_array[id] = TXA_INVALID_DEV_ID;
1069
1070         rte_eventdev_trace_eth_tx_adapter_free(id, ret);
1071         return ret;
1072 }
1073
1074 int
1075 rte_event_eth_tx_adapter_queue_add(uint8_t id,
1076                                 uint16_t eth_dev_id,
1077                                 int32_t queue)
1078 {
1079         struct rte_eth_dev *eth_dev;
1080         int ret;
1081         uint32_t caps;
1082
1083         RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
1084         TXA_CHECK_OR_ERR_RET(id);
1085
1086         eth_dev = &rte_eth_devices[eth_dev_id];
1087         TXA_CHECK_TXQ(eth_dev, queue);
1088
1089         caps = 0;
1090         if (txa_dev_caps_get(id))
1091                 txa_dev_caps_get(id)(txa_evdev(id), eth_dev, &caps);
1092
1093         if (caps & RTE_EVENT_ETH_TX_ADAPTER_CAP_INTERNAL_PORT)
1094                 ret =  txa_dev_queue_add(id) ?
1095                                         txa_dev_queue_add(id)(id,
1096                                                         txa_evdev(id),
1097                                                         eth_dev,
1098                                                         queue) : 0;
1099         else
1100                 ret = txa_service_queue_add(id, txa_evdev(id), eth_dev, queue);
1101
1102         rte_eventdev_trace_eth_tx_adapter_queue_add(id, eth_dev_id, queue,
1103                 ret);
1104         return ret;
1105 }
1106
1107 int
1108 rte_event_eth_tx_adapter_queue_del(uint8_t id,
1109                                 uint16_t eth_dev_id,
1110                                 int32_t queue)
1111 {
1112         struct rte_eth_dev *eth_dev;
1113         int ret;
1114         uint32_t caps;
1115
1116         RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
1117         TXA_CHECK_OR_ERR_RET(id);
1118
1119         eth_dev = &rte_eth_devices[eth_dev_id];
1120
1121         caps = 0;
1122
1123         if (txa_dev_caps_get(id))
1124                 txa_dev_caps_get(id)(txa_evdev(id), eth_dev, &caps);
1125
1126         if (caps & RTE_EVENT_ETH_TX_ADAPTER_CAP_INTERNAL_PORT)
1127                 ret =  txa_dev_queue_del(id) ?
1128                                         txa_dev_queue_del(id)(id, txa_evdev(id),
1129                                                         eth_dev,
1130                                                         queue) : 0;
1131         else
1132                 ret = txa_service_queue_del(id, eth_dev, queue);
1133
1134         rte_eventdev_trace_eth_tx_adapter_queue_del(id, eth_dev_id, queue,
1135                 ret);
1136         return ret;
1137 }
1138
1139 int
1140 rte_event_eth_tx_adapter_service_id_get(uint8_t id, uint32_t *service_id)
1141 {
1142         TXA_CHECK_OR_ERR_RET(id);
1143
1144         return txa_service_id_get(id, service_id);
1145 }
1146
1147 int
1148 rte_event_eth_tx_adapter_start(uint8_t id)
1149 {
1150         int ret;
1151
1152         TXA_CHECK_OR_ERR_RET(id);
1153
1154         ret = txa_dev_start(id) ? txa_dev_start(id)(id, txa_evdev(id)) : 0;
1155         if (ret == 0)
1156                 ret = txa_service_start(id);
1157         rte_eventdev_trace_eth_tx_adapter_start(id, ret);
1158         return ret;
1159 }
1160
1161 int
1162 rte_event_eth_tx_adapter_stats_get(uint8_t id,
1163                                 struct rte_event_eth_tx_adapter_stats *stats)
1164 {
1165         int ret;
1166
1167         TXA_CHECK_OR_ERR_RET(id);
1168
1169         if (stats == NULL)
1170                 return -EINVAL;
1171
1172         *stats = (struct rte_event_eth_tx_adapter_stats){0};
1173
1174         ret = txa_dev_stats_get(id) ?
1175                         txa_dev_stats_get(id)(id, txa_evdev(id), stats) : 0;
1176
1177         if (ret == 0 && txa_service_id_get(id, NULL) != ESRCH) {
1178                 if (txa_dev_stats_get(id)) {
1179                         struct rte_event_eth_tx_adapter_stats service_stats;
1180
1181                         ret = txa_service_stats_get(id, &service_stats);
1182                         if (ret == 0) {
1183                                 stats->tx_retry += service_stats.tx_retry;
1184                                 stats->tx_packets += service_stats.tx_packets;
1185                                 stats->tx_dropped += service_stats.tx_dropped;
1186                         }
1187                 } else
1188                         ret = txa_service_stats_get(id, stats);
1189         }
1190
1191         return ret;
1192 }
1193
1194 int
1195 rte_event_eth_tx_adapter_stats_reset(uint8_t id)
1196 {
1197         int ret;
1198
1199         TXA_CHECK_OR_ERR_RET(id);
1200
1201         ret = txa_dev_stats_reset(id) ?
1202                 txa_dev_stats_reset(id)(id, txa_evdev(id)) : 0;
1203         if (ret == 0)
1204                 ret = txa_service_stats_reset(id);
1205         return ret;
1206 }
1207
1208 int
1209 rte_event_eth_tx_adapter_stop(uint8_t id)
1210 {
1211         int ret;
1212
1213         TXA_CHECK_OR_ERR_RET(id);
1214
1215         ret = txa_dev_stop(id) ? txa_dev_stop(id)(id,  txa_evdev(id)) : 0;
1216         if (ret == 0)
1217                 ret = txa_service_stop(id);
1218         rte_eventdev_trace_eth_tx_adapter_stop(id, ret);
1219         return ret;
1220 }