/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2019 Intel Corporation
 */
#include <string.h>
#include <stdint.h>

#include <rte_mbuf.h>
#include <rte_malloc.h>

#include "rte_port_eventdev.h"

/*
 * Port EVENTDEV Reader
 */
#ifdef RTE_PORT_STATS_COLLECT

#define RTE_PORT_EVENTDEV_READER_STATS_PKTS_IN_ADD(port, val) \
	do {port->stats.n_pkts_in += val;} while (0)
#define RTE_PORT_EVENTDEV_READER_STATS_PKTS_DROP_ADD(port, val) \
	do {port->stats.n_pkts_drop += val;} while (0)

#else

#define RTE_PORT_EVENTDEV_READER_STATS_PKTS_IN_ADD(port, val)
#define RTE_PORT_EVENTDEV_READER_STATS_PKTS_DROP_ADD(port, val)

#endif
struct rte_port_eventdev_reader {
	struct rte_port_in_stats stats;
	uint8_t eventdev_id;
	uint16_t port_id;
	struct rte_event ev[RTE_PORT_IN_BURST_SIZE_MAX];
};
static void *
rte_port_eventdev_reader_create(void *params, int socket_id)
{
	struct rte_port_eventdev_reader_params *conf = params;
	struct rte_port_eventdev_reader *port;

	/* Check input parameters */
	if (conf == NULL) {
		RTE_LOG(ERR, PORT, "%s: params is NULL\n", __func__);
		return NULL;
	}

	/* Memory allocation */
	port = rte_zmalloc_socket("PORT", sizeof(*port),
		RTE_CACHE_LINE_SIZE, socket_id);
	if (port == NULL) {
		RTE_LOG(ERR, PORT, "%s: Failed to allocate port\n", __func__);
		return NULL;
	}

	/* Initialization */
	port->eventdev_id = conf->eventdev_id;
	port->port_id = conf->port_id;

	return port;
}
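/*
 * RX: dequeue up to n_pkts events from the event device and hand back the
 * mbufs carried in those events.
 */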
static int
rte_port_eventdev_reader_rx(void *port, struct rte_mbuf **pkts, uint32_t n_pkts)
{
	struct rte_port_eventdev_reader *p = port;
	uint16_t rx_evts_cnt, i;

	rx_evts_cnt = rte_event_dequeue_burst(p->eventdev_id, p->port_id,
		p->ev, n_pkts, 0);

	for (i = 0; i < rx_evts_cnt; i++)
		pkts[i] = p->ev[i].mbuf;

	RTE_PORT_EVENTDEV_READER_STATS_PKTS_IN_ADD(p, rx_evts_cnt);

	return rx_evts_cnt;
}
static int
rte_port_eventdev_reader_free(void *port)
{
	if (port == NULL) {
		RTE_LOG(ERR, PORT, "%s: port is NULL\n", __func__);
		return -EINVAL;
	}

	rte_free(port);
	return 0;
}
static int rte_port_eventdev_reader_stats_read(void *port,
	struct rte_port_in_stats *stats, int clear)
{
	struct rte_port_eventdev_reader *p = port;

	if (stats != NULL)
		memcpy(stats, &p->stats, sizeof(p->stats));

	if (clear)
		memset(&p->stats, 0, sizeof(p->stats));

	return 0;
}
/*
 * Port EVENTDEV Writer
 */
#ifdef RTE_PORT_STATS_COLLECT

#define RTE_PORT_EVENTDEV_WRITER_STATS_PKTS_IN_ADD(port, val) \
	do {port->stats.n_pkts_in += val;} while (0)
#define RTE_PORT_EVENTDEV_WRITER_STATS_PKTS_DROP_ADD(port, val) \
	do {port->stats.n_pkts_drop += val;} while (0)

#else

#define RTE_PORT_EVENTDEV_WRITER_STATS_PKTS_IN_ADD(port, val)
#define RTE_PORT_EVENTDEV_WRITER_STATS_PKTS_DROP_ADD(port, val)

#endif
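/*
 * The event buffer is sized to twice the maximum input burst so that
 * tx_bulk() can append a full burst of events on top of a partially
 * filled buffer before the next flush.
 */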
struct rte_port_eventdev_writer {
	struct rte_port_out_stats stats;

	struct rte_event ev[2 * RTE_PORT_IN_BURST_SIZE_MAX];

	uint32_t enq_burst_sz;
	uint32_t enq_buf_count;
	uint64_t bsz_mask;

	uint8_t eventdev_id;
	uint8_t port_id;
	uint8_t queue_id;
	uint8_t sched_type;
	uint8_t evt_op;
};
static void *
rte_port_eventdev_writer_create(void *params, int socket_id)
{
	struct rte_port_eventdev_writer_params *conf = params;
	struct rte_port_eventdev_writer *port;
	unsigned int i;

	/* Check input parameters */
	if ((conf == NULL) ||
		(conf->enq_burst_sz == 0) ||
		(conf->enq_burst_sz > RTE_PORT_IN_BURST_SIZE_MAX) ||
		(!rte_is_power_of_2(conf->enq_burst_sz))) {
		RTE_LOG(ERR, PORT, "%s: Invalid input parameters\n", __func__);
		return NULL;
	}

	/* Memory allocation */
	port = rte_zmalloc_socket("PORT", sizeof(*port),
		RTE_CACHE_LINE_SIZE, socket_id);
	if (port == NULL) {
		RTE_LOG(ERR, PORT, "%s: Failed to allocate port\n", __func__);
		return NULL;
	}

	/* Initialization */
	port->enq_burst_sz = conf->enq_burst_sz;
	port->enq_buf_count = 0;
	port->bsz_mask = 1LLU << (conf->enq_burst_sz - 1);

	port->eventdev_id = conf->eventdev_id;
	port->port_id = conf->port_id;
	port->queue_id = conf->queue_id;
	port->sched_type = conf->sched_type;
	port->evt_op = conf->evt_op;
	memset(&port->ev, 0, sizeof(port->ev));

	for (i = 0; i < RTE_DIM(port->ev); i++) {
		port->ev[i].queue_id = port->queue_id;
		port->ev[i].sched_type = port->sched_type;
		port->ev[i].op = port->evt_op;
	}

	return port;
}
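/*
 * Enqueue the buffered events to the event device. Any events that the
 * device does not accept are counted as drops and their mbufs are freed.
 */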
static inline void
send_burst(struct rte_port_eventdev_writer *p)
{
	uint32_t nb_enq;

	nb_enq = rte_event_enqueue_burst(p->eventdev_id, p->port_id,
		p->ev, p->enq_buf_count);

	RTE_PORT_EVENTDEV_WRITER_STATS_PKTS_DROP_ADD(p, p->enq_buf_count -
		nb_enq);

	for (; nb_enq < p->enq_buf_count; nb_enq++)
		rte_pktmbuf_free(p->ev[nb_enq].mbuf);

	p->enq_buf_count = 0;
}
static int
rte_port_eventdev_writer_tx(void *port, struct rte_mbuf *pkt)
{
	struct rte_port_eventdev_writer *p = port;

	p->ev[p->enq_buf_count++].mbuf = pkt;
	RTE_PORT_EVENTDEV_WRITER_STATS_PKTS_IN_ADD(p, 1);
	if (p->enq_buf_count >= p->enq_burst_sz)
		send_burst(p);

	return 0;
}
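/*
 * Bulk TX: when pkts_mask selects a contiguous run of packets starting at
 * bit 0 that is at least enq_burst_sz long (expr == 0), any buffered events
 * are flushed first and the new burst is enqueued directly from a temporary
 * event array. Otherwise the packets are appended to the event buffer one by
 * one and the buffer is flushed once it reaches the configured burst size.
 */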
static int
rte_port_eventdev_writer_tx_bulk(void *port,
	struct rte_mbuf **pkts,
	uint64_t pkts_mask)
{
	struct rte_port_eventdev_writer *p = port;
	uint64_t bsz_mask = p->bsz_mask;
	uint32_t enq_buf_count = p->enq_buf_count;
	uint64_t expr = (pkts_mask & (pkts_mask + 1)) |
			((pkts_mask & bsz_mask) ^ bsz_mask);

	if (expr == 0) {
		uint64_t n_pkts = __builtin_popcountll(pkts_mask);
		uint32_t i, n_enq_ok;

		if (enq_buf_count)
			send_burst(p);

		RTE_PORT_EVENTDEV_WRITER_STATS_PKTS_IN_ADD(p, n_pkts);

		struct rte_event events[2 * RTE_PORT_IN_BURST_SIZE_MAX] = {};
		for (i = 0; i < n_pkts; i++) {
			events[i].mbuf = pkts[i];
			events[i].queue_id = p->queue_id;
			events[i].sched_type = p->sched_type;
			events[i].op = p->evt_op;
		}

		n_enq_ok = rte_event_enqueue_burst(p->eventdev_id, p->port_id,
			events, n_pkts);

		RTE_PORT_EVENTDEV_WRITER_STATS_PKTS_DROP_ADD(p,
			n_pkts - n_enq_ok);
		for (; n_enq_ok < n_pkts; n_enq_ok++)
			rte_pktmbuf_free(pkts[n_enq_ok]);
	} else {
		for (; pkts_mask;) {
			uint32_t pkt_index = __builtin_ctzll(pkts_mask);
			uint64_t pkt_mask = 1LLU << pkt_index;

			p->ev[enq_buf_count++].mbuf = pkts[pkt_index];

			RTE_PORT_EVENTDEV_WRITER_STATS_PKTS_IN_ADD(p, 1);
			pkts_mask &= ~pkt_mask;
		}

		p->enq_buf_count = enq_buf_count;
		if (enq_buf_count >= p->enq_burst_sz)
			send_burst(p);
	}

	return 0;
}
static int
rte_port_eventdev_writer_flush(void *port)
{
	struct rte_port_eventdev_writer *p = port;

	if (p->enq_buf_count > 0)
		send_burst(p);

	return 0;
}
static int
rte_port_eventdev_writer_free(void *port)
{
	if (port == NULL) {
		RTE_LOG(ERR, PORT, "%s: Port is NULL\n", __func__);
		return -EINVAL;
	}

	rte_port_eventdev_writer_flush(port);
	rte_free(port);

	return 0;
}
static int rte_port_eventdev_writer_stats_read(void *port,
	struct rte_port_out_stats *stats, int clear)
{
	struct rte_port_eventdev_writer *p = port;

	if (stats != NULL)
		memcpy(stats, &p->stats, sizeof(p->stats));

	if (clear)
		memset(&p->stats, 0, sizeof(p->stats));

	return 0;
}
/*
 * Port EVENTDEV Writer Nodrop
 */
#ifdef RTE_PORT_STATS_COLLECT

#define RTE_PORT_EVENTDEV_WRITER_NODROP_STATS_PKTS_IN_ADD(port, val) \
	do {port->stats.n_pkts_in += val;} while (0)
#define RTE_PORT_EVENTDEV_WRITER_NODROP_STATS_PKTS_DROP_ADD(port, val) \
	do {port->stats.n_pkts_drop += val;} while (0)

#else

#define RTE_PORT_EVENTDEV_WRITER_NODROP_STATS_PKTS_IN_ADD(port, val)
#define RTE_PORT_EVENTDEV_WRITER_NODROP_STATS_PKTS_DROP_ADD(port, val)

#endif
struct rte_port_eventdev_writer_nodrop {
	struct rte_port_out_stats stats;

	struct rte_event ev[2 * RTE_PORT_IN_BURST_SIZE_MAX];

	uint32_t enq_burst_sz;
	uint32_t enq_buf_count;
	uint64_t bsz_mask;
	uint64_t n_retries;

	uint8_t eventdev_id;
	uint8_t port_id;
	uint8_t queue_id;
	uint8_t sched_type;
	uint8_t evt_op;
};
static void *
rte_port_eventdev_writer_nodrop_create(void *params, int socket_id)
{
	struct rte_port_eventdev_writer_nodrop_params *conf = params;
	struct rte_port_eventdev_writer_nodrop *port;
	unsigned int i;

	/* Check input parameters */
	if ((conf == NULL) ||
		(conf->enq_burst_sz == 0) ||
		(conf->enq_burst_sz > RTE_PORT_IN_BURST_SIZE_MAX) ||
		(!rte_is_power_of_2(conf->enq_burst_sz))) {
		RTE_LOG(ERR, PORT, "%s: Invalid input parameters\n", __func__);
		return NULL;
	}

	/* Memory allocation */
	port = rte_zmalloc_socket("PORT", sizeof(*port),
		RTE_CACHE_LINE_SIZE, socket_id);
	if (port == NULL) {
		RTE_LOG(ERR, PORT, "%s: Failed to allocate port\n", __func__);
		return NULL;
	}

	/* Initialization */
	port->enq_burst_sz = conf->enq_burst_sz;
	port->enq_buf_count = 0;
	port->bsz_mask = 1LLU << (conf->enq_burst_sz - 1);

	port->eventdev_id = conf->eventdev_id;
	port->port_id = conf->port_id;
	port->queue_id = conf->queue_id;
	port->sched_type = conf->sched_type;
	port->evt_op = conf->evt_op;
	memset(&port->ev, 0, sizeof(port->ev));

	for (i = 0; i < RTE_DIM(port->ev); i++) {
		port->ev[i].queue_id = port->queue_id;
		port->ev[i].sched_type = port->sched_type;
		port->ev[i].op = port->evt_op;
	}

	/*
	 * When n_retries is 0, every event must be sent no matter how many
	 * retries it takes. To limit the number of branches in the fast path,
	 * we use UINT64_MAX instead of branching.
	 */
	port->n_retries = (conf->n_retries == 0) ? UINT64_MAX : conf->n_retries;

	return port;
}
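/*
 * Enqueue the buffered events, retrying up to n_retries times. Events still
 * not accepted after the last retry are counted as drops and their mbufs are
 * freed.
 */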
static inline void
send_burst_nodrop(struct rte_port_eventdev_writer_nodrop *p)
{
	uint32_t nb_enq, i;

	nb_enq = rte_event_enqueue_burst(p->eventdev_id, p->port_id,
		p->ev, p->enq_buf_count);

	/* We sent all the packets in a first try */
	if (nb_enq >= p->enq_buf_count) {
		p->enq_buf_count = 0;
		return;
	}

	for (i = 0; i < p->n_retries; i++) {
		nb_enq += rte_event_enqueue_burst(p->eventdev_id, p->port_id,
			p->ev + nb_enq, p->enq_buf_count - nb_enq);

		/* We sent all the events in more than one try */
		if (nb_enq >= p->enq_buf_count) {
			p->enq_buf_count = 0;
			return;
		}
	}

	/* We didn't send the events in the maximum allowed attempts */
	RTE_PORT_EVENTDEV_WRITER_NODROP_STATS_PKTS_DROP_ADD(p,
		p->enq_buf_count - nb_enq);
	for (; nb_enq < p->enq_buf_count; nb_enq++)
		rte_pktmbuf_free(p->ev[nb_enq].mbuf);

	p->enq_buf_count = 0;
}
static int
rte_port_eventdev_writer_nodrop_tx(void *port, struct rte_mbuf *pkt)
{
	struct rte_port_eventdev_writer_nodrop *p = port;

	p->ev[p->enq_buf_count++].mbuf = pkt;
	RTE_PORT_EVENTDEV_WRITER_NODROP_STATS_PKTS_IN_ADD(p, 1);
	if (p->enq_buf_count >= p->enq_burst_sz)
		send_burst_nodrop(p);

	return 0;
}
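/*
 * Bulk TX, nodrop variant: same fast/slow path split as the writer above,
 * except that events left over from the fast-path enqueue are moved to the
 * event buffer and retried via send_burst_nodrop() instead of being dropped.
 */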
static int
rte_port_eventdev_writer_nodrop_tx_bulk(void *port,
	struct rte_mbuf **pkts,
	uint64_t pkts_mask)
{
	struct rte_port_eventdev_writer_nodrop *p = port;
	uint64_t bsz_mask = p->bsz_mask;
	uint32_t enq_buf_count = p->enq_buf_count;
	uint64_t expr = (pkts_mask & (pkts_mask + 1)) |
			((pkts_mask & bsz_mask) ^ bsz_mask);

	if (expr == 0) {
		uint64_t n_pkts = __builtin_popcountll(pkts_mask);
		uint32_t i, n_enq_ok;

		if (enq_buf_count)
			send_burst_nodrop(p);

		RTE_PORT_EVENTDEV_WRITER_NODROP_STATS_PKTS_IN_ADD(p, n_pkts);

		struct rte_event events[RTE_PORT_IN_BURST_SIZE_MAX] = {};
		for (i = 0; i < n_pkts; i++) {
			events[i].mbuf = pkts[i];
			events[i].queue_id = p->queue_id;
			events[i].sched_type = p->sched_type;
			events[i].op = p->evt_op;
		}

		n_enq_ok = rte_event_enqueue_burst(p->eventdev_id, p->port_id,
			events, n_pkts);

		if (n_enq_ok >= n_pkts)
			return 0;

		/*
		 * If we did not manage to enqueue all events in a single
		 * burst, move the remaining events to the buffer and call
		 * send burst.
		 */
		for (; n_enq_ok < n_pkts; n_enq_ok++) {
			struct rte_mbuf *pkt = pkts[n_enq_ok];
			p->ev[p->enq_buf_count++].mbuf = pkt;
		}
		send_burst_nodrop(p);
	} else {
		for (; pkts_mask;) {
			uint32_t pkt_index = __builtin_ctzll(pkts_mask);
			uint64_t pkt_mask = 1LLU << pkt_index;

			p->ev[enq_buf_count++].mbuf = pkts[pkt_index];

			RTE_PORT_EVENTDEV_WRITER_NODROP_STATS_PKTS_IN_ADD(p, 1);
			pkts_mask &= ~pkt_mask;
		}

		p->enq_buf_count = enq_buf_count;
		if (enq_buf_count >= p->enq_burst_sz)
			send_burst_nodrop(p);
	}

	return 0;
}
static int
rte_port_eventdev_writer_nodrop_flush(void *port)
{
	struct rte_port_eventdev_writer_nodrop *p = port;

	if (p->enq_buf_count > 0)
		send_burst_nodrop(p);

	return 0;
}
static int
rte_port_eventdev_writer_nodrop_free(void *port)
{
	if (port == NULL) {
		RTE_LOG(ERR, PORT, "%s: Port is NULL\n", __func__);
		return -EINVAL;
	}

	rte_port_eventdev_writer_nodrop_flush(port);
	rte_free(port);

	return 0;
}
static int rte_port_eventdev_writer_nodrop_stats_read(void *port,
	struct rte_port_out_stats *stats, int clear)
{
	struct rte_port_eventdev_writer_nodrop *p = port;

	if (stats != NULL)
		memcpy(stats, &p->stats, sizeof(p->stats));

	if (clear)
		memset(&p->stats, 0, sizeof(p->stats));

	return 0;
}
/*
 * Summary of port operations
 */
struct rte_port_in_ops rte_port_eventdev_reader_ops = {
	.f_create = rte_port_eventdev_reader_create,
	.f_free = rte_port_eventdev_reader_free,
	.f_rx = rte_port_eventdev_reader_rx,
	.f_stats = rte_port_eventdev_reader_stats_read,
};

struct rte_port_out_ops rte_port_eventdev_writer_ops = {
	.f_create = rte_port_eventdev_writer_create,
	.f_free = rte_port_eventdev_writer_free,
	.f_tx = rte_port_eventdev_writer_tx,
	.f_tx_bulk = rte_port_eventdev_writer_tx_bulk,
	.f_flush = rte_port_eventdev_writer_flush,
	.f_stats = rte_port_eventdev_writer_stats_read,
};

struct rte_port_out_ops rte_port_eventdev_writer_nodrop_ops = {
	.f_create = rte_port_eventdev_writer_nodrop_create,
	.f_free = rte_port_eventdev_writer_nodrop_free,
	.f_tx = rte_port_eventdev_writer_nodrop_tx,
	.f_tx_bulk = rte_port_eventdev_writer_nodrop_tx_bulk,
	.f_flush = rte_port_eventdev_writer_nodrop_flush,
	.f_stats = rte_port_eventdev_writer_nodrop_stats_read,
};
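/*
 * Illustrative usage sketch (not part of this library): one way an
 * application might plug the reader ops into the packet framework pipeline
 * API. The pipeline handle "p" and the eventdev/port identifiers below are
 * assumptions made for the example only.
 *
 *	struct rte_port_eventdev_reader_params rd_params = {
 *		.eventdev_id = 0,
 *		.port_id = 0,
 *	};
 *	struct rte_pipeline_port_in_params in_params = {
 *		.ops = &rte_port_eventdev_reader_ops,
 *		.arg_create = &rd_params,
 *		.burst_size = 32,
 *	};
 *	uint32_t port_in_id;
 *
 *	if (rte_pipeline_port_in_create(p, &in_params, &port_in_id))
 *		rte_panic("Unable to create eventdev input port\n");
 */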