/*-
 *   BSD LICENSE
 *
 *   Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
#include <string.h>
#include <stdint.h>
#include <errno.h>

#include <rte_mbuf.h>
#include <rte_ring.h>
#include <rte_malloc.h>

#include "rte_port_ring.h"

/*
 * Port RING Reader
 */
#ifdef RTE_PORT_STATS_COLLECT

#define RTE_PORT_RING_READER_STATS_PKTS_IN_ADD(port, val) \
	port->stats.n_pkts_in += val
#define RTE_PORT_RING_READER_STATS_PKTS_DROP_ADD(port, val) \
	port->stats.n_pkts_drop += val

#else

#define RTE_PORT_RING_READER_STATS_PKTS_IN_ADD(port, val)
#define RTE_PORT_RING_READER_STATS_PKTS_DROP_ADD(port, val)

#endif

struct rte_port_ring_reader {
	struct rte_port_in_stats stats;

	struct rte_ring *ring;
};

static void *
rte_port_ring_reader_create_internal(void *params, int socket_id,
	uint32_t is_multi)
{
	struct rte_port_ring_reader_params *conf = params;
	struct rte_port_ring_reader *port;

	/* Check input parameters */
	if ((conf == NULL) ||
		(conf->ring == NULL) ||
		(conf->ring->cons.single && is_multi) ||
		(!(conf->ring->cons.single) && !is_multi)) {
		RTE_LOG(ERR, PORT, "%s: Invalid Parameters\n", __func__);
		return NULL;
	}

	/* Memory allocation */
	port = rte_zmalloc_socket("PORT", sizeof(*port),
		RTE_CACHE_LINE_SIZE, socket_id);
	if (port == NULL) {
		RTE_LOG(ERR, PORT, "%s: Failed to allocate port\n", __func__);
		return NULL;
	}

	/* Initialization */
	port->ring = conf->ring;

	return port;
}

static void *
rte_port_ring_reader_create(void *params, int socket_id)
{
	return rte_port_ring_reader_create_internal(params, socket_id, 0);
}

static void *
rte_port_ring_multi_reader_create(void *params, int socket_id)
{
	return rte_port_ring_reader_create_internal(params, socket_id, 1);
}

static int
rte_port_ring_reader_rx(void *port, struct rte_mbuf **pkts, uint32_t n_pkts)
{
	struct rte_port_ring_reader *p = port;
	uint32_t nb_rx;

	nb_rx = rte_ring_sc_dequeue_burst(p->ring, (void **) pkts,
		n_pkts, NULL);
	RTE_PORT_RING_READER_STATS_PKTS_IN_ADD(p, nb_rx);

	return nb_rx;
}

static int
rte_port_ring_multi_reader_rx(void *port, struct rte_mbuf **pkts,
	uint32_t n_pkts)
{
	struct rte_port_ring_reader *p = port;
	uint32_t nb_rx;

	nb_rx = rte_ring_mc_dequeue_burst(p->ring, (void **) pkts,
		n_pkts, NULL);
	RTE_PORT_RING_READER_STATS_PKTS_IN_ADD(p, nb_rx);

	return nb_rx;
}

static int
rte_port_ring_reader_free(void *port)
{
	if (port == NULL) {
		RTE_LOG(ERR, PORT, "%s: port is NULL\n", __func__);
		return -EINVAL;
	}

	rte_free(port);

	return 0;
}

static int
rte_port_ring_reader_stats_read(void *port,
	struct rte_port_in_stats *stats, int clear)
{
	struct rte_port_ring_reader *p = port;

	if (stats != NULL)
		memcpy(stats, &p->stats, sizeof(p->stats));

	if (clear)
		memset(&p->stats, 0, sizeof(p->stats));

	return 0;
}

/*
 * Port RING Writer
 */
#ifdef RTE_PORT_STATS_COLLECT

#define RTE_PORT_RING_WRITER_STATS_PKTS_IN_ADD(port, val) \
	port->stats.n_pkts_in += val
#define RTE_PORT_RING_WRITER_STATS_PKTS_DROP_ADD(port, val) \
	port->stats.n_pkts_drop += val

#else

#define RTE_PORT_RING_WRITER_STATS_PKTS_IN_ADD(port, val)
#define RTE_PORT_RING_WRITER_STATS_PKTS_DROP_ADD(port, val)

#endif

struct rte_port_ring_writer {
	struct rte_port_out_stats stats;

	struct rte_mbuf *tx_buf[2 * RTE_PORT_IN_BURST_SIZE_MAX];
	struct rte_ring *ring;
	uint32_t tx_burst_sz;
	uint32_t tx_buf_count;
	uint64_t bsz_mask;
	uint32_t is_multi;
};

static void *
rte_port_ring_writer_create_internal(void *params, int socket_id,
	uint32_t is_multi)
{
	struct rte_port_ring_writer_params *conf = params;
	struct rte_port_ring_writer *port;

	/* Check input parameters */
	if ((conf == NULL) ||
		(conf->ring == NULL) ||
		(conf->ring->prod.single && is_multi) ||
		(!(conf->ring->prod.single) && !is_multi) ||
		(conf->tx_burst_sz > RTE_PORT_IN_BURST_SIZE_MAX)) {
		RTE_LOG(ERR, PORT, "%s: Invalid Parameters\n", __func__);
		return NULL;
	}

	/* Memory allocation */
	port = rte_zmalloc_socket("PORT", sizeof(*port),
		RTE_CACHE_LINE_SIZE, socket_id);
	if (port == NULL) {
		RTE_LOG(ERR, PORT, "%s: Failed to allocate port\n", __func__);
		return NULL;
	}

	/* Initialization */
	port->ring = conf->ring;
	port->tx_burst_sz = conf->tx_burst_sz;
	port->tx_buf_count = 0;
	port->bsz_mask = 1LLU << (conf->tx_burst_sz - 1);
	port->is_multi = is_multi;

	return port;
}

static void *
rte_port_ring_writer_create(void *params, int socket_id)
{
	return rte_port_ring_writer_create_internal(params, socket_id, 0);
}

static void *
rte_port_ring_multi_writer_create(void *params, int socket_id)
{
	return rte_port_ring_writer_create_internal(params, socket_id, 1);
}

static inline void
send_burst(struct rte_port_ring_writer *p)
{
	uint32_t nb_tx;

	nb_tx = rte_ring_sp_enqueue_burst(p->ring, (void **)p->tx_buf,
		p->tx_buf_count, NULL);

	RTE_PORT_RING_WRITER_STATS_PKTS_DROP_ADD(p, p->tx_buf_count - nb_tx);
	for ( ; nb_tx < p->tx_buf_count; nb_tx++)
		rte_pktmbuf_free(p->tx_buf[nb_tx]);

	p->tx_buf_count = 0;
}

static inline void
send_burst_mp(struct rte_port_ring_writer *p)
{
	uint32_t nb_tx;

	nb_tx = rte_ring_mp_enqueue_burst(p->ring, (void **)p->tx_buf,
		p->tx_buf_count, NULL);

	RTE_PORT_RING_WRITER_STATS_PKTS_DROP_ADD(p, p->tx_buf_count - nb_tx);
	for ( ; nb_tx < p->tx_buf_count; nb_tx++)
		rte_pktmbuf_free(p->tx_buf[nb_tx]);

	p->tx_buf_count = 0;
}

static int
rte_port_ring_writer_tx(void *port, struct rte_mbuf *pkt)
{
	struct rte_port_ring_writer *p = port;

	p->tx_buf[p->tx_buf_count++] = pkt;
	RTE_PORT_RING_WRITER_STATS_PKTS_IN_ADD(p, 1);
	if (p->tx_buf_count >= p->tx_burst_sz)
		send_burst(p);

	return 0;
}

static int
rte_port_ring_multi_writer_tx(void *port, struct rte_mbuf *pkt)
{
	struct rte_port_ring_writer *p = port;

	p->tx_buf[p->tx_buf_count++] = pkt;
	RTE_PORT_RING_WRITER_STATS_PKTS_IN_ADD(p, 1);
	if (p->tx_buf_count >= p->tx_burst_sz)
		send_burst_mp(p);

	return 0;
}

static __rte_always_inline int
rte_port_ring_writer_tx_bulk_internal(void *port,
	struct rte_mbuf **pkts,
	uint64_t pkts_mask,
	uint32_t is_multi)
{
	struct rte_port_ring_writer *p = port;

	uint64_t bsz_mask = p->bsz_mask;
	uint32_t tx_buf_count = p->tx_buf_count;
	uint64_t expr = (pkts_mask & (pkts_mask + 1)) |
		((pkts_mask & bsz_mask) ^ bsz_mask);

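	/*
	 * Full-burst detection: "pkts_mask & (pkts_mask + 1)" is zero only
	 * when pkts_mask is a contiguous run of 1s starting at bit 0, and
	 * "(pkts_mask & bsz_mask) ^ bsz_mask" is zero only when bit
	 * (tx_burst_sz - 1) is set. So expr == 0 means the mask describes a
	 * gap-free burst of at least tx_burst_sz packets, which is enqueued
	 * directly below; e.g. for tx_burst_sz = 32, pkts_mask = 0xFFFFFFFF
	 * qualifies, while 0xFFFF or any mask with holes takes the buffered
	 * path instead.
	 */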
	if (expr == 0) {
		uint64_t n_pkts = __builtin_popcountll(pkts_mask);
		uint32_t n_pkts_ok;

		if (tx_buf_count) {
			if (is_multi)
				send_burst_mp(p);
			else
				send_burst(p);
		}

		RTE_PORT_RING_WRITER_STATS_PKTS_IN_ADD(p, n_pkts);
		if (is_multi)
			n_pkts_ok = rte_ring_mp_enqueue_burst(p->ring,
				(void **)pkts, n_pkts, NULL);
		else
			n_pkts_ok = rte_ring_sp_enqueue_burst(p->ring,
				(void **)pkts, n_pkts, NULL);

		RTE_PORT_RING_WRITER_STATS_PKTS_DROP_ADD(p, n_pkts - n_pkts_ok);
		for ( ; n_pkts_ok < n_pkts; n_pkts_ok++) {
			struct rte_mbuf *pkt = pkts[n_pkts_ok];

			rte_pktmbuf_free(pkt);
		}
	} else {
		for ( ; pkts_mask; ) {
			uint32_t pkt_index = __builtin_ctzll(pkts_mask);
			uint64_t pkt_mask = 1LLU << pkt_index;
			struct rte_mbuf *pkt = pkts[pkt_index];

			p->tx_buf[tx_buf_count++] = pkt;
			RTE_PORT_RING_WRITER_STATS_PKTS_IN_ADD(p, 1);
			pkts_mask &= ~pkt_mask;
		}

		p->tx_buf_count = tx_buf_count;
		if (tx_buf_count >= p->tx_burst_sz) {
			if (is_multi)
				send_burst_mp(p);
			else
				send_burst(p);
		}
	}

	return 0;
}

static int
rte_port_ring_writer_tx_bulk(void *port,
	struct rte_mbuf **pkts,
	uint64_t pkts_mask)
{
	return rte_port_ring_writer_tx_bulk_internal(port, pkts, pkts_mask, 0);
}

static int
rte_port_ring_multi_writer_tx_bulk(void *port,
	struct rte_mbuf **pkts,
	uint64_t pkts_mask)
{
	return rte_port_ring_writer_tx_bulk_internal(port, pkts, pkts_mask, 1);
}

static int
rte_port_ring_writer_flush(void *port)
{
	struct rte_port_ring_writer *p = port;

	if (p->tx_buf_count > 0)
		send_burst(p);

	return 0;
}

static int
rte_port_ring_multi_writer_flush(void *port)
{
	struct rte_port_ring_writer *p = port;

	if (p->tx_buf_count > 0)
		send_burst_mp(p);

	return 0;
}

static int
rte_port_ring_writer_free(void *port)
{
	struct rte_port_ring_writer *p = port;

	if (port == NULL) {
		RTE_LOG(ERR, PORT, "%s: Port is NULL\n", __func__);
		return -EINVAL;
	}

	if (p->is_multi)
		rte_port_ring_multi_writer_flush(port);
	else
		rte_port_ring_writer_flush(port);

	rte_free(port);

	return 0;
}

static int
rte_port_ring_writer_stats_read(void *port,
	struct rte_port_out_stats *stats, int clear)
{
	struct rte_port_ring_writer *p = port;

	if (stats != NULL)
		memcpy(stats, &p->stats, sizeof(p->stats));

	if (clear)
		memset(&p->stats, 0, sizeof(p->stats));

	return 0;
}

/*
 * Port RING Writer Nodrop
 */
#ifdef RTE_PORT_STATS_COLLECT

#define RTE_PORT_RING_WRITER_NODROP_STATS_PKTS_IN_ADD(port, val) \
	port->stats.n_pkts_in += val
#define RTE_PORT_RING_WRITER_NODROP_STATS_PKTS_DROP_ADD(port, val) \
	port->stats.n_pkts_drop += val

#else

#define RTE_PORT_RING_WRITER_NODROP_STATS_PKTS_IN_ADD(port, val)
#define RTE_PORT_RING_WRITER_NODROP_STATS_PKTS_DROP_ADD(port, val)

#endif

struct rte_port_ring_writer_nodrop {
	struct rte_port_out_stats stats;

	struct rte_mbuf *tx_buf[2 * RTE_PORT_IN_BURST_SIZE_MAX];
	struct rte_ring *ring;
	uint32_t tx_burst_sz;
	uint32_t tx_buf_count;
	uint64_t bsz_mask;
	uint64_t n_retries;
	uint32_t is_multi;
};

static void *
rte_port_ring_writer_nodrop_create_internal(void *params, int socket_id,
	uint32_t is_multi)
{
	struct rte_port_ring_writer_nodrop_params *conf = params;
	struct rte_port_ring_writer_nodrop *port;

	/* Check input parameters */
	if ((conf == NULL) ||
		(conf->ring == NULL) ||
		(conf->ring->prod.single && is_multi) ||
		(!(conf->ring->prod.single) && !is_multi) ||
		(conf->tx_burst_sz > RTE_PORT_IN_BURST_SIZE_MAX)) {
		RTE_LOG(ERR, PORT, "%s: Invalid Parameters\n", __func__);
		return NULL;
	}

	/* Memory allocation */
	port = rte_zmalloc_socket("PORT", sizeof(*port),
		RTE_CACHE_LINE_SIZE, socket_id);
	if (port == NULL) {
		RTE_LOG(ERR, PORT, "%s: Failed to allocate port\n", __func__);
		return NULL;
	}

	/* Initialization */
	port->ring = conf->ring;
	port->tx_burst_sz = conf->tx_burst_sz;
	port->tx_buf_count = 0;
	port->bsz_mask = 1LLU << (conf->tx_burst_sz - 1);
	port->is_multi = is_multi;

	/*
	 * When n_retries is 0 it means that we should wait for every packet
	 * to be sent no matter how many retries it takes. To limit the number
	 * of branches in the fast path, we use UINT64_MAX instead of branching.
	 */
	port->n_retries = (conf->n_retries == 0) ? UINT64_MAX : conf->n_retries;

	return port;
}

static void *
rte_port_ring_writer_nodrop_create(void *params, int socket_id)
{
	return rte_port_ring_writer_nodrop_create_internal(params, socket_id, 0);
}

static void *
rte_port_ring_multi_writer_nodrop_create(void *params, int socket_id)
{
	return rte_port_ring_writer_nodrop_create_internal(params, socket_id, 1);
}

static inline void
send_burst_nodrop(struct rte_port_ring_writer_nodrop *p)
{
	uint32_t nb_tx = 0, i;

	nb_tx = rte_ring_sp_enqueue_burst(p->ring, (void **)p->tx_buf,
		p->tx_buf_count, NULL);

	/* We sent all the packets on the first try */
	if (nb_tx >= p->tx_buf_count) {
		p->tx_buf_count = 0;
		return;
	}

	for (i = 0; i < p->n_retries; i++) {
		nb_tx += rte_ring_sp_enqueue_burst(p->ring,
			(void **) (p->tx_buf + nb_tx),
			p->tx_buf_count - nb_tx, NULL);

		/* We sent all the packets in more than one try */
		if (nb_tx >= p->tx_buf_count) {
			p->tx_buf_count = 0;
			return;
		}
	}

	/* We didn't send the packets in the maximum allowed attempts */
	RTE_PORT_RING_WRITER_NODROP_STATS_PKTS_DROP_ADD(p,
		p->tx_buf_count - nb_tx);
	for ( ; nb_tx < p->tx_buf_count; nb_tx++)
		rte_pktmbuf_free(p->tx_buf[nb_tx]);

	p->tx_buf_count = 0;
}

static inline void
send_burst_mp_nodrop(struct rte_port_ring_writer_nodrop *p)
{
	uint32_t nb_tx = 0, i;

	nb_tx = rte_ring_mp_enqueue_burst(p->ring, (void **)p->tx_buf,
		p->tx_buf_count, NULL);

	/* We sent all the packets on the first try */
	if (nb_tx >= p->tx_buf_count) {
		p->tx_buf_count = 0;
		return;
	}

	for (i = 0; i < p->n_retries; i++) {
		nb_tx += rte_ring_mp_enqueue_burst(p->ring,
			(void **) (p->tx_buf + nb_tx),
			p->tx_buf_count - nb_tx, NULL);

		/* We sent all the packets in more than one try */
		if (nb_tx >= p->tx_buf_count) {
			p->tx_buf_count = 0;
			return;
		}
	}

	/* We didn't send the packets in the maximum allowed attempts */
	RTE_PORT_RING_WRITER_NODROP_STATS_PKTS_DROP_ADD(p,
		p->tx_buf_count - nb_tx);
	for ( ; nb_tx < p->tx_buf_count; nb_tx++)
		rte_pktmbuf_free(p->tx_buf[nb_tx]);

	p->tx_buf_count = 0;
}

static int
rte_port_ring_writer_nodrop_tx(void *port, struct rte_mbuf *pkt)
{
	struct rte_port_ring_writer_nodrop *p = port;

	p->tx_buf[p->tx_buf_count++] = pkt;
	RTE_PORT_RING_WRITER_NODROP_STATS_PKTS_IN_ADD(p, 1);
	if (p->tx_buf_count >= p->tx_burst_sz)
		send_burst_nodrop(p);

	return 0;
}

static int
rte_port_ring_multi_writer_nodrop_tx(void *port, struct rte_mbuf *pkt)
{
	struct rte_port_ring_writer_nodrop *p = port;

	p->tx_buf[p->tx_buf_count++] = pkt;
	RTE_PORT_RING_WRITER_NODROP_STATS_PKTS_IN_ADD(p, 1);
	if (p->tx_buf_count >= p->tx_burst_sz)
		send_burst_mp_nodrop(p);

	return 0;
}

static __rte_always_inline int
rte_port_ring_writer_nodrop_tx_bulk_internal(void *port,
	struct rte_mbuf **pkts,
	uint64_t pkts_mask,
	uint32_t is_multi)
{
	struct rte_port_ring_writer_nodrop *p = port;

	uint64_t bsz_mask = p->bsz_mask;
	uint32_t tx_buf_count = p->tx_buf_count;
	uint64_t expr = (pkts_mask & (pkts_mask + 1)) |
		((pkts_mask & bsz_mask) ^ bsz_mask);

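	/*
	 * Same full-burst test as in rte_port_ring_writer_tx_bulk_internal():
	 * expr == 0 only for a gap-free pkts_mask of at least tx_burst_sz
	 * packets, which is enqueued as one burst below; any other mask is
	 * staged in tx_buf instead.
	 */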
	if (expr == 0) {
		uint64_t n_pkts = __builtin_popcountll(pkts_mask);
		uint32_t n_pkts_ok;

		if (tx_buf_count) {
			if (is_multi)
				send_burst_mp_nodrop(p);
			else
				send_burst_nodrop(p);
		}

		RTE_PORT_RING_WRITER_NODROP_STATS_PKTS_IN_ADD(p, n_pkts);
		if (is_multi)
			n_pkts_ok = rte_ring_mp_enqueue_burst(p->ring,
				(void **)pkts, n_pkts, NULL);
		else
			n_pkts_ok = rte_ring_sp_enqueue_burst(p->ring,
				(void **)pkts, n_pkts, NULL);

		if (n_pkts_ok >= n_pkts)
			return 0;

		/*
		 * If we didn't manage to send all packets in a single burst,
		 * move the remaining packets to the buffer and call send burst.
		 */
		for (; n_pkts_ok < n_pkts; n_pkts_ok++) {
			struct rte_mbuf *pkt = pkts[n_pkts_ok];

			p->tx_buf[p->tx_buf_count++] = pkt;
		}
		if (is_multi)
			send_burst_mp_nodrop(p);
		else
			send_burst_nodrop(p);
	} else {
		for ( ; pkts_mask; ) {
			uint32_t pkt_index = __builtin_ctzll(pkts_mask);
			uint64_t pkt_mask = 1LLU << pkt_index;
			struct rte_mbuf *pkt = pkts[pkt_index];

			p->tx_buf[tx_buf_count++] = pkt;
			RTE_PORT_RING_WRITER_NODROP_STATS_PKTS_IN_ADD(p, 1);
			pkts_mask &= ~pkt_mask;
		}

		p->tx_buf_count = tx_buf_count;
		if (tx_buf_count >= p->tx_burst_sz) {
			if (is_multi)
				send_burst_mp_nodrop(p);
			else
				send_burst_nodrop(p);
		}
	}

	return 0;
}

static int
rte_port_ring_writer_nodrop_tx_bulk(void *port,
	struct rte_mbuf **pkts,
	uint64_t pkts_mask)
{
	return rte_port_ring_writer_nodrop_tx_bulk_internal(port, pkts,
		pkts_mask, 0);
}

static int
rte_port_ring_multi_writer_nodrop_tx_bulk(void *port,
	struct rte_mbuf **pkts,
	uint64_t pkts_mask)
{
	return rte_port_ring_writer_nodrop_tx_bulk_internal(port, pkts,
		pkts_mask, 1);
}

static int
rte_port_ring_writer_nodrop_flush(void *port)
{
	struct rte_port_ring_writer_nodrop *p = port;

	if (p->tx_buf_count > 0)
		send_burst_nodrop(p);

	return 0;
}

static int
rte_port_ring_multi_writer_nodrop_flush(void *port)
{
	struct rte_port_ring_writer_nodrop *p = port;

	if (p->tx_buf_count > 0)
		send_burst_mp_nodrop(p);

	return 0;
}

static int
rte_port_ring_writer_nodrop_free(void *port)
{
	struct rte_port_ring_writer_nodrop *p = port;

	if (port == NULL) {
		RTE_LOG(ERR, PORT, "%s: Port is NULL\n", __func__);
		return -EINVAL;
	}

	if (p->is_multi)
		rte_port_ring_multi_writer_nodrop_flush(port);
	else
		rte_port_ring_writer_nodrop_flush(port);

	rte_free(port);

	return 0;
}

static int
rte_port_ring_writer_nodrop_stats_read(void *port,
	struct rte_port_out_stats *stats, int clear)
{
	struct rte_port_ring_writer_nodrop *p = port;

	if (stats != NULL)
		memcpy(stats, &p->stats, sizeof(p->stats));

	if (clear)
		memset(&p->stats, 0, sizeof(p->stats));

	return 0;
}

/*
 * Summary of port operations
 */
struct rte_port_in_ops rte_port_ring_reader_ops = {
	.f_create = rte_port_ring_reader_create,
	.f_free = rte_port_ring_reader_free,
	.f_rx = rte_port_ring_reader_rx,
	.f_stats = rte_port_ring_reader_stats_read,
};

struct rte_port_out_ops rte_port_ring_writer_ops = {
	.f_create = rte_port_ring_writer_create,
	.f_free = rte_port_ring_writer_free,
	.f_tx = rte_port_ring_writer_tx,
	.f_tx_bulk = rte_port_ring_writer_tx_bulk,
	.f_flush = rte_port_ring_writer_flush,
	.f_stats = rte_port_ring_writer_stats_read,
};

struct rte_port_out_ops rte_port_ring_writer_nodrop_ops = {
	.f_create = rte_port_ring_writer_nodrop_create,
	.f_free = rte_port_ring_writer_nodrop_free,
	.f_tx = rte_port_ring_writer_nodrop_tx,
	.f_tx_bulk = rte_port_ring_writer_nodrop_tx_bulk,
	.f_flush = rte_port_ring_writer_nodrop_flush,
	.f_stats = rte_port_ring_writer_nodrop_stats_read,
};

struct rte_port_in_ops rte_port_ring_multi_reader_ops = {
	.f_create = rte_port_ring_multi_reader_create,
	.f_free = rte_port_ring_reader_free,
	.f_rx = rte_port_ring_multi_reader_rx,
	.f_stats = rte_port_ring_reader_stats_read,
};

struct rte_port_out_ops rte_port_ring_multi_writer_ops = {
	.f_create = rte_port_ring_multi_writer_create,
	.f_free = rte_port_ring_writer_free,
	.f_tx = rte_port_ring_multi_writer_tx,
	.f_tx_bulk = rte_port_ring_multi_writer_tx_bulk,
	.f_flush = rte_port_ring_multi_writer_flush,
	.f_stats = rte_port_ring_writer_stats_read,
};

struct rte_port_out_ops rte_port_ring_multi_writer_nodrop_ops = {
	.f_create = rte_port_ring_multi_writer_nodrop_create,
	.f_free = rte_port_ring_writer_nodrop_free,
	.f_tx = rte_port_ring_multi_writer_nodrop_tx,
	.f_tx_bulk = rte_port_ring_multi_writer_nodrop_tx_bulk,
	.f_flush = rte_port_ring_multi_writer_nodrop_flush,
	.f_stats = rte_port_ring_writer_nodrop_stats_read,
};
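
/*
 * Illustrative usage sketch: applications normally reach these ports through
 * the ops tables above (typically via the librte_pipeline port-in/port-out
 * API). Assuming a ring "r" created with rte_ring_create() using the
 * RING_F_SP_ENQ | RING_F_SC_DEQ flags and a packet "pkt" allocated from a
 * valid mempool, a minimal direct use of the single-producer/single-consumer
 * ops could look like:
 *
 *	struct rte_port_ring_writer_params wp = {
 *		.ring = r,
 *		.tx_burst_sz = 32,
 *	};
 *	struct rte_port_ring_reader_params rp = { .ring = r };
 *	void *out = rte_port_ring_writer_ops.f_create(&wp, SOCKET_ID_ANY);
 *	void *in = rte_port_ring_reader_ops.f_create(&rp, SOCKET_ID_ANY);
 *	struct rte_mbuf *burst[32];
 *	int n;
 *
 *	rte_port_ring_writer_ops.f_tx(out, pkt);       // buffer one packet
 *	rte_port_ring_writer_ops.f_flush(out);         // push it to the ring
 *	n = rte_port_ring_reader_ops.f_rx(in, burst, 32);  // read it back
 */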