d02ef57f4eaedfaa3c49f0171e25c143f49842d2
[dpdk.git] / lib / librte_eventdev / rte_event_eth_tx_adapter.c
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2018 Intel Corporation.
3  */
4 #include <rte_spinlock.h>
5 #include <rte_service_component.h>
6 #include <rte_ethdev.h>
7
8 #include "rte_eventdev_pmd.h"
9 #include "rte_event_eth_tx_adapter.h"
10
11 #define TXA_BATCH_SIZE          32
12 #define TXA_SERVICE_NAME_LEN    32
13 #define TXA_MEM_NAME_LEN        32
14 #define TXA_FLUSH_THRESHOLD     1024
15 #define TXA_RETRY_CNT           100
16 #define TXA_MAX_NB_TX           128
17 #define TXA_INVALID_DEV_ID      INT32_C(-1)
18 #define TXA_INVALID_SERVICE_ID  INT64_C(-1)
19
20 #define txa_evdev(id) (&rte_eventdevs[txa_dev_id_array[(id)]])
21
22 #define txa_dev_caps_get(id) txa_evdev((id))->dev_ops->eth_tx_adapter_caps_get
23
24 #define txa_dev_adapter_create(t) txa_evdev(t)->dev_ops->eth_tx_adapter_create
25
26 #define txa_dev_adapter_create_ext(t) \
27                                 txa_evdev(t)->dev_ops->eth_tx_adapter_create
28
29 #define txa_dev_adapter_free(t) txa_evdev(t)->dev_ops->eth_tx_adapter_free
30
31 #define txa_dev_queue_add(id) txa_evdev(id)->dev_ops->eth_tx_adapter_queue_add
32
33 #define txa_dev_queue_del(t) txa_evdev(t)->dev_ops->eth_tx_adapter_queue_del
34
35 #define txa_dev_start(t) txa_evdev(t)->dev_ops->eth_tx_adapter_start
36
37 #define txa_dev_stop(t) txa_evdev(t)->dev_ops->eth_tx_adapter_stop
38
39 #define txa_dev_stats_reset(t) txa_evdev(t)->dev_ops->eth_tx_adapter_stats_reset
40
41 #define txa_dev_stats_get(t) txa_evdev(t)->dev_ops->eth_tx_adapter_stats_get
42
43 #define RTE_EVENT_ETH_TX_ADAPTER_ID_VALID_OR_ERR_RET(id, retval) \
44 do { \
45         if (!txa_valid_id(id)) { \
46                 RTE_EDEV_LOG_ERR("Invalid eth Rx adapter id = %d", id); \
47                 return retval; \
48         } \
49 } while (0)
50
51 #define TXA_CHECK_OR_ERR_RET(id) \
52 do {\
53         int ret; \
54         RTE_EVENT_ETH_TX_ADAPTER_ID_VALID_OR_ERR_RET((id), -EINVAL); \
55         ret = txa_init(); \
56         if (ret != 0) \
57                 return ret; \
58         if (!txa_adapter_exist((id))) \
59                 return -EINVAL; \
60 } while (0)
61
62 #define TXA_CHECK_TXQ(dev, queue) \
63 do {\
64         if ((dev)->data->nb_tx_queues == 0) { \
65                 RTE_EDEV_LOG_ERR("No tx queues configured"); \
66                 return -EINVAL; \
67         } \
68         if ((queue) != -1 && \
69                 (uint16_t)(queue) >= (dev)->data->nb_tx_queues) { \
70                 RTE_EDEV_LOG_ERR("Invalid tx queue_id %" PRIu16, \
71                                 (uint16_t)(queue)); \
72                 return -EINVAL; \
73         } \
74 } while (0)
75
76 /* Tx retry callback structure */
77 struct txa_retry {
78         /* Ethernet port id */
79         uint16_t port_id;
80         /* Tx queue */
81         uint16_t tx_queue;
82         /* Adapter ID */
83         uint8_t id;
84 };
85
86 /* Per queue structure */
87 struct txa_service_queue_info {
88         /* Queue has been added */
89         uint8_t added;
90         /* Retry callback argument */
91         struct txa_retry txa_retry;
92         /* Tx buffer */
93         struct rte_eth_dev_tx_buffer *tx_buf;
94 };
95
96 /* PMD private structure */
97 struct txa_service_data {
98         /* Max mbufs processed in any service function invocation */
99         uint32_t max_nb_tx;
100         /* Number of Tx queues in adapter */
101         uint32_t nb_queues;
102         /*  Synchronization with data path */
103         rte_spinlock_t tx_lock;
104         /* Event port ID */
105         uint8_t port_id;
106         /* Event device identifier */
107         uint8_t eventdev_id;
108         /* Highest port id supported + 1 */
109         uint16_t dev_count;
110         /* Loop count to flush Tx buffers */
111         int loop_cnt;
112         /* Per ethernet device structure */
113         struct txa_service_ethdev *txa_ethdev;
114         /* Statistics */
115         struct rte_event_eth_tx_adapter_stats stats;
116         /* Adapter Identifier */
117         uint8_t id;
118         /* Conf arg must be freed */
119         uint8_t conf_free;
120         /* Configuration callback */
121         rte_event_eth_tx_adapter_conf_cb conf_cb;
122         /* Configuration callback argument */
123         void *conf_arg;
124         /* socket id */
125         int socket_id;
126         /* Per adapter EAL service */
127         int64_t service_id;
128         /* Memory allocation name */
129         char mem_name[TXA_MEM_NAME_LEN];
130 } __rte_cache_aligned;
131
132 /* Per eth device structure */
133 struct txa_service_ethdev {
134         /* Pointer to ethernet device */
135         struct rte_eth_dev *dev;
136         /* Number of queues added */
137         uint16_t nb_queues;
138         /* PMD specific queue data */
139         void *queues;
140 };
141
142 /* Array of adapter instances, initialized with event device id
143  * when adapter is created
144  */
145 static int *txa_dev_id_array;
146
147 /* Array of pointers to service implementation data */
148 static struct txa_service_data **txa_service_data_array;
149
150 static int32_t txa_service_func(void *args);
151 static int txa_service_adapter_create_ext(uint8_t id,
152                         struct rte_eventdev *dev,
153                         rte_event_eth_tx_adapter_conf_cb conf_cb,
154                         void *conf_arg);
155 static int txa_service_queue_del(uint8_t id,
156                                 const struct rte_eth_dev *dev,
157                                 int32_t tx_queue_id);
158
159 static int
160 txa_adapter_exist(uint8_t id)
161 {
162         return txa_dev_id_array[id] != TXA_INVALID_DEV_ID;
163 }
164
165 static inline int
166 txa_valid_id(uint8_t id)
167 {
168         return id < RTE_EVENT_ETH_TX_ADAPTER_MAX_INSTANCE;
169 }
170
171 static void *
172 txa_memzone_array_get(const char *name, unsigned int elt_size, int nb_elems)
173 {
174         const struct rte_memzone *mz;
175         unsigned int sz;
176
177         sz = elt_size * nb_elems;
178         sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
179
180         mz = rte_memzone_lookup(name);
181         if (mz == NULL) {
182                 mz = rte_memzone_reserve_aligned(name, sz, rte_socket_id(), 0,
183                                                  RTE_CACHE_LINE_SIZE);
184                 if (mz == NULL) {
185                         RTE_EDEV_LOG_ERR("failed to reserve memzone"
186                                         " name = %s err = %"
187                                         PRId32, name, rte_errno);
188                         return NULL;
189                 }
190         }
191
192         return  mz->addr;
193 }
194
195 static int
196 txa_dev_id_array_init(void)
197 {
198         if (txa_dev_id_array == NULL) {
199                 int i;
200
201                 txa_dev_id_array = txa_memzone_array_get("txa_adapter_array",
202                                         sizeof(int),
203                                         RTE_EVENT_ETH_TX_ADAPTER_MAX_INSTANCE);
204                 if (txa_dev_id_array == NULL)
205                         return -ENOMEM;
206
207                 for (i = 0; i < RTE_EVENT_ETH_TX_ADAPTER_MAX_INSTANCE; i++)
208                         txa_dev_id_array[i] = TXA_INVALID_DEV_ID;
209         }
210
211         return 0;
212 }
213
214 static int
215 txa_init(void)
216 {
217         return txa_dev_id_array_init();
218 }
219
220 static int
221 txa_service_data_init(void)
222 {
223         if (txa_service_data_array == NULL) {
224                 txa_service_data_array =
225                                 txa_memzone_array_get("txa_service_data_array",
226                                         sizeof(int),
227                                         RTE_EVENT_ETH_TX_ADAPTER_MAX_INSTANCE);
228                 if (txa_service_data_array == NULL)
229                         return -ENOMEM;
230         }
231
232         return 0;
233 }
234
235 static inline struct txa_service_data *
236 txa_service_id_to_data(uint8_t id)
237 {
238         return txa_service_data_array[id];
239 }
240
241 static inline struct txa_service_queue_info *
242 txa_service_queue(struct txa_service_data *txa, uint16_t port_id,
243                 uint16_t tx_queue_id)
244 {
245         struct txa_service_queue_info *tqi;
246
247         if (unlikely(txa->txa_ethdev == NULL || txa->dev_count < port_id + 1))
248                 return NULL;
249
250         tqi = txa->txa_ethdev[port_id].queues;
251
252         return likely(tqi != NULL) ? tqi + tx_queue_id : NULL;
253 }
254
255 static int
256 txa_service_conf_cb(uint8_t __rte_unused id, uint8_t dev_id,
257                 struct rte_event_eth_tx_adapter_conf *conf, void *arg)
258 {
259         int ret;
260         struct rte_eventdev *dev;
261         struct rte_event_port_conf *pc;
262         struct rte_event_dev_config dev_conf;
263         int started;
264         uint8_t port_id;
265
266         pc = arg;
267         dev = &rte_eventdevs[dev_id];
268         dev_conf = dev->data->dev_conf;
269
270         started = dev->data->dev_started;
271         if (started)
272                 rte_event_dev_stop(dev_id);
273
274         port_id = dev_conf.nb_event_ports;
275         dev_conf.nb_event_ports += 1;
276
277         ret = rte_event_dev_configure(dev_id, &dev_conf);
278         if (ret) {
279                 RTE_EDEV_LOG_ERR("failed to configure event dev %u",
280                                                 dev_id);
281                 if (started) {
282                         if (rte_event_dev_start(dev_id))
283                                 return -EIO;
284                 }
285                 return ret;
286         }
287
288         pc->disable_implicit_release = 0;
289         ret = rte_event_port_setup(dev_id, port_id, pc);
290         if (ret) {
291                 RTE_EDEV_LOG_ERR("failed to setup event port %u\n",
292                                         port_id);
293                 if (started) {
294                         if (rte_event_dev_start(dev_id))
295                                 return -EIO;
296                 }
297                 return ret;
298         }
299
300         conf->event_port_id = port_id;
301         conf->max_nb_tx = TXA_MAX_NB_TX;
302         if (started)
303                 ret = rte_event_dev_start(dev_id);
304         return ret;
305 }
306
307 static int
308 txa_service_ethdev_alloc(struct txa_service_data *txa)
309 {
310         struct txa_service_ethdev *txa_ethdev;
311         uint16_t i, dev_count;
312
313         dev_count = rte_eth_dev_count_avail();
314         if (txa->txa_ethdev && dev_count == txa->dev_count)
315                 return 0;
316
317         txa_ethdev = rte_zmalloc_socket(txa->mem_name,
318                                         dev_count * sizeof(*txa_ethdev),
319                                         0,
320                                         txa->socket_id);
321         if (txa_ethdev == NULL) {
322                 RTE_EDEV_LOG_ERR("Failed to alloc txa::txa_ethdev ");
323                 return -ENOMEM;
324         }
325
326         if (txa->dev_count)
327                 memcpy(txa_ethdev, txa->txa_ethdev,
328                         txa->dev_count * sizeof(*txa_ethdev));
329
330         RTE_ETH_FOREACH_DEV(i) {
331                 if (i == dev_count)
332                         break;
333                 txa_ethdev[i].dev = &rte_eth_devices[i];
334         }
335
336         txa->txa_ethdev = txa_ethdev;
337         txa->dev_count = dev_count;
338         return 0;
339 }
340
341 static int
342 txa_service_queue_array_alloc(struct txa_service_data *txa,
343                         uint16_t port_id)
344 {
345         struct txa_service_queue_info *tqi;
346         uint16_t nb_queue;
347         int ret;
348
349         ret = txa_service_ethdev_alloc(txa);
350         if (ret != 0)
351                 return ret;
352
353         if (txa->txa_ethdev[port_id].queues)
354                 return 0;
355
356         nb_queue = txa->txa_ethdev[port_id].dev->data->nb_tx_queues;
357         tqi = rte_zmalloc_socket(txa->mem_name,
358                                 nb_queue *
359                                 sizeof(struct txa_service_queue_info), 0,
360                                 txa->socket_id);
361         if (tqi == NULL)
362                 return -ENOMEM;
363         txa->txa_ethdev[port_id].queues = tqi;
364         return 0;
365 }
366
367 static void
368 txa_service_queue_array_free(struct txa_service_data *txa,
369                         uint16_t port_id)
370 {
371         struct txa_service_ethdev *txa_ethdev;
372         struct txa_service_queue_info *tqi;
373
374         txa_ethdev = &txa->txa_ethdev[port_id];
375         if (txa->txa_ethdev == NULL || txa_ethdev->nb_queues != 0)
376                 return;
377
378         tqi = txa_ethdev->queues;
379         txa_ethdev->queues = NULL;
380         rte_free(tqi);
381
382         if (txa->nb_queues == 0) {
383                 rte_free(txa->txa_ethdev);
384                 txa->txa_ethdev = NULL;
385         }
386 }
387
388 static void
389 txa_service_unregister(struct txa_service_data *txa)
390 {
391         if (txa->service_id != TXA_INVALID_SERVICE_ID) {
392                 rte_service_component_runstate_set(txa->service_id, 0);
393                 while (rte_service_may_be_active(txa->service_id))
394                         rte_pause();
395                 rte_service_component_unregister(txa->service_id);
396         }
397         txa->service_id = TXA_INVALID_SERVICE_ID;
398 }
399
400 static int
401 txa_service_register(struct txa_service_data *txa)
402 {
403         int ret;
404         struct rte_service_spec service;
405         struct rte_event_eth_tx_adapter_conf conf;
406
407         if (txa->service_id != TXA_INVALID_SERVICE_ID)
408                 return 0;
409
410         memset(&service, 0, sizeof(service));
411         snprintf(service.name, TXA_SERVICE_NAME_LEN, "txa_%d", txa->id);
412         service.socket_id = txa->socket_id;
413         service.callback = txa_service_func;
414         service.callback_userdata = txa;
415         service.capabilities = RTE_SERVICE_CAP_MT_SAFE;
416         ret = rte_service_component_register(&service,
417                                         (uint32_t *)&txa->service_id);
418         if (ret) {
419                 RTE_EDEV_LOG_ERR("failed to register service %s err = %"
420                                  PRId32, service.name, ret);
421                 return ret;
422         }
423
424         ret = txa->conf_cb(txa->id, txa->eventdev_id, &conf, txa->conf_arg);
425         if (ret) {
426                 txa_service_unregister(txa);
427                 return ret;
428         }
429
430         rte_service_component_runstate_set(txa->service_id, 1);
431         txa->port_id = conf.event_port_id;
432         txa->max_nb_tx = conf.max_nb_tx;
433         return 0;
434 }
435
436 static struct rte_eth_dev_tx_buffer *
437 txa_service_tx_buf_alloc(struct txa_service_data *txa,
438                         const struct rte_eth_dev *dev)
439 {
440         struct rte_eth_dev_tx_buffer *tb;
441         uint16_t port_id;
442
443         port_id = dev->data->port_id;
444         tb = rte_zmalloc_socket(txa->mem_name,
445                                 RTE_ETH_TX_BUFFER_SIZE(TXA_BATCH_SIZE),
446                                 0,
447                                 rte_eth_dev_socket_id(port_id));
448         if (tb == NULL)
449                 RTE_EDEV_LOG_ERR("Failed to allocate memory for tx buffer");
450         return tb;
451 }
452
453 static int
454 txa_service_is_queue_added(struct txa_service_data *txa,
455                         const struct rte_eth_dev *dev,
456                         uint16_t tx_queue_id)
457 {
458         struct txa_service_queue_info *tqi;
459
460         tqi = txa_service_queue(txa, dev->data->port_id, tx_queue_id);
461         return tqi && tqi->added;
462 }
463
464 static int
465 txa_service_ctrl(uint8_t id, int start)
466 {
467         int ret;
468         struct txa_service_data *txa;
469
470         txa = txa_service_id_to_data(id);
471         if (txa->service_id == TXA_INVALID_SERVICE_ID)
472                 return 0;
473
474         ret = rte_service_runstate_set(txa->service_id, start);
475         if (ret == 0 && !start) {
476                 while (rte_service_may_be_active(txa->service_id))
477                         rte_pause();
478         }
479         return ret;
480 }
481
482 static void
483 txa_service_buffer_retry(struct rte_mbuf **pkts, uint16_t unsent,
484                         void *userdata)
485 {
486         struct txa_retry *tr;
487         struct txa_service_data *data;
488         struct rte_event_eth_tx_adapter_stats *stats;
489         uint16_t sent = 0;
490         unsigned int retry = 0;
491         uint16_t i, n;
492
493         tr = (struct txa_retry *)(uintptr_t)userdata;
494         data = txa_service_id_to_data(tr->id);
495         stats = &data->stats;
496
497         do {
498                 n = rte_eth_tx_burst(tr->port_id, tr->tx_queue,
499                                &pkts[sent], unsent - sent);
500
501                 sent += n;
502         } while (sent != unsent && retry++ < TXA_RETRY_CNT);
503
504         for (i = sent; i < unsent; i++)
505                 rte_pktmbuf_free(pkts[i]);
506
507         stats->tx_retry += retry;
508         stats->tx_packets += sent;
509         stats->tx_dropped += unsent - sent;
510 }
511
512 static void
513 txa_service_tx(struct txa_service_data *txa, struct rte_event *ev,
514         uint32_t n)
515 {
516         uint32_t i;
517         uint16_t nb_tx;
518         struct rte_event_eth_tx_adapter_stats *stats;
519
520         stats = &txa->stats;
521
522         nb_tx = 0;
523         for (i = 0; i < n; i++) {
524                 struct rte_mbuf *m;
525                 uint16_t port;
526                 uint16_t queue;
527                 struct txa_service_queue_info *tqi;
528
529                 m = ev[i].mbuf;
530                 port = m->port;
531                 queue = rte_event_eth_tx_adapter_txq_get(m);
532
533                 tqi = txa_service_queue(txa, port, queue);
534                 if (unlikely(tqi == NULL || !tqi->added)) {
535                         rte_pktmbuf_free(m);
536                         continue;
537                 }
538
539                 nb_tx += rte_eth_tx_buffer(port, queue, tqi->tx_buf, m);
540         }
541
542         stats->tx_packets += nb_tx;
543 }
544
545 static int32_t
546 txa_service_func(void *args)
547 {
548         struct txa_service_data *txa = args;
549         uint8_t dev_id;
550         uint8_t port;
551         uint16_t n;
552         uint32_t nb_tx, max_nb_tx;
553         struct rte_event ev[TXA_BATCH_SIZE];
554
555         dev_id = txa->eventdev_id;
556         max_nb_tx = txa->max_nb_tx;
557         port = txa->port_id;
558
559         if (txa->nb_queues == 0)
560                 return 0;
561
562         if (!rte_spinlock_trylock(&txa->tx_lock))
563                 return 0;
564
565         for (nb_tx = 0; nb_tx < max_nb_tx; nb_tx += n) {
566
567                 n = rte_event_dequeue_burst(dev_id, port, ev, RTE_DIM(ev), 0);
568                 if (!n)
569                         break;
570                 txa_service_tx(txa, ev, n);
571         }
572
573         if ((txa->loop_cnt++ & (TXA_FLUSH_THRESHOLD - 1)) == 0) {
574
575                 struct txa_service_ethdev *tdi;
576                 struct txa_service_queue_info *tqi;
577                 struct rte_eth_dev *dev;
578                 uint16_t i;
579
580                 tdi = txa->txa_ethdev;
581                 nb_tx = 0;
582
583                 RTE_ETH_FOREACH_DEV(i) {
584                         uint16_t q;
585
586                         if (i == txa->dev_count)
587                                 break;
588
589                         dev = tdi[i].dev;
590                         if (tdi[i].nb_queues == 0)
591                                 continue;
592                         for (q = 0; q < dev->data->nb_tx_queues; q++) {
593
594                                 tqi = txa_service_queue(txa, i, q);
595                                 if (unlikely(tqi == NULL || !tqi->added))
596                                         continue;
597
598                                 nb_tx += rte_eth_tx_buffer_flush(i, q,
599                                                         tqi->tx_buf);
600                         }
601                 }
602
603                 txa->stats.tx_packets += nb_tx;
604         }
605         rte_spinlock_unlock(&txa->tx_lock);
606         return 0;
607 }
608
609 static int
610 txa_service_adapter_create(uint8_t id, struct rte_eventdev *dev,
611                         struct rte_event_port_conf *port_conf)
612 {
613         struct txa_service_data *txa;
614         struct rte_event_port_conf *cb_conf;
615         int ret;
616
617         cb_conf = rte_malloc(NULL, sizeof(*cb_conf), 0);
618         if (cb_conf == NULL)
619                 return -ENOMEM;
620
621         *cb_conf = *port_conf;
622         ret = txa_service_adapter_create_ext(id, dev, txa_service_conf_cb,
623                                         cb_conf);
624         if (ret) {
625                 rte_free(cb_conf);
626                 return ret;
627         }
628
629         txa = txa_service_id_to_data(id);
630         txa->conf_free = 1;
631         return ret;
632 }
633
634 static int
635 txa_service_adapter_create_ext(uint8_t id, struct rte_eventdev *dev,
636                         rte_event_eth_tx_adapter_conf_cb conf_cb,
637                         void *conf_arg)
638 {
639         struct txa_service_data *txa;
640         int socket_id;
641         char mem_name[TXA_SERVICE_NAME_LEN];
642         int ret;
643
644         if (conf_cb == NULL)
645                 return -EINVAL;
646
647         socket_id = dev->data->socket_id;
648         snprintf(mem_name, TXA_MEM_NAME_LEN,
649                 "rte_event_eth_txa_%d",
650                 id);
651
652         ret = txa_service_data_init();
653         if (ret != 0)
654                 return ret;
655
656         txa = rte_zmalloc_socket(mem_name,
657                                 sizeof(*txa),
658                                 RTE_CACHE_LINE_SIZE, socket_id);
659         if (txa == NULL) {
660                 RTE_EDEV_LOG_ERR("failed to get mem for tx adapter");
661                 return -ENOMEM;
662         }
663
664         txa->id = id;
665         txa->eventdev_id = dev->data->dev_id;
666         txa->socket_id = socket_id;
667         strncpy(txa->mem_name, mem_name, TXA_SERVICE_NAME_LEN);
668         txa->conf_cb = conf_cb;
669         txa->conf_arg = conf_arg;
670         txa->service_id = TXA_INVALID_SERVICE_ID;
671         rte_spinlock_init(&txa->tx_lock);
672         txa_service_data_array[id] = txa;
673
674         return 0;
675 }
676
677 static int
678 txa_service_event_port_get(uint8_t id, uint8_t *port)
679 {
680         struct txa_service_data *txa;
681
682         txa = txa_service_id_to_data(id);
683         if (txa->service_id == TXA_INVALID_SERVICE_ID)
684                 return -ENODEV;
685
686         *port = txa->port_id;
687         return 0;
688 }
689
690 static int
691 txa_service_adapter_free(uint8_t id)
692 {
693         struct txa_service_data *txa;
694
695         txa = txa_service_id_to_data(id);
696         if (txa->nb_queues) {
697                 RTE_EDEV_LOG_ERR("%" PRIu16 " Tx queues not deleted",
698                                 txa->nb_queues);
699                 return -EBUSY;
700         }
701
702         if (txa->conf_free)
703                 rte_free(txa->conf_arg);
704         rte_free(txa);
705         return 0;
706 }
707
708 static int
709 txa_service_queue_add(uint8_t id,
710                 __rte_unused struct rte_eventdev *dev,
711                 const struct rte_eth_dev *eth_dev,
712                 int32_t tx_queue_id)
713 {
714         struct txa_service_data *txa;
715         struct txa_service_ethdev *tdi;
716         struct txa_service_queue_info *tqi;
717         struct rte_eth_dev_tx_buffer *tb;
718         struct txa_retry *txa_retry;
719         int ret = 0;
720
721         txa = txa_service_id_to_data(id);
722
723         if (tx_queue_id == -1) {
724                 int nb_queues;
725                 uint16_t i, j;
726                 uint16_t *qdone;
727
728                 nb_queues = eth_dev->data->nb_tx_queues;
729                 if (txa->dev_count > eth_dev->data->port_id) {
730                         tdi = &txa->txa_ethdev[eth_dev->data->port_id];
731                         nb_queues -= tdi->nb_queues;
732                 }
733
734                 qdone = rte_zmalloc(txa->mem_name,
735                                 nb_queues * sizeof(*qdone), 0);
736                 j = 0;
737                 for (i = 0; i < nb_queues; i++) {
738                         if (txa_service_is_queue_added(txa, eth_dev, i))
739                                 continue;
740                         ret = txa_service_queue_add(id, dev, eth_dev, i);
741                         if (ret == 0)
742                                 qdone[j++] = i;
743                         else
744                                 break;
745                 }
746
747                 if (i != nb_queues) {
748                         for (i = 0; i < j; i++)
749                                 txa_service_queue_del(id, eth_dev, qdone[i]);
750                 }
751                 rte_free(qdone);
752                 return ret;
753         }
754
755         ret = txa_service_register(txa);
756         if (ret)
757                 return ret;
758
759         rte_spinlock_lock(&txa->tx_lock);
760
761         if (txa_service_is_queue_added(txa, eth_dev, tx_queue_id)) {
762                 rte_spinlock_unlock(&txa->tx_lock);
763                 return 0;
764         }
765
766         ret = txa_service_queue_array_alloc(txa, eth_dev->data->port_id);
767         if (ret)
768                 goto err_unlock;
769
770         tb = txa_service_tx_buf_alloc(txa, eth_dev);
771         if (tb == NULL)
772                 goto err_unlock;
773
774         tdi = &txa->txa_ethdev[eth_dev->data->port_id];
775         tqi = txa_service_queue(txa, eth_dev->data->port_id, tx_queue_id);
776
777         txa_retry = &tqi->txa_retry;
778         txa_retry->id = txa->id;
779         txa_retry->port_id = eth_dev->data->port_id;
780         txa_retry->tx_queue = tx_queue_id;
781
782         rte_eth_tx_buffer_init(tb, TXA_BATCH_SIZE);
783         rte_eth_tx_buffer_set_err_callback(tb,
784                 txa_service_buffer_retry, txa_retry);
785
786         tqi->tx_buf = tb;
787         tqi->added = 1;
788         tdi->nb_queues++;
789         txa->nb_queues++;
790
791 err_unlock:
792         if (txa->nb_queues == 0) {
793                 txa_service_queue_array_free(txa,
794                                         eth_dev->data->port_id);
795                 txa_service_unregister(txa);
796         }
797
798         rte_spinlock_unlock(&txa->tx_lock);
799         return 0;
800 }
801
802 static int
803 txa_service_queue_del(uint8_t id,
804                 const struct rte_eth_dev *dev,
805                 int32_t tx_queue_id)
806 {
807         struct txa_service_data *txa;
808         struct txa_service_queue_info *tqi;
809         struct rte_eth_dev_tx_buffer *tb;
810         uint16_t port_id;
811
812         txa = txa_service_id_to_data(id);
813         port_id = dev->data->port_id;
814
815         if (tx_queue_id == -1) {
816                 uint16_t i, q, nb_queues;
817                 int ret = 0;
818
819                 nb_queues = txa->nb_queues;
820                 if (nb_queues == 0)
821                         return 0;
822
823                 i = 0;
824                 q = 0;
825                 tqi = txa->txa_ethdev[port_id].queues;
826
827                 while (i < nb_queues) {
828
829                         if (tqi[q].added) {
830                                 ret = txa_service_queue_del(id, dev, q);
831                                 if (ret != 0)
832                                         break;
833                         }
834                         i++;
835                         q++;
836                 }
837                 return ret;
838         }
839
840         txa = txa_service_id_to_data(id);
841
842         tqi = txa_service_queue(txa, port_id, tx_queue_id);
843         if (tqi == NULL || !tqi->added)
844                 return 0;
845
846         tb = tqi->tx_buf;
847         tqi->added = 0;
848         tqi->tx_buf = NULL;
849         rte_free(tb);
850         txa->nb_queues--;
851         txa->txa_ethdev[port_id].nb_queues--;
852
853         txa_service_queue_array_free(txa, port_id);
854         return 0;
855 }
856
857 static int
858 txa_service_id_get(uint8_t id, uint32_t *service_id)
859 {
860         struct txa_service_data *txa;
861
862         txa = txa_service_id_to_data(id);
863         if (txa->service_id == TXA_INVALID_SERVICE_ID)
864                 return -ESRCH;
865
866         if (service_id == NULL)
867                 return -EINVAL;
868
869         *service_id = txa->service_id;
870         return 0;
871 }
872
873 static int
874 txa_service_start(uint8_t id)
875 {
876         return txa_service_ctrl(id, 1);
877 }
878
879 static int
880 txa_service_stats_get(uint8_t id,
881                 struct rte_event_eth_tx_adapter_stats *stats)
882 {
883         struct txa_service_data *txa;
884
885         txa = txa_service_id_to_data(id);
886         *stats = txa->stats;
887         return 0;
888 }
889
890 static int
891 txa_service_stats_reset(uint8_t id)
892 {
893         struct txa_service_data *txa;
894
895         txa = txa_service_id_to_data(id);
896         memset(&txa->stats, 0, sizeof(txa->stats));
897         return 0;
898 }
899
900 static int
901 txa_service_stop(uint8_t id)
902 {
903         return txa_service_ctrl(id, 0);
904 }
905
906
907 int
908 rte_event_eth_tx_adapter_create(uint8_t id, uint8_t dev_id,
909                                 struct rte_event_port_conf *port_conf)
910 {
911         struct rte_eventdev *dev;
912         int ret;
913
914         if (port_conf == NULL)
915                 return -EINVAL;
916
917         RTE_EVENT_ETH_TX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
918         RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, -EINVAL);
919
920         dev = &rte_eventdevs[dev_id];
921
922         ret = txa_init();
923         if (ret != 0)
924                 return ret;
925
926         if (txa_adapter_exist(id))
927                 return -EEXIST;
928
929         txa_dev_id_array[id] = dev_id;
930         if (txa_dev_adapter_create(id))
931                 ret = txa_dev_adapter_create(id)(id, dev);
932
933         if (ret != 0) {
934                 txa_dev_id_array[id] = TXA_INVALID_DEV_ID;
935                 return ret;
936         }
937
938         ret = txa_service_adapter_create(id, dev, port_conf);
939         if (ret != 0) {
940                 if (txa_dev_adapter_free(id))
941                         txa_dev_adapter_free(id)(id, dev);
942                 txa_dev_id_array[id] = TXA_INVALID_DEV_ID;
943                 return ret;
944         }
945
946         txa_dev_id_array[id] = dev_id;
947         return 0;
948 }
949
950 int
951 rte_event_eth_tx_adapter_create_ext(uint8_t id, uint8_t dev_id,
952                                 rte_event_eth_tx_adapter_conf_cb conf_cb,
953                                 void *conf_arg)
954 {
955         struct rte_eventdev *dev;
956         int ret;
957
958         RTE_EVENT_ETH_TX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
959         RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, -EINVAL);
960
961         ret = txa_init();
962         if (ret != 0)
963                 return ret;
964
965         if (txa_adapter_exist(id))
966                 return -EINVAL;
967
968         dev = &rte_eventdevs[dev_id];
969
970         txa_dev_id_array[id] = dev_id;
971         if (txa_dev_adapter_create_ext(id))
972                 ret = txa_dev_adapter_create_ext(id)(id, dev);
973
974         if (ret != 0) {
975                 txa_dev_id_array[id] = TXA_INVALID_DEV_ID;
976                 return ret;
977         }
978
979         ret = txa_service_adapter_create_ext(id, dev, conf_cb, conf_arg);
980         if (ret != 0) {
981                 if (txa_dev_adapter_free(id))
982                         txa_dev_adapter_free(id)(id, dev);
983                 txa_dev_id_array[id] = TXA_INVALID_DEV_ID;
984                 return ret;
985         }
986
987         txa_dev_id_array[id] = dev_id;
988         return 0;
989 }
990
991
992 int
993 rte_event_eth_tx_adapter_event_port_get(uint8_t id, uint8_t *event_port_id)
994 {
995         TXA_CHECK_OR_ERR_RET(id);
996
997         return txa_service_event_port_get(id, event_port_id);
998 }
999
1000 int
1001 rte_event_eth_tx_adapter_free(uint8_t id)
1002 {
1003         int ret;
1004
1005         TXA_CHECK_OR_ERR_RET(id);
1006
1007         ret = txa_dev_adapter_free(id) ?
1008                 txa_dev_adapter_free(id)(id, txa_evdev(id)) :
1009                 0;
1010
1011         if (ret == 0)
1012                 ret = txa_service_adapter_free(id);
1013         txa_dev_id_array[id] = TXA_INVALID_DEV_ID;
1014
1015         return ret;
1016 }
1017
1018 int
1019 rte_event_eth_tx_adapter_queue_add(uint8_t id,
1020                                 uint16_t eth_dev_id,
1021                                 int32_t queue)
1022 {
1023         struct rte_eth_dev *eth_dev;
1024         int ret;
1025         uint32_t caps;
1026
1027         RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
1028         TXA_CHECK_OR_ERR_RET(id);
1029
1030         eth_dev = &rte_eth_devices[eth_dev_id];
1031         TXA_CHECK_TXQ(eth_dev, queue);
1032
1033         caps = 0;
1034         if (txa_dev_caps_get(id))
1035                 txa_dev_caps_get(id)(txa_evdev(id), eth_dev, &caps);
1036
1037         if (caps & RTE_EVENT_ETH_TX_ADAPTER_CAP_INTERNAL_PORT)
1038                 ret =  txa_dev_queue_add(id) ?
1039                                         txa_dev_queue_add(id)(id,
1040                                                         txa_evdev(id),
1041                                                         eth_dev,
1042                                                         queue) : 0;
1043         else
1044                 ret = txa_service_queue_add(id, txa_evdev(id), eth_dev, queue);
1045
1046         return ret;
1047 }
1048
1049 int
1050 rte_event_eth_tx_adapter_queue_del(uint8_t id,
1051                                 uint16_t eth_dev_id,
1052                                 int32_t queue)
1053 {
1054         struct rte_eth_dev *eth_dev;
1055         int ret;
1056         uint32_t caps;
1057
1058         RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
1059         TXA_CHECK_OR_ERR_RET(id);
1060
1061         eth_dev = &rte_eth_devices[eth_dev_id];
1062
1063         caps = 0;
1064
1065         if (txa_dev_caps_get(id))
1066                 txa_dev_caps_get(id)(txa_evdev(id), eth_dev, &caps);
1067
1068         if (caps & RTE_EVENT_ETH_TX_ADAPTER_CAP_INTERNAL_PORT)
1069                 ret =  txa_dev_queue_del(id) ?
1070                                         txa_dev_queue_del(id)(id, txa_evdev(id),
1071                                                         eth_dev,
1072                                                         queue) : 0;
1073         else
1074                 ret = txa_service_queue_del(id, eth_dev, queue);
1075
1076         return ret;
1077 }
1078
1079 int
1080 rte_event_eth_tx_adapter_service_id_get(uint8_t id, uint32_t *service_id)
1081 {
1082         TXA_CHECK_OR_ERR_RET(id);
1083
1084         return txa_service_id_get(id, service_id);
1085 }
1086
1087 int
1088 rte_event_eth_tx_adapter_start(uint8_t id)
1089 {
1090         int ret;
1091
1092         TXA_CHECK_OR_ERR_RET(id);
1093
1094         ret = txa_dev_start(id) ? txa_dev_start(id)(id, txa_evdev(id)) : 0;
1095         if (ret == 0)
1096                 ret = txa_service_start(id);
1097         return ret;
1098 }
1099
1100 int
1101 rte_event_eth_tx_adapter_stats_get(uint8_t id,
1102                                 struct rte_event_eth_tx_adapter_stats *stats)
1103 {
1104         int ret;
1105
1106         TXA_CHECK_OR_ERR_RET(id);
1107
1108         if (stats == NULL)
1109                 return -EINVAL;
1110
1111         *stats = (struct rte_event_eth_tx_adapter_stats){0};
1112
1113         ret = txa_dev_stats_get(id) ?
1114                         txa_dev_stats_get(id)(id, txa_evdev(id), stats) : 0;
1115
1116         if (ret == 0 && txa_service_id_get(id, NULL) != ESRCH) {
1117                 if (txa_dev_stats_get(id)) {
1118                         struct rte_event_eth_tx_adapter_stats service_stats;
1119
1120                         ret = txa_service_stats_get(id, &service_stats);
1121                         if (ret == 0) {
1122                                 stats->tx_retry += service_stats.tx_retry;
1123                                 stats->tx_packets += service_stats.tx_packets;
1124                                 stats->tx_dropped += service_stats.tx_dropped;
1125                         }
1126                 } else
1127                         ret = txa_service_stats_get(id, stats);
1128         }
1129
1130         return ret;
1131 }
1132
1133 int
1134 rte_event_eth_tx_adapter_stats_reset(uint8_t id)
1135 {
1136         int ret;
1137
1138         TXA_CHECK_OR_ERR_RET(id);
1139
1140         ret = txa_dev_stats_reset(id) ?
1141                 txa_dev_stats_reset(id)(id, txa_evdev(id)) : 0;
1142         if (ret == 0)
1143                 ret = txa_service_stats_reset(id);
1144         return ret;
1145 }
1146
1147 int
1148 rte_event_eth_tx_adapter_stop(uint8_t id)
1149 {
1150         int ret;
1151
1152         TXA_CHECK_OR_ERR_RET(id);
1153
1154         ret = txa_dev_stop(id) ? txa_dev_stop(id)(id,  txa_evdev(id)) : 0;
1155         if (ret == 0)
1156                 ret = txa_service_stop(id);
1157         return ret;
1158 }