lib: use common macro RTE_DIM
[dpdk.git] / lib / librte_port / rte_port_eventdev.c
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2019 Intel Corporation
3  */
4
5 #include <string.h>
6 #include <stdint.h>
7
8 #include <rte_mbuf.h>
9 #include <rte_malloc.h>
10
11 #include "rte_port_eventdev.h"
12
13 /*
14  * Port EVENTDEV Reader
15  */
16 #ifdef RTE_PORT_STATS_COLLECT
17
18 #define RTE_PORT_EVENTDEV_READER_STATS_PKTS_IN_ADD(port, val) \
19         do {port->stats.n_pkts_in += val;} while (0)
20 #define RTE_PORT_EVENTDEV_READER_STATS_PKTS_DROP_ADD(port, val) \
21         do {port->stats.n_pkts_drop += val;} while (0)
22
23 #else
24
25 #define RTE_PORT_EVENTDEV_READER_STATS_PKTS_IN_ADD(port, val)
26 #define RTE_PORT_EVENTDEV_READER_STATS_PKTS_DROP_ADD(port, val)
27
28 #endif
29
30 struct rte_port_eventdev_reader {
31         struct rte_port_in_stats stats;
32
33         uint8_t  eventdev_id;
34         uint16_t port_id;
35
36         struct rte_event ev[RTE_PORT_IN_BURST_SIZE_MAX];
37 };
38
39 static void *
40 rte_port_eventdev_reader_create(void *params, int socket_id)
41 {
42         struct rte_port_eventdev_reader_params *conf =
43                         params;
44         struct rte_port_eventdev_reader *port;
45
46         /* Check input parameters */
47         if (conf == NULL) {
48                 RTE_LOG(ERR, PORT, "%s: params is NULL\n", __func__);
49                 return NULL;
50         }
51
52         /* Memory allocation */
53         port = rte_zmalloc_socket("PORT", sizeof(*port),
54                 RTE_CACHE_LINE_SIZE, socket_id);
55         if (port == NULL) {
56                 RTE_LOG(ERR, PORT, "%s: Failed to allocate port\n", __func__);
57                 return NULL;
58         }
59
60         /* Initialization */
61         port->eventdev_id = conf->eventdev_id;
62         port->port_id = conf->port_id;
63
64         return port;
65 }
66
67 static int
68 rte_port_eventdev_reader_rx(void *port, struct rte_mbuf **pkts, uint32_t n_pkts)
69 {
70         struct rte_port_eventdev_reader *p = port;
71         uint16_t rx_evts_cnt, i;
72
73         rx_evts_cnt = rte_event_dequeue_burst(p->eventdev_id, p->port_id,
74                         p->ev, n_pkts, 0);
75
76         for (i = 0; i < rx_evts_cnt; i++)
77                 pkts[i] = p->ev[i].mbuf;
78
79         RTE_PORT_EVENTDEV_READER_STATS_PKTS_IN_ADD(p, rx_evts_cnt);
80
81         return rx_evts_cnt;
82 }
83
84 static int
85 rte_port_eventdev_reader_free(void *port)
86 {
87         if (port == NULL) {
88                 RTE_LOG(ERR, PORT, "%s: port is NULL\n", __func__);
89                 return -EINVAL;
90         }
91
92         rte_free(port);
93
94         return 0;
95 }
96
97 static int rte_port_eventdev_reader_stats_read(void *port,
98         struct rte_port_in_stats *stats, int clear)
99 {
100         struct rte_port_eventdev_reader *p =
101                         port;
102
103         if (stats != NULL)
104                 memcpy(stats, &p->stats, sizeof(p->stats));
105
106         if (clear)
107                 memset(&p->stats, 0, sizeof(p->stats));
108
109         return 0;
110 }
111
112 /*
113  * Port EVENTDEV Writer
114  */
115 #ifdef RTE_PORT_STATS_COLLECT
116
117 #define RTE_PORT_EVENTDEV_WRITER_STATS_PKTS_IN_ADD(port, val) \
118         do {port->stats.n_pkts_in += val;} while (0)
119 #define RTE_PORT_EVENTDEV_WRITER_STATS_PKTS_DROP_ADD(port, val) \
120         do {port->stats.n_pkts_drop += val;} while (0)
121
122 #else
123
124 #define RTE_PORT_EVENTDEV_WRITER_STATS_PKTS_IN_ADD(port, val)
125 #define RTE_PORT_EVENTDEV_WRITER_STATS_PKTS_DROP_ADD(port, val)
126
127 #endif
128
129 struct rte_port_eventdev_writer {
130         struct rte_port_out_stats stats;
131
132         struct rte_event ev[2 * RTE_PORT_IN_BURST_SIZE_MAX];
133
134         uint32_t enq_burst_sz;
135         uint32_t enq_buf_count;
136         uint64_t bsz_mask;
137
138         uint8_t eventdev_id;
139         uint8_t port_id;
140         uint8_t queue_id;
141         uint8_t sched_type;
142         uint8_t evt_op;
143 };
144
145 static void *
146 rte_port_eventdev_writer_create(void *params, int socket_id)
147 {
148         struct rte_port_eventdev_writer_params *conf =
149                         params;
150         struct rte_port_eventdev_writer *port;
151         unsigned int i;
152
153         /* Check input parameters */
154         if ((conf == NULL) ||
155                 (conf->enq_burst_sz == 0) ||
156                 (conf->enq_burst_sz > RTE_PORT_IN_BURST_SIZE_MAX) ||
157                 (!rte_is_power_of_2(conf->enq_burst_sz))) {
158                 RTE_LOG(ERR, PORT, "%s: Invalid input parameters\n", __func__);
159                 return NULL;
160         }
161
162         /* Memory allocation */
163         port = rte_zmalloc_socket("PORT", sizeof(*port),
164                 RTE_CACHE_LINE_SIZE, socket_id);
165         if (port == NULL) {
166                 RTE_LOG(ERR, PORT, "%s: Failed to allocate port\n", __func__);
167                 return NULL;
168         }
169
170         /* Initialization */
171         port->enq_burst_sz = conf->enq_burst_sz;
172         port->enq_buf_count = 0;
173         port->bsz_mask = 1LLU << (conf->enq_burst_sz - 1);
174
175         port->eventdev_id = conf->eventdev_id;
176         port->port_id = conf->port_id;
177         port->queue_id = conf->queue_id;
178         port->sched_type = conf->sched_type;
179         port->evt_op = conf->evt_op;
180         memset(&port->ev, 0, sizeof(port->ev));
181
182         for (i = 0; i < RTE_DIM(port->ev); i++) {
183                 port->ev[i].queue_id = port->queue_id;
184                 port->ev[i].sched_type = port->sched_type;
185                 port->ev[i].op = port->evt_op;
186         }
187
188         return port;
189 }
190
191 static inline void
192 send_burst(struct rte_port_eventdev_writer *p)
193 {
194         uint32_t nb_enq;
195
196         nb_enq = rte_event_enqueue_burst(p->eventdev_id, p->port_id,
197                         p->ev, p->enq_buf_count);
198
199         RTE_PORT_EVENTDEV_WRITER_STATS_PKTS_DROP_ADD(p, p->enq_buf_count -
200                         nb_enq);
201
202         for (; nb_enq < p->enq_buf_count; nb_enq++)
203                 rte_pktmbuf_free(p->ev[nb_enq].mbuf);
204
205         p->enq_buf_count = 0;
206 }
207
208 static int
209 rte_port_eventdev_writer_tx(void *port, struct rte_mbuf *pkt)
210 {
211         struct rte_port_eventdev_writer *p = port;
212
213         p->ev[p->enq_buf_count++].mbuf  = pkt;
214         RTE_PORT_EVENTDEV_WRITER_STATS_PKTS_IN_ADD(p, 1);
215         if (p->enq_buf_count >= p->enq_burst_sz)
216                 send_burst(p);
217
218         return 0;
219 }
220
221 static int
222 rte_port_eventdev_writer_tx_bulk(void *port,
223         struct rte_mbuf **pkts,
224         uint64_t pkts_mask)
225 {
226         struct rte_port_eventdev_writer *p =
227                         port;
228         uint64_t bsz_mask = p->bsz_mask;
229         uint32_t enq_buf_count = p->enq_buf_count;
230         uint64_t expr = (pkts_mask & (pkts_mask + 1)) |
231                                         ((pkts_mask & bsz_mask) ^ bsz_mask);
232
233         if (expr == 0) {
234                 uint64_t n_pkts = __builtin_popcountll(pkts_mask);
235                 uint32_t i, n_enq_ok;
236
237                 if (enq_buf_count)
238                         send_burst(p);
239
240                 RTE_PORT_EVENTDEV_WRITER_STATS_PKTS_IN_ADD(p, n_pkts);
241
242                 struct rte_event events[2 * RTE_PORT_IN_BURST_SIZE_MAX] = {};
243                 for (i = 0; i < n_pkts; i++) {
244                         events[i].mbuf = pkts[i];
245                         events[i].queue_id = p->queue_id;
246                         events[i].sched_type = p->sched_type;
247                         events[i].op = p->evt_op;
248                 }
249
250                 n_enq_ok = rte_event_enqueue_burst(p->eventdev_id, p->port_id,
251                                 events, n_pkts);
252
253                 RTE_PORT_EVENTDEV_WRITER_STATS_PKTS_DROP_ADD(p,
254                                 n_pkts - n_enq_ok);
255                 for (; n_enq_ok < n_pkts; n_enq_ok++)
256                         rte_pktmbuf_free(pkts[n_enq_ok]);
257
258         } else {
259                 for (; pkts_mask;) {
260                         uint32_t pkt_index = __builtin_ctzll(pkts_mask);
261                         uint64_t pkt_mask = 1LLU << pkt_index;
262
263                         p->ev[enq_buf_count++].mbuf = pkts[pkt_index];
264
265                         RTE_PORT_EVENTDEV_WRITER_STATS_PKTS_IN_ADD(p, 1);
266                         pkts_mask &= ~pkt_mask;
267                 }
268
269                 p->enq_buf_count = enq_buf_count;
270                 if (enq_buf_count >= p->enq_burst_sz)
271                         send_burst(p);
272         }
273
274         return 0;
275 }
276
277 static int
278 rte_port_eventdev_writer_flush(void *port)
279 {
280         struct rte_port_eventdev_writer *p =
281                         port;
282
283         if (p->enq_buf_count > 0)
284                 send_burst(p);
285
286         return 0;
287 }
288
289 static int
290 rte_port_eventdev_writer_free(void *port)
291 {
292         if (port == NULL) {
293                 RTE_LOG(ERR, PORT, "%s: Port is NULL\n", __func__);
294                 return -EINVAL;
295         }
296
297         rte_port_eventdev_writer_flush(port);
298         rte_free(port);
299
300         return 0;
301 }
302
303 static int rte_port_eventdev_writer_stats_read(void *port,
304         struct rte_port_out_stats *stats, int clear)
305 {
306         struct rte_port_eventdev_writer *p =
307                         port;
308
309         if (stats != NULL)
310                 memcpy(stats, &p->stats, sizeof(p->stats));
311
312         if (clear)
313                 memset(&p->stats, 0, sizeof(p->stats));
314
315         return 0;
316 }
317
318 /*
319  * Port EVENTDEV Writer Nodrop
320  */
321 #ifdef RTE_PORT_STATS_COLLECT
322
323 #define RTE_PORT_EVENTDEV_WRITER_NODROP_STATS_PKTS_IN_ADD(port, val) \
324         do {port->stats.n_pkts_in += val;} while (0)
325 #define RTE_PORT_EVENTDEV_WRITER_NODROP_STATS_PKTS_DROP_ADD(port, val) \
326         do {port->stats.n_pkts_drop += val;} while (0)
327
328 #else
329
330 #define RTE_PORT_EVENTDEV_WRITER_NODROP_STATS_PKTS_IN_ADD(port, val)
331 #define RTE_PORT_EVENTDEV_WRITER_NODROP_STATS_PKTS_DROP_ADD(port, val)
332
333 #endif
334
335 struct rte_port_eventdev_writer_nodrop {
336         struct rte_port_out_stats stats;
337
338         struct rte_event ev[2 * RTE_PORT_IN_BURST_SIZE_MAX];
339
340         uint32_t enq_burst_sz;
341         uint32_t enq_buf_count;
342         uint64_t bsz_mask;
343         uint64_t n_retries;
344         uint8_t eventdev_id;
345         uint8_t port_id;
346         uint8_t queue_id;
347         uint8_t sched_type;
348         uint8_t evt_op;
349 };
350
351
352 static void *
353 rte_port_eventdev_writer_nodrop_create(void *params, int socket_id)
354 {
355         struct rte_port_eventdev_writer_nodrop_params *conf =
356                         params;
357         struct rte_port_eventdev_writer_nodrop *port;
358         unsigned int i;
359
360         /* Check input parameters */
361         if ((conf == NULL) ||
362                 (conf->enq_burst_sz == 0) ||
363                 (conf->enq_burst_sz > RTE_PORT_IN_BURST_SIZE_MAX) ||
364                 (!rte_is_power_of_2(conf->enq_burst_sz))) {
365                 RTE_LOG(ERR, PORT, "%s: Invalid input parameters\n", __func__);
366                 return NULL;
367         }
368
369         /* Memory allocation */
370         port = rte_zmalloc_socket("PORT", sizeof(*port),
371                 RTE_CACHE_LINE_SIZE, socket_id);
372         if (port == NULL) {
373                 RTE_LOG(ERR, PORT, "%s: Failed to allocate port\n", __func__);
374                 return NULL;
375         }
376
377         /* Initialization */
378         port->enq_burst_sz = conf->enq_burst_sz;
379         port->enq_buf_count = 0;
380         port->bsz_mask = 1LLU << (conf->enq_burst_sz - 1);
381
382         port->eventdev_id = conf->eventdev_id;
383         port->port_id = conf->port_id;
384         port->queue_id = conf->queue_id;
385         port->sched_type = conf->sched_type;
386         port->evt_op = conf->evt_op;
387         memset(&port->ev, 0, sizeof(port->ev));
388
389         for (i = 0; i < RTE_DIM(port->ev); i++) {
390                 port->ev[i].queue_id = port->queue_id;
391                 port->ev[i].sched_type = port->sched_type;
392                 port->ev[i].op = port->evt_op;
393         }
394         /*
395          * When n_retries is 0 it means that we should wait for every event to
396          * send no matter how many retries should it take. To limit number of
397          * branches in fast path, we use UINT64_MAX instead of branching.
398          */
399         port->n_retries = (conf->n_retries == 0) ? UINT64_MAX : conf->n_retries;
400
401         return port;
402 }
403
404 static inline void
405 send_burst_nodrop(struct rte_port_eventdev_writer_nodrop *p)
406 {
407         uint32_t nb_enq, i;
408
409         nb_enq = rte_event_enqueue_burst(p->eventdev_id, p->port_id,
410                         p->ev, p->enq_buf_count);
411
412         /* We sent all the packets in a first try */
413         if (nb_enq >= p->enq_buf_count) {
414                 p->enq_buf_count = 0;
415                 return;
416         }
417
418         for (i = 0; i < p->n_retries; i++) {
419                 nb_enq += rte_event_enqueue_burst(p->eventdev_id, p->port_id,
420                                                         p->ev + nb_enq,
421                                                         p->enq_buf_count - nb_enq);
422
423                 /* We sent all the events in more than one try */
424                 if (nb_enq >= p->enq_buf_count) {
425                         p->enq_buf_count = 0;
426                         return;
427                 }
428         }
429         /* We didn't send the events in maximum allowed attempts */
430         RTE_PORT_EVENTDEV_WRITER_NODROP_STATS_PKTS_DROP_ADD(p,
431                         p->enq_buf_count - nb_enq);
432         for (; nb_enq < p->enq_buf_count; nb_enq++)
433                 rte_pktmbuf_free(p->ev[nb_enq].mbuf);
434
435         p->enq_buf_count = 0;
436 }
437
438 static int
439 rte_port_eventdev_writer_nodrop_tx(void *port, struct rte_mbuf *pkt)
440 {
441         struct rte_port_eventdev_writer_nodrop *p = port;
442
443         p->ev[p->enq_buf_count++].mbuf = pkt;
444
445         RTE_PORT_EVENTDEV_WRITER_NODROP_STATS_PKTS_IN_ADD(p, 1);
446         if (p->enq_buf_count >= p->enq_burst_sz)
447                 send_burst_nodrop(p);
448
449         return 0;
450 }
451
452 static int
453 rte_port_eventdev_writer_nodrop_tx_bulk(void *port,
454         struct rte_mbuf **pkts,
455         uint64_t pkts_mask)
456 {
457         struct rte_port_eventdev_writer_nodrop *p =
458                         port;
459
460         uint64_t bsz_mask = p->bsz_mask;
461         uint32_t enq_buf_count = p->enq_buf_count;
462         uint64_t expr = (pkts_mask & (pkts_mask + 1)) |
463                                         ((pkts_mask & bsz_mask) ^ bsz_mask);
464
465         if (expr == 0) {
466                 uint64_t n_pkts = __builtin_popcountll(pkts_mask);
467                 uint32_t i, n_enq_ok;
468
469                 if (enq_buf_count)
470                         send_burst_nodrop(p);
471
472                 RTE_PORT_EVENTDEV_WRITER_NODROP_STATS_PKTS_IN_ADD(p, n_pkts);
473
474                 struct rte_event events[RTE_PORT_IN_BURST_SIZE_MAX] = {};
475
476                 for (i = 0; i < n_pkts; i++) {
477                         events[i].mbuf = pkts[i];
478                         events[i].queue_id = p->queue_id;
479                         events[i].sched_type = p->sched_type;
480                         events[i].op = p->evt_op;
481                 }
482
483                 n_enq_ok = rte_event_enqueue_burst(p->eventdev_id, p->port_id,
484                                 events, n_pkts);
485
486                 if (n_enq_ok >= n_pkts)
487                         return 0;
488
489                 /*
490                  * If we did not manage to enqueue all events in single burst,
491                  * move remaining events to the buffer and call send burst.
492                  */
493                 for (; n_enq_ok < n_pkts; n_enq_ok++) {
494                         struct rte_mbuf *pkt = pkts[n_enq_ok];
495                         p->ev[p->enq_buf_count++].mbuf = pkt;
496                 }
497                 send_burst_nodrop(p);
498         } else {
499                 for (; pkts_mask;) {
500                         uint32_t pkt_index = __builtin_ctzll(pkts_mask);
501                         uint64_t pkt_mask = 1LLU << pkt_index;
502
503                         p->ev[enq_buf_count++].mbuf = pkts[pkt_index];
504
505                         RTE_PORT_EVENTDEV_WRITER_STATS_PKTS_IN_ADD(p, 1);
506                         pkts_mask &= ~pkt_mask;
507                 }
508
509                 p->enq_buf_count = enq_buf_count;
510                 if (enq_buf_count >= p->enq_burst_sz)
511                         send_burst_nodrop(p);
512         }
513
514         return 0;
515 }
516
517 static int
518 rte_port_eventdev_writer_nodrop_flush(void *port)
519 {
520         struct rte_port_eventdev_writer_nodrop *p =
521                         port;
522
523         if (p->enq_buf_count > 0)
524                 send_burst_nodrop(p);
525
526         return 0;
527 }
528
529 static int
530 rte_port_eventdev_writer_nodrop_free(void *port)
531 {
532         if (port == NULL) {
533                 RTE_LOG(ERR, PORT, "%s: Port is NULL\n", __func__);
534                 return -EINVAL;
535         }
536
537         rte_port_eventdev_writer_nodrop_flush(port);
538         rte_free(port);
539
540         return 0;
541 }
542
543 static int rte_port_eventdev_writer_nodrop_stats_read(void *port,
544         struct rte_port_out_stats *stats, int clear)
545 {
546         struct rte_port_eventdev_writer_nodrop *p =
547                         port;
548
549         if (stats != NULL)
550                 memcpy(stats, &p->stats, sizeof(p->stats));
551
552         if (clear)
553                 memset(&p->stats, 0, sizeof(p->stats));
554
555         return 0;
556 }
557
558 /*
559  * Summary of port operations
560  */
561 struct rte_port_in_ops rte_port_eventdev_reader_ops = {
562         .f_create = rte_port_eventdev_reader_create,
563         .f_free = rte_port_eventdev_reader_free,
564         .f_rx = rte_port_eventdev_reader_rx,
565         .f_stats = rte_port_eventdev_reader_stats_read,
566 };
567
568 struct rte_port_out_ops rte_port_eventdev_writer_ops = {
569         .f_create = rte_port_eventdev_writer_create,
570         .f_free = rte_port_eventdev_writer_free,
571         .f_tx = rte_port_eventdev_writer_tx,
572         .f_tx_bulk = rte_port_eventdev_writer_tx_bulk,
573         .f_flush = rte_port_eventdev_writer_flush,
574         .f_stats = rte_port_eventdev_writer_stats_read,
575 };
576
577 struct rte_port_out_ops rte_port_eventdev_writer_nodrop_ops = {
578         .f_create = rte_port_eventdev_writer_nodrop_create,
579         .f_free = rte_port_eventdev_writer_nodrop_free,
580         .f_tx = rte_port_eventdev_writer_nodrop_tx,
581         .f_tx_bulk = rte_port_eventdev_writer_nodrop_tx_bulk,
582         .f_flush = rte_port_eventdev_writer_nodrop_flush,
583         .f_stats = rte_port_eventdev_writer_nodrop_stats_read,
584 };