/*-
 *   BSD LICENSE
 *
 *   Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include <string.h>
#include <stdint.h>

#include <rte_mbuf.h>
#include <rte_ring.h>
#include <rte_malloc.h>

#include "rte_port_ring.h"
/*
 * Port RING Reader
 */
#ifdef RTE_PORT_STATS_COLLECT

#define RTE_PORT_RING_READER_STATS_PKTS_IN_ADD(port, val) \
        port->stats.n_pkts_in += val
#define RTE_PORT_RING_READER_STATS_PKTS_DROP_ADD(port, val) \
        port->stats.n_pkts_drop += val

#else

#define RTE_PORT_RING_READER_STATS_PKTS_IN_ADD(port, val)
#define RTE_PORT_RING_READER_STATS_PKTS_DROP_ADD(port, val)

#endif
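/*
 * Note: with RTE_PORT_STATS_COLLECT undefined, the stats macros above
 * expand to nothing, so counter updates compile out of the fast path
 * entirely.
 */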
struct rte_port_ring_reader {
        struct rte_port_in_stats stats;

        struct rte_ring *ring;
};
static void *
rte_port_ring_reader_create_internal(void *params, int socket_id,
        uint32_t is_multi)
{
        struct rte_port_ring_reader_params *conf =
                        (struct rte_port_ring_reader_params *) params;
        struct rte_port_ring_reader *port;

        /* Check input parameters. The ring's consumer mode must match the
         * port flavor: single-consumer ring for the single reader,
         * multi-consumer ring for the multi reader.
         */
        if ((conf == NULL) ||
                (conf->ring == NULL) ||
                (conf->ring->cons.sc_dequeue && is_multi) ||
                (!(conf->ring->cons.sc_dequeue) && !is_multi)) {
                RTE_LOG(ERR, PORT, "%s: Invalid Parameters\n", __func__);
                return NULL;
        }

        /* Memory allocation */
        port = rte_zmalloc_socket("PORT", sizeof(*port),
                RTE_CACHE_LINE_SIZE, socket_id);
        if (port == NULL) {
                RTE_LOG(ERR, PORT, "%s: Failed to allocate port\n", __func__);
                return NULL;
        }

        /* Initialization */
        port->ring = conf->ring;

        return port;
}
static void *
rte_port_ring_reader_create(void *params, int socket_id)
{
        return rte_port_ring_reader_create_internal(params, socket_id, 0);
}

static void *
rte_port_ring_multi_reader_create(void *params, int socket_id)
{
        return rte_port_ring_reader_create_internal(params, socket_id, 1);
}
static int
rte_port_ring_reader_rx(void *port, struct rte_mbuf **pkts, uint32_t n_pkts)
{
        struct rte_port_ring_reader *p = (struct rte_port_ring_reader *) port;
        uint32_t nb_rx;

        nb_rx = rte_ring_sc_dequeue_burst(p->ring, (void **) pkts, n_pkts);
        RTE_PORT_RING_READER_STATS_PKTS_IN_ADD(p, nb_rx);

        return nb_rx;
}

static int
rte_port_ring_multi_reader_rx(void *port, struct rte_mbuf **pkts,
        uint32_t n_pkts)
{
        struct rte_port_ring_reader *p = (struct rte_port_ring_reader *) port;
        uint32_t nb_rx;

        nb_rx = rte_ring_mc_dequeue_burst(p->ring, (void **) pkts, n_pkts);
        RTE_PORT_RING_READER_STATS_PKTS_IN_ADD(p, nb_rx);

        return nb_rx;
}
static int
rte_port_ring_reader_free(void *port)
{
        if (port == NULL) {
                RTE_LOG(ERR, PORT, "%s: port is NULL\n", __func__);
                return -EINVAL;
        }

        rte_free(port);

        return 0;
}
static int
rte_port_ring_reader_stats_read(void *port,
                struct rte_port_in_stats *stats, int clear)
{
        struct rte_port_ring_reader *p =
                (struct rte_port_ring_reader *) port;

        if (stats != NULL)
                memcpy(stats, &p->stats, sizeof(p->stats));

        if (clear)
                memset(&p->stats, 0, sizeof(p->stats));

        return 0;
}
/*
 * Port RING Writer
 */
#ifdef RTE_PORT_STATS_COLLECT

#define RTE_PORT_RING_WRITER_STATS_PKTS_IN_ADD(port, val) \
        port->stats.n_pkts_in += val
#define RTE_PORT_RING_WRITER_STATS_PKTS_DROP_ADD(port, val) \
        port->stats.n_pkts_drop += val

#else

#define RTE_PORT_RING_WRITER_STATS_PKTS_IN_ADD(port, val)
#define RTE_PORT_RING_WRITER_STATS_PKTS_DROP_ADD(port, val)

#endif
struct rte_port_ring_writer {
        struct rte_port_out_stats stats;

        struct rte_mbuf *tx_buf[2 * RTE_PORT_IN_BURST_SIZE_MAX];
        struct rte_ring *ring;
        uint32_t tx_burst_sz;
        uint32_t tx_buf_count;
        uint64_t bsz_mask;
        uint32_t is_multi;
};
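/*
 * Note: tx_buf is sized at twice RTE_PORT_IN_BURST_SIZE_MAX so that a full
 * input burst can always be appended to a partially filled buffer before
 * the buffer is flushed to the ring. bsz_mask caches the bit that marks a
 * full burst (1 << (tx_burst_sz - 1)); see the tx_bulk path below.
 */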
static void *
rte_port_ring_writer_create_internal(void *params, int socket_id,
        uint32_t is_multi)
{
        struct rte_port_ring_writer_params *conf =
                        (struct rte_port_ring_writer_params *) params;
        struct rte_port_ring_writer *port;

        /* Check input parameters */
        if ((conf == NULL) ||
                (conf->ring == NULL) ||
                (conf->ring->prod.sp_enqueue && is_multi) ||
                (!(conf->ring->prod.sp_enqueue) && !is_multi) ||
                (conf->tx_burst_sz > RTE_PORT_IN_BURST_SIZE_MAX)) {
                RTE_LOG(ERR, PORT, "%s: Invalid Parameters\n", __func__);
                return NULL;
        }

        /* Memory allocation */
        port = rte_zmalloc_socket("PORT", sizeof(*port),
                RTE_CACHE_LINE_SIZE, socket_id);
        if (port == NULL) {
                RTE_LOG(ERR, PORT, "%s: Failed to allocate port\n", __func__);
                return NULL;
        }

        /* Initialization */
        port->ring = conf->ring;
        port->tx_burst_sz = conf->tx_burst_sz;
        port->tx_buf_count = 0;
        port->bsz_mask = 1LLU << (conf->tx_burst_sz - 1);
        port->is_multi = is_multi;

        return port;
}
static void *
rte_port_ring_writer_create(void *params, int socket_id)
{
        return rte_port_ring_writer_create_internal(params, socket_id, 0);
}

static void *
rte_port_ring_multi_writer_create(void *params, int socket_id)
{
        return rte_port_ring_writer_create_internal(params, socket_id, 1);
}
static inline void
send_burst(struct rte_port_ring_writer *p)
{
        uint32_t nb_tx;

        nb_tx = rte_ring_sp_enqueue_burst(p->ring, (void **)p->tx_buf,
                        p->tx_buf_count);

        /* Packets that did not fit in the ring are dropped and freed */
        RTE_PORT_RING_WRITER_STATS_PKTS_DROP_ADD(p, p->tx_buf_count - nb_tx);
        for ( ; nb_tx < p->tx_buf_count; nb_tx++)
                rte_pktmbuf_free(p->tx_buf[nb_tx]);

        p->tx_buf_count = 0;
}

static inline void
send_burst_mp(struct rte_port_ring_writer *p)
{
        uint32_t nb_tx;

        nb_tx = rte_ring_mp_enqueue_burst(p->ring, (void **)p->tx_buf,
                        p->tx_buf_count);

        RTE_PORT_RING_WRITER_STATS_PKTS_DROP_ADD(p, p->tx_buf_count - nb_tx);
        for ( ; nb_tx < p->tx_buf_count; nb_tx++)
                rte_pktmbuf_free(p->tx_buf[nb_tx]);

        p->tx_buf_count = 0;
}
static int
rte_port_ring_writer_tx(void *port, struct rte_mbuf *pkt)
{
        struct rte_port_ring_writer *p = (struct rte_port_ring_writer *) port;

        p->tx_buf[p->tx_buf_count++] = pkt;
        RTE_PORT_RING_WRITER_STATS_PKTS_IN_ADD(p, 1);
        if (p->tx_buf_count >= p->tx_burst_sz)
                send_burst(p);

        return 0;
}

static int
rte_port_ring_multi_writer_tx(void *port, struct rte_mbuf *pkt)
{
        struct rte_port_ring_writer *p = (struct rte_port_ring_writer *) port;

        p->tx_buf[p->tx_buf_count++] = pkt;
        RTE_PORT_RING_WRITER_STATS_PKTS_IN_ADD(p, 1);
        if (p->tx_buf_count >= p->tx_burst_sz)
                send_burst_mp(p);

        return 0;
}
static inline int __attribute__((always_inline))
rte_port_ring_writer_tx_bulk_internal(void *port,
                struct rte_mbuf **pkts,
                uint64_t pkts_mask,
                uint32_t is_multi)
{
        struct rte_port_ring_writer *p =
                (struct rte_port_ring_writer *) port;

        uint64_t bsz_mask = p->bsz_mask;
        uint32_t tx_buf_count = p->tx_buf_count;
        uint64_t expr = (pkts_mask & (pkts_mask + 1)) |
                        ((pkts_mask & bsz_mask) ^ bsz_mask);
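        /*
         * expr == 0 if and only if pkts_mask selects a contiguous run of
         * packets starting at bit 0 (pkts_mask & (pkts_mask + 1) is zero
         * only for masks of the form 2^n - 1) that is at least tx_burst_sz
         * packets long (bit tx_burst_sz - 1, cached in bsz_mask, is set).
         * For example, with tx_burst_sz = 32, pkts_mask = 0xFFFFFFFF gives
         * expr == 0, so the burst is enqueued straight to the ring below,
         * while pkts_mask = 0xF0 does not, so packets go through tx_buf.
         */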
        if (expr == 0) {
                uint64_t n_pkts = __builtin_popcountll(pkts_mask);
                uint32_t n_pkts_ok;

                /* Flush any buffered packets first to preserve ordering */
                if (tx_buf_count) {
                        if (is_multi)
                                send_burst_mp(p);
                        else
                                send_burst(p);
                }

                RTE_PORT_RING_WRITER_STATS_PKTS_IN_ADD(p, n_pkts);
                if (is_multi)
                        n_pkts_ok = rte_ring_mp_enqueue_burst(p->ring, (void **)pkts,
                                n_pkts);
                else
                        n_pkts_ok = rte_ring_sp_enqueue_burst(p->ring, (void **)pkts,
                                n_pkts);

                RTE_PORT_RING_WRITER_STATS_PKTS_DROP_ADD(p, n_pkts - n_pkts_ok);
                for ( ; n_pkts_ok < n_pkts; n_pkts_ok++) {
                        struct rte_mbuf *pkt = pkts[n_pkts_ok];

                        rte_pktmbuf_free(pkt);
                }
        } else {
                for ( ; pkts_mask; ) {
                        uint32_t pkt_index = __builtin_ctzll(pkts_mask);
                        uint64_t pkt_mask = 1LLU << pkt_index;
                        struct rte_mbuf *pkt = pkts[pkt_index];

                        p->tx_buf[tx_buf_count++] = pkt;
                        RTE_PORT_RING_WRITER_STATS_PKTS_IN_ADD(p, 1);
                        pkts_mask &= ~pkt_mask;
                }

                p->tx_buf_count = tx_buf_count;
                if (tx_buf_count >= p->tx_burst_sz) {
                        if (is_multi)
                                send_burst_mp(p);
                        else
                                send_burst(p);
                }
        }

        return 0;
}
static int
rte_port_ring_writer_tx_bulk(void *port,
                struct rte_mbuf **pkts,
                uint64_t pkts_mask)
{
        return rte_port_ring_writer_tx_bulk_internal(port, pkts, pkts_mask, 0);
}

static int
rte_port_ring_multi_writer_tx_bulk(void *port,
                struct rte_mbuf **pkts,
                uint64_t pkts_mask)
{
        return rte_port_ring_writer_tx_bulk_internal(port, pkts, pkts_mask, 1);
}
static int
rte_port_ring_writer_flush(void *port)
{
        struct rte_port_ring_writer *p = (struct rte_port_ring_writer *) port;

        if (p->tx_buf_count > 0)
                send_burst(p);

        return 0;
}

static int
rte_port_ring_multi_writer_flush(void *port)
{
        struct rte_port_ring_writer *p = (struct rte_port_ring_writer *) port;

        if (p->tx_buf_count > 0)
                send_burst_mp(p);

        return 0;
}
static int
rte_port_ring_writer_free(void *port)
{
        struct rte_port_ring_writer *p = (struct rte_port_ring_writer *) port;

        if (port == NULL) {
                RTE_LOG(ERR, PORT, "%s: Port is NULL\n", __func__);
                return -EINVAL;
        }

        if (p->is_multi)
                rte_port_ring_multi_writer_flush(port);
        else
                rte_port_ring_writer_flush(port);

        rte_free(port);

        return 0;
}
static int
rte_port_ring_writer_stats_read(void *port,
                struct rte_port_out_stats *stats, int clear)
{
        struct rte_port_ring_writer *p =
                (struct rte_port_ring_writer *) port;

        if (stats != NULL)
                memcpy(stats, &p->stats, sizeof(p->stats));

        if (clear)
                memset(&p->stats, 0, sizeof(p->stats));

        return 0;
}
/*
 * Port RING Writer Nodrop
 */
#ifdef RTE_PORT_STATS_COLLECT

#define RTE_PORT_RING_WRITER_NODROP_STATS_PKTS_IN_ADD(port, val) \
        port->stats.n_pkts_in += val
#define RTE_PORT_RING_WRITER_NODROP_STATS_PKTS_DROP_ADD(port, val) \
        port->stats.n_pkts_drop += val

#else

#define RTE_PORT_RING_WRITER_NODROP_STATS_PKTS_IN_ADD(port, val)
#define RTE_PORT_RING_WRITER_NODROP_STATS_PKTS_DROP_ADD(port, val)

#endif
struct rte_port_ring_writer_nodrop {
        struct rte_port_out_stats stats;

        struct rte_mbuf *tx_buf[2 * RTE_PORT_IN_BURST_SIZE_MAX];
        struct rte_ring *ring;
        uint32_t tx_burst_sz;
        uint32_t tx_buf_count;
        uint64_t bsz_mask;
        uint64_t n_retries;
        uint32_t is_multi;
};
static void *
rte_port_ring_writer_nodrop_create_internal(void *params, int socket_id,
        uint32_t is_multi)
{
        struct rte_port_ring_writer_nodrop_params *conf =
                        (struct rte_port_ring_writer_nodrop_params *) params;
        struct rte_port_ring_writer_nodrop *port;

        /* Check input parameters */
        if ((conf == NULL) ||
                (conf->ring == NULL) ||
                (conf->ring->prod.sp_enqueue && is_multi) ||
                (!(conf->ring->prod.sp_enqueue) && !is_multi) ||
                (conf->tx_burst_sz > RTE_PORT_IN_BURST_SIZE_MAX)) {
                RTE_LOG(ERR, PORT, "%s: Invalid Parameters\n", __func__);
                return NULL;
        }

        /* Memory allocation */
        port = rte_zmalloc_socket("PORT", sizeof(*port),
                RTE_CACHE_LINE_SIZE, socket_id);
        if (port == NULL) {
                RTE_LOG(ERR, PORT, "%s: Failed to allocate port\n", __func__);
                return NULL;
        }

        /* Initialization */
        port->ring = conf->ring;
        port->tx_burst_sz = conf->tx_burst_sz;
        port->tx_buf_count = 0;
        port->bsz_mask = 1LLU << (conf->tx_burst_sz - 1);
        port->is_multi = is_multi;

        /*
         * When n_retries is 0, wait for every packet to be sent no matter
         * how many retries it takes. To limit the number of branches in
         * the fast path, we use UINT64_MAX instead of branching.
         */
        port->n_retries = (conf->n_retries == 0) ? UINT64_MAX : conf->n_retries;

        return port;
}
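/*
 * Illustrative configuration sketch (hypothetical caller code, not part of
 * this file; "ring" is assumed to be an rte_ring created elsewhere by the
 * application):
 *
 *	struct rte_port_ring_writer_nodrop_params conf = {
 *		.ring = ring,
 *		.tx_burst_sz = 32,
 *		.n_retries = 0,
 *	};
 *
 * With n_retries = 0 (mapped to UINT64_MAX above), send_burst_nodrop()
 * below keeps retrying until every buffered packet is enqueued.
 */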
static void *
rte_port_ring_writer_nodrop_create(void *params, int socket_id)
{
        return rte_port_ring_writer_nodrop_create_internal(params, socket_id, 0);
}

static void *
rte_port_ring_multi_writer_nodrop_create(void *params, int socket_id)
{
        return rte_port_ring_writer_nodrop_create_internal(params, socket_id, 1);
}
static inline void
send_burst_nodrop(struct rte_port_ring_writer_nodrop *p)
{
        uint32_t nb_tx = 0, i;

        nb_tx = rte_ring_sp_enqueue_burst(p->ring, (void **)p->tx_buf,
                        p->tx_buf_count);

        /* We sent all the packets on the first try */
        if (nb_tx >= p->tx_buf_count) {
                p->tx_buf_count = 0;
                return;
        }

        for (i = 0; i < p->n_retries; i++) {
                nb_tx += rte_ring_sp_enqueue_burst(p->ring,
                        (void **) (p->tx_buf + nb_tx), p->tx_buf_count - nb_tx);

                /* We sent all the packets after more than one try */
                if (nb_tx >= p->tx_buf_count) {
                        p->tx_buf_count = 0;
                        return;
                }
        }

        /* We didn't send the packets in the maximum allowed attempts */
        RTE_PORT_RING_WRITER_NODROP_STATS_PKTS_DROP_ADD(p, p->tx_buf_count - nb_tx);
        for ( ; nb_tx < p->tx_buf_count; nb_tx++)
                rte_pktmbuf_free(p->tx_buf[nb_tx]);

        p->tx_buf_count = 0;
}
static inline void
send_burst_mp_nodrop(struct rte_port_ring_writer_nodrop *p)
{
        uint32_t nb_tx = 0, i;

        nb_tx = rte_ring_mp_enqueue_burst(p->ring, (void **)p->tx_buf,
                        p->tx_buf_count);

        /* We sent all the packets on the first try */
        if (nb_tx >= p->tx_buf_count) {
                p->tx_buf_count = 0;
                return;
        }

        for (i = 0; i < p->n_retries; i++) {
                nb_tx += rte_ring_mp_enqueue_burst(p->ring,
                        (void **) (p->tx_buf + nb_tx), p->tx_buf_count - nb_tx);

                /* We sent all the packets after more than one try */
                if (nb_tx >= p->tx_buf_count) {
                        p->tx_buf_count = 0;
                        return;
                }
        }

        /* We didn't send the packets in the maximum allowed attempts */
        RTE_PORT_RING_WRITER_NODROP_STATS_PKTS_DROP_ADD(p, p->tx_buf_count - nb_tx);
        for ( ; nb_tx < p->tx_buf_count; nb_tx++)
                rte_pktmbuf_free(p->tx_buf[nb_tx]);

        p->tx_buf_count = 0;
}
static int
rte_port_ring_writer_nodrop_tx(void *port, struct rte_mbuf *pkt)
{
        struct rte_port_ring_writer_nodrop *p =
                (struct rte_port_ring_writer_nodrop *) port;

        p->tx_buf[p->tx_buf_count++] = pkt;
        RTE_PORT_RING_WRITER_NODROP_STATS_PKTS_IN_ADD(p, 1);
        if (p->tx_buf_count >= p->tx_burst_sz)
                send_burst_nodrop(p);

        return 0;
}

static int
rte_port_ring_multi_writer_nodrop_tx(void *port, struct rte_mbuf *pkt)
{
        struct rte_port_ring_writer_nodrop *p =
                (struct rte_port_ring_writer_nodrop *) port;

        p->tx_buf[p->tx_buf_count++] = pkt;
        RTE_PORT_RING_WRITER_NODROP_STATS_PKTS_IN_ADD(p, 1);
        if (p->tx_buf_count >= p->tx_burst_sz)
                send_burst_mp_nodrop(p);

        return 0;
}
static inline int __attribute__((always_inline))
rte_port_ring_writer_nodrop_tx_bulk_internal(void *port,
                struct rte_mbuf **pkts,
                uint64_t pkts_mask,
                uint32_t is_multi)
{
        struct rte_port_ring_writer_nodrop *p =
                (struct rte_port_ring_writer_nodrop *) port;

        uint64_t bsz_mask = p->bsz_mask;
        uint32_t tx_buf_count = p->tx_buf_count;
        uint64_t expr = (pkts_mask & (pkts_mask + 1)) |
                        ((pkts_mask & bsz_mask) ^ bsz_mask);
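        /*
         * As in rte_port_ring_writer_tx_bulk_internal() above, expr == 0
         * means pkts_mask is a full contiguous burst starting at bit 0,
         * which can be enqueued directly without going through tx_buf.
         */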
        if (expr == 0) {
                uint64_t n_pkts = __builtin_popcountll(pkts_mask);
                uint32_t n_pkts_ok;

                /* Flush any buffered packets first to preserve ordering */
                if (tx_buf_count) {
                        if (is_multi)
                                send_burst_mp_nodrop(p);
                        else
                                send_burst_nodrop(p);
                }

                RTE_PORT_RING_WRITER_NODROP_STATS_PKTS_IN_ADD(p, n_pkts);
                if (is_multi)
                        n_pkts_ok =
                                rte_ring_mp_enqueue_burst(p->ring, (void **)pkts, n_pkts);
                else
                        n_pkts_ok =
                                rte_ring_sp_enqueue_burst(p->ring, (void **)pkts, n_pkts);

                if (n_pkts_ok >= n_pkts)
                        return 0;

                /*
                 * If we didn't manage to send all packets in a single burst,
                 * move the remaining packets to the buffer and call send
                 * burst.
                 */
                for (; n_pkts_ok < n_pkts; n_pkts_ok++) {
                        struct rte_mbuf *pkt = pkts[n_pkts_ok];

                        p->tx_buf[p->tx_buf_count++] = pkt;
                }
                if (is_multi)
                        send_burst_mp_nodrop(p);
                else
                        send_burst_nodrop(p);
        } else {
                for ( ; pkts_mask; ) {
                        uint32_t pkt_index = __builtin_ctzll(pkts_mask);
                        uint64_t pkt_mask = 1LLU << pkt_index;
                        struct rte_mbuf *pkt = pkts[pkt_index];

                        p->tx_buf[tx_buf_count++] = pkt;
                        RTE_PORT_RING_WRITER_NODROP_STATS_PKTS_IN_ADD(p, 1);
                        pkts_mask &= ~pkt_mask;
                }

                p->tx_buf_count = tx_buf_count;
                if (tx_buf_count >= p->tx_burst_sz) {
                        if (is_multi)
                                send_burst_mp_nodrop(p);
                        else
                                send_burst_nodrop(p);
                }
        }

        return 0;
}
static int
rte_port_ring_writer_nodrop_tx_bulk(void *port,
                struct rte_mbuf **pkts,
                uint64_t pkts_mask)
{
        return rte_port_ring_writer_nodrop_tx_bulk_internal(port, pkts,
                pkts_mask, 0);
}

static int
rte_port_ring_multi_writer_nodrop_tx_bulk(void *port,
                struct rte_mbuf **pkts,
                uint64_t pkts_mask)
{
        return rte_port_ring_writer_nodrop_tx_bulk_internal(port, pkts,
                pkts_mask, 1);
}
static int
rte_port_ring_writer_nodrop_flush(void *port)
{
        struct rte_port_ring_writer_nodrop *p =
                (struct rte_port_ring_writer_nodrop *) port;

        if (p->tx_buf_count > 0)
                send_burst_nodrop(p);

        return 0;
}

static int
rte_port_ring_multi_writer_nodrop_flush(void *port)
{
        struct rte_port_ring_writer_nodrop *p =
                (struct rte_port_ring_writer_nodrop *) port;

        if (p->tx_buf_count > 0)
                send_burst_mp_nodrop(p);

        return 0;
}
static int
rte_port_ring_writer_nodrop_free(void *port)
{
        struct rte_port_ring_writer_nodrop *p =
                (struct rte_port_ring_writer_nodrop *) port;

        if (port == NULL) {
                RTE_LOG(ERR, PORT, "%s: Port is NULL\n", __func__);
                return -EINVAL;
        }

        if (p->is_multi)
                rte_port_ring_multi_writer_nodrop_flush(port);
        else
                rte_port_ring_writer_nodrop_flush(port);

        rte_free(port);

        return 0;
}
static int
rte_port_ring_writer_nodrop_stats_read(void *port,
                struct rte_port_out_stats *stats, int clear)
{
        struct rte_port_ring_writer_nodrop *p =
                (struct rte_port_ring_writer_nodrop *) port;

        if (stats != NULL)
                memcpy(stats, &p->stats, sizeof(p->stats));

        if (clear)
                memset(&p->stats, 0, sizeof(p->stats));

        return 0;
}
/*
 * Summary of port operations
 */
struct rte_port_in_ops rte_port_ring_reader_ops = {
        .f_create = rte_port_ring_reader_create,
        .f_free = rte_port_ring_reader_free,
        .f_rx = rte_port_ring_reader_rx,
        .f_stats = rte_port_ring_reader_stats_read,
};

struct rte_port_out_ops rte_port_ring_writer_ops = {
        .f_create = rte_port_ring_writer_create,
        .f_free = rte_port_ring_writer_free,
        .f_tx = rte_port_ring_writer_tx,
        .f_tx_bulk = rte_port_ring_writer_tx_bulk,
        .f_flush = rte_port_ring_writer_flush,
        .f_stats = rte_port_ring_writer_stats_read,
};

struct rte_port_out_ops rte_port_ring_writer_nodrop_ops = {
        .f_create = rte_port_ring_writer_nodrop_create,
        .f_free = rte_port_ring_writer_nodrop_free,
        .f_tx = rte_port_ring_writer_nodrop_tx,
        .f_tx_bulk = rte_port_ring_writer_nodrop_tx_bulk,
        .f_flush = rte_port_ring_writer_nodrop_flush,
        .f_stats = rte_port_ring_writer_nodrop_stats_read,
};

struct rte_port_in_ops rte_port_ring_multi_reader_ops = {
        .f_create = rte_port_ring_multi_reader_create,
        .f_free = rte_port_ring_reader_free,
        .f_rx = rte_port_ring_multi_reader_rx,
        .f_stats = rte_port_ring_reader_stats_read,
};

struct rte_port_out_ops rte_port_ring_multi_writer_ops = {
        .f_create = rte_port_ring_multi_writer_create,
        .f_free = rte_port_ring_writer_free,
        .f_tx = rte_port_ring_multi_writer_tx,
        .f_tx_bulk = rte_port_ring_multi_writer_tx_bulk,
        .f_flush = rte_port_ring_multi_writer_flush,
        .f_stats = rte_port_ring_writer_stats_read,
};

struct rte_port_out_ops rte_port_ring_multi_writer_nodrop_ops = {
        .f_create = rte_port_ring_multi_writer_nodrop_create,
        .f_free = rte_port_ring_writer_nodrop_free,
        .f_tx = rte_port_ring_multi_writer_nodrop_tx,
        .f_tx_bulk = rte_port_ring_multi_writer_nodrop_tx_bulk,
        .f_flush = rte_port_ring_multi_writer_nodrop_flush,
        .f_stats = rte_port_ring_writer_nodrop_stats_read,
};
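
/*
 * Usage sketch (illustrative only, not part of this file): a ring reader
 * port is normally plugged into an rte_pipeline input port via the ops
 * table above. Everything below except the ops table and the params
 * struct defined by this library is hypothetical caller code; "pipeline"
 * is assumed to come from rte_pipeline_create(). Note that the single
 * reader requires a single-consumer ring (RING_F_SC_DEQ), as enforced in
 * rte_port_ring_reader_create_internal().
 *
 *	struct rte_ring *ring = rte_ring_create("R0", 1024, rte_socket_id(),
 *		RING_F_SP_ENQ | RING_F_SC_DEQ);
 *	struct rte_port_ring_reader_params rp = { .ring = ring };
 *	struct rte_pipeline_port_in_params pin = {
 *		.ops = &rte_port_ring_reader_ops,
 *		.arg_create = &rp,
 *		.burst_size = 32,
 *	};
 *	uint32_t port_in_id;
 *	rte_pipeline_port_in_create(pipeline, &pin, &port_in_id);
 */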