4 * Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions
11 * * Redistributions of source code must retain the above copyright
12 * notice, this list of conditions and the following disclaimer.
13 * * Redistributions in binary form must reproduce the above copyright
14 * notice, this list of conditions and the following disclaimer in
15 * the documentation and/or other materials provided with the
17 * * Neither the name of Intel Corporation nor the names of its
18 * contributors may be used to endorse or promote products derived
19 * from this software without specific prior written permission.
21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include <errno.h>
#include <stdint.h>
#include <string.h>

#include <rte_malloc.h>

#include "rte_port_ring.h"
/*
 * Port RING Reader
 */
#ifdef RTE_PORT_STATS_COLLECT

/* Statistics are updated only when RTE_PORT_STATS_COLLECT is defined;
 * otherwise the macros expand to nothing so the fast path pays no cost. */
#define RTE_PORT_RING_READER_STATS_PKTS_IN_ADD(port, val) \
	port->stats.n_pkts_in += val
#define RTE_PORT_RING_READER_STATS_PKTS_DROP_ADD(port, val) \
	port->stats.n_pkts_drop += val

#else

#define RTE_PORT_RING_READER_STATS_PKTS_IN_ADD(port, val)
#define RTE_PORT_RING_READER_STATS_PKTS_DROP_ADD(port, val)

#endif
59 struct rte_port_ring_reader {
60 struct rte_port_in_stats stats;
62 struct rte_ring *ring;
66 rte_port_ring_reader_create_internal(void *params, int socket_id,
69 struct rte_port_ring_reader_params *conf =
70 (struct rte_port_ring_reader_params *) params;
71 struct rte_port_ring_reader *port;
73 /* Check input parameters */
75 (conf->ring == NULL) ||
76 (conf->ring->cons.single && is_multi) ||
77 (!(conf->ring->cons.single) && !is_multi)) {
78 RTE_LOG(ERR, PORT, "%s: Invalid Parameters\n", __func__);
82 /* Memory allocation */
83 port = rte_zmalloc_socket("PORT", sizeof(*port),
84 RTE_CACHE_LINE_SIZE, socket_id);
86 RTE_LOG(ERR, PORT, "%s: Failed to allocate port\n", __func__);
91 port->ring = conf->ring;
/* Create a single-consumer ring reader port. */
static void *
rte_port_ring_reader_create(void *params, int socket_id)
{
	return rte_port_ring_reader_create_internal(params, socket_id, 0);
}
/* Create a multi-consumer ring reader port. */
static void *
rte_port_ring_multi_reader_create(void *params, int socket_id)
{
	return rte_port_ring_reader_create_internal(params, socket_id, 1);
}
109 rte_port_ring_reader_rx(void *port, struct rte_mbuf **pkts, uint32_t n_pkts)
111 struct rte_port_ring_reader *p = (struct rte_port_ring_reader *) port;
114 nb_rx = rte_ring_sc_dequeue_burst(p->ring, (void **) pkts, n_pkts);
115 RTE_PORT_RING_READER_STATS_PKTS_IN_ADD(p, nb_rx);
121 rte_port_ring_multi_reader_rx(void *port, struct rte_mbuf **pkts,
124 struct rte_port_ring_reader *p = (struct rte_port_ring_reader *) port;
127 nb_rx = rte_ring_mc_dequeue_burst(p->ring, (void **) pkts, n_pkts);
128 RTE_PORT_RING_READER_STATS_PKTS_IN_ADD(p, nb_rx);
134 rte_port_ring_reader_free(void *port)
137 RTE_LOG(ERR, PORT, "%s: port is NULL\n", __func__);
147 rte_port_ring_reader_stats_read(void *port,
148 struct rte_port_in_stats *stats, int clear)
150 struct rte_port_ring_reader *p =
151 (struct rte_port_ring_reader *) port;
154 memcpy(stats, &p->stats, sizeof(p->stats));
157 memset(&p->stats, 0, sizeof(p->stats));
/*
 * Port RING Writer
 */
#ifdef RTE_PORT_STATS_COLLECT

/* No-op unless RTE_PORT_STATS_COLLECT is defined (zero fast-path cost). */
#define RTE_PORT_RING_WRITER_STATS_PKTS_IN_ADD(port, val) \
	port->stats.n_pkts_in += val
#define RTE_PORT_RING_WRITER_STATS_PKTS_DROP_ADD(port, val) \
	port->stats.n_pkts_drop += val

#else

#define RTE_PORT_RING_WRITER_STATS_PKTS_IN_ADD(port, val)
#define RTE_PORT_RING_WRITER_STATS_PKTS_DROP_ADD(port, val)

#endif
179 struct rte_port_ring_writer {
180 struct rte_port_out_stats stats;
182 struct rte_mbuf *tx_buf[2 * RTE_PORT_IN_BURST_SIZE_MAX];
183 struct rte_ring *ring;
184 uint32_t tx_burst_sz;
185 uint32_t tx_buf_count;
191 rte_port_ring_writer_create_internal(void *params, int socket_id,
194 struct rte_port_ring_writer_params *conf =
195 (struct rte_port_ring_writer_params *) params;
196 struct rte_port_ring_writer *port;
198 /* Check input parameters */
199 if ((conf == NULL) ||
200 (conf->ring == NULL) ||
201 (conf->ring->prod.single && is_multi) ||
202 (!(conf->ring->prod.single) && !is_multi) ||
203 (conf->tx_burst_sz > RTE_PORT_IN_BURST_SIZE_MAX)) {
204 RTE_LOG(ERR, PORT, "%s: Invalid Parameters\n", __func__);
208 /* Memory allocation */
209 port = rte_zmalloc_socket("PORT", sizeof(*port),
210 RTE_CACHE_LINE_SIZE, socket_id);
212 RTE_LOG(ERR, PORT, "%s: Failed to allocate port\n", __func__);
217 port->ring = conf->ring;
218 port->tx_burst_sz = conf->tx_burst_sz;
219 port->tx_buf_count = 0;
220 port->bsz_mask = 1LLU << (conf->tx_burst_sz - 1);
221 port->is_multi = is_multi;
/* Create a single-producer ring writer port. */
static void *
rte_port_ring_writer_create(void *params, int socket_id)
{
	return rte_port_ring_writer_create_internal(params, socket_id, 0);
}
/* Create a multi-producer ring writer port. */
static void *
rte_port_ring_multi_writer_create(void *params, int socket_id)
{
	return rte_port_ring_writer_create_internal(params, socket_id, 1);
}
239 send_burst(struct rte_port_ring_writer *p)
243 nb_tx = rte_ring_sp_enqueue_burst(p->ring, (void **)p->tx_buf,
244 p->tx_buf_count, NULL);
246 RTE_PORT_RING_WRITER_STATS_PKTS_DROP_ADD(p, p->tx_buf_count - nb_tx);
247 for ( ; nb_tx < p->tx_buf_count; nb_tx++)
248 rte_pktmbuf_free(p->tx_buf[nb_tx]);
254 send_burst_mp(struct rte_port_ring_writer *p)
258 nb_tx = rte_ring_mp_enqueue_burst(p->ring, (void **)p->tx_buf,
259 p->tx_buf_count, NULL);
261 RTE_PORT_RING_WRITER_STATS_PKTS_DROP_ADD(p, p->tx_buf_count - nb_tx);
262 for ( ; nb_tx < p->tx_buf_count; nb_tx++)
263 rte_pktmbuf_free(p->tx_buf[nb_tx]);
269 rte_port_ring_writer_tx(void *port, struct rte_mbuf *pkt)
271 struct rte_port_ring_writer *p = (struct rte_port_ring_writer *) port;
273 p->tx_buf[p->tx_buf_count++] = pkt;
274 RTE_PORT_RING_WRITER_STATS_PKTS_IN_ADD(p, 1);
275 if (p->tx_buf_count >= p->tx_burst_sz)
282 rte_port_ring_multi_writer_tx(void *port, struct rte_mbuf *pkt)
284 struct rte_port_ring_writer *p = (struct rte_port_ring_writer *) port;
286 p->tx_buf[p->tx_buf_count++] = pkt;
287 RTE_PORT_RING_WRITER_STATS_PKTS_IN_ADD(p, 1);
288 if (p->tx_buf_count >= p->tx_burst_sz)
294 static inline int __attribute__((always_inline))
295 rte_port_ring_writer_tx_bulk_internal(void *port,
296 struct rte_mbuf **pkts,
300 struct rte_port_ring_writer *p =
301 (struct rte_port_ring_writer *) port;
303 uint64_t bsz_mask = p->bsz_mask;
304 uint32_t tx_buf_count = p->tx_buf_count;
305 uint64_t expr = (pkts_mask & (pkts_mask + 1)) |
306 ((pkts_mask & bsz_mask) ^ bsz_mask);
309 uint64_t n_pkts = __builtin_popcountll(pkts_mask);
319 RTE_PORT_RING_WRITER_STATS_PKTS_IN_ADD(p, n_pkts);
321 n_pkts_ok = rte_ring_mp_enqueue_burst(p->ring,
322 (void **)pkts, n_pkts, NULL);
324 n_pkts_ok = rte_ring_sp_enqueue_burst(p->ring,
325 (void **)pkts, n_pkts, NULL);
327 RTE_PORT_RING_WRITER_STATS_PKTS_DROP_ADD(p, n_pkts - n_pkts_ok);
328 for ( ; n_pkts_ok < n_pkts; n_pkts_ok++) {
329 struct rte_mbuf *pkt = pkts[n_pkts_ok];
331 rte_pktmbuf_free(pkt);
334 for ( ; pkts_mask; ) {
335 uint32_t pkt_index = __builtin_ctzll(pkts_mask);
336 uint64_t pkt_mask = 1LLU << pkt_index;
337 struct rte_mbuf *pkt = pkts[pkt_index];
339 p->tx_buf[tx_buf_count++] = pkt;
340 RTE_PORT_RING_WRITER_STATS_PKTS_IN_ADD(p, 1);
341 pkts_mask &= ~pkt_mask;
344 p->tx_buf_count = tx_buf_count;
345 if (tx_buf_count >= p->tx_burst_sz) {
/* Bulk transmit, single-producer flavor. */
static int
rte_port_ring_writer_tx_bulk(void *port,
		struct rte_mbuf **pkts,
		uint64_t pkts_mask)
{
	return rte_port_ring_writer_tx_bulk_internal(port, pkts, pkts_mask, 0);
}
/* Bulk transmit, multi-producer flavor. */
static int
rte_port_ring_multi_writer_tx_bulk(void *port,
		struct rte_mbuf **pkts,
		uint64_t pkts_mask)
{
	return rte_port_ring_writer_tx_bulk_internal(port, pkts, pkts_mask, 1);
}
373 rte_port_ring_writer_flush(void *port)
375 struct rte_port_ring_writer *p = (struct rte_port_ring_writer *) port;
377 if (p->tx_buf_count > 0)
384 rte_port_ring_multi_writer_flush(void *port)
386 struct rte_port_ring_writer *p = (struct rte_port_ring_writer *) port;
388 if (p->tx_buf_count > 0)
395 rte_port_ring_writer_free(void *port)
397 struct rte_port_ring_writer *p = (struct rte_port_ring_writer *) port;
400 RTE_LOG(ERR, PORT, "%s: Port is NULL\n", __func__);
405 rte_port_ring_multi_writer_flush(port);
407 rte_port_ring_writer_flush(port);
415 rte_port_ring_writer_stats_read(void *port,
416 struct rte_port_out_stats *stats, int clear)
418 struct rte_port_ring_writer *p =
419 (struct rte_port_ring_writer *) port;
422 memcpy(stats, &p->stats, sizeof(p->stats));
425 memset(&p->stats, 0, sizeof(p->stats));
/*
 * Port RING Writer Nodrop
 */
#ifdef RTE_PORT_STATS_COLLECT

/* No-op unless RTE_PORT_STATS_COLLECT is defined (zero fast-path cost). */
#define RTE_PORT_RING_WRITER_NODROP_STATS_PKTS_IN_ADD(port, val) \
	port->stats.n_pkts_in += val
#define RTE_PORT_RING_WRITER_NODROP_STATS_PKTS_DROP_ADD(port, val) \
	port->stats.n_pkts_drop += val

#else

#define RTE_PORT_RING_WRITER_NODROP_STATS_PKTS_IN_ADD(port, val)
#define RTE_PORT_RING_WRITER_NODROP_STATS_PKTS_DROP_ADD(port, val)

#endif
447 struct rte_port_ring_writer_nodrop {
448 struct rte_port_out_stats stats;
450 struct rte_mbuf *tx_buf[2 * RTE_PORT_IN_BURST_SIZE_MAX];
451 struct rte_ring *ring;
452 uint32_t tx_burst_sz;
453 uint32_t tx_buf_count;
460 rte_port_ring_writer_nodrop_create_internal(void *params, int socket_id,
463 struct rte_port_ring_writer_nodrop_params *conf =
464 (struct rte_port_ring_writer_nodrop_params *) params;
465 struct rte_port_ring_writer_nodrop *port;
467 /* Check input parameters */
468 if ((conf == NULL) ||
469 (conf->ring == NULL) ||
470 (conf->ring->prod.single && is_multi) ||
471 (!(conf->ring->prod.single) && !is_multi) ||
472 (conf->tx_burst_sz > RTE_PORT_IN_BURST_SIZE_MAX)) {
473 RTE_LOG(ERR, PORT, "%s: Invalid Parameters\n", __func__);
477 /* Memory allocation */
478 port = rte_zmalloc_socket("PORT", sizeof(*port),
479 RTE_CACHE_LINE_SIZE, socket_id);
481 RTE_LOG(ERR, PORT, "%s: Failed to allocate port\n", __func__);
486 port->ring = conf->ring;
487 port->tx_burst_sz = conf->tx_burst_sz;
488 port->tx_buf_count = 0;
489 port->bsz_mask = 1LLU << (conf->tx_burst_sz - 1);
490 port->is_multi = is_multi;
493 * When n_retries is 0 it means that we should wait for every packet to
494 * send no matter how many retries should it take. To limit number of
495 * branches in fast path, we use UINT64_MAX instead of branching.
497 port->n_retries = (conf->n_retries == 0) ? UINT64_MAX : conf->n_retries;
/* Create a single-producer nodrop ring writer port. */
static void *
rte_port_ring_writer_nodrop_create(void *params, int socket_id)
{
	return rte_port_ring_writer_nodrop_create_internal(params, socket_id, 0);
}
/* Create a multi-producer nodrop ring writer port. */
static void *
rte_port_ring_multi_writer_nodrop_create(void *params, int socket_id)
{
	return rte_port_ring_writer_nodrop_create_internal(params, socket_id, 1);
}
515 send_burst_nodrop(struct rte_port_ring_writer_nodrop *p)
517 uint32_t nb_tx = 0, i;
519 nb_tx = rte_ring_sp_enqueue_burst(p->ring, (void **)p->tx_buf,
520 p->tx_buf_count, NULL);
522 /* We sent all the packets in a first try */
523 if (nb_tx >= p->tx_buf_count) {
528 for (i = 0; i < p->n_retries; i++) {
529 nb_tx += rte_ring_sp_enqueue_burst(p->ring,
530 (void **) (p->tx_buf + nb_tx),
531 p->tx_buf_count - nb_tx, NULL);
533 /* We sent all the packets in more than one try */
534 if (nb_tx >= p->tx_buf_count) {
540 /* We didn't send the packets in maximum allowed attempts */
541 RTE_PORT_RING_WRITER_NODROP_STATS_PKTS_DROP_ADD(p, p->tx_buf_count - nb_tx);
542 for ( ; nb_tx < p->tx_buf_count; nb_tx++)
543 rte_pktmbuf_free(p->tx_buf[nb_tx]);
549 send_burst_mp_nodrop(struct rte_port_ring_writer_nodrop *p)
551 uint32_t nb_tx = 0, i;
553 nb_tx = rte_ring_mp_enqueue_burst(p->ring, (void **)p->tx_buf,
554 p->tx_buf_count, NULL);
556 /* We sent all the packets in a first try */
557 if (nb_tx >= p->tx_buf_count) {
562 for (i = 0; i < p->n_retries; i++) {
563 nb_tx += rte_ring_mp_enqueue_burst(p->ring,
564 (void **) (p->tx_buf + nb_tx),
565 p->tx_buf_count - nb_tx, NULL);
567 /* We sent all the packets in more than one try */
568 if (nb_tx >= p->tx_buf_count) {
574 /* We didn't send the packets in maximum allowed attempts */
575 RTE_PORT_RING_WRITER_NODROP_STATS_PKTS_DROP_ADD(p, p->tx_buf_count - nb_tx);
576 for ( ; nb_tx < p->tx_buf_count; nb_tx++)
577 rte_pktmbuf_free(p->tx_buf[nb_tx]);
583 rte_port_ring_writer_nodrop_tx(void *port, struct rte_mbuf *pkt)
585 struct rte_port_ring_writer_nodrop *p =
586 (struct rte_port_ring_writer_nodrop *) port;
588 p->tx_buf[p->tx_buf_count++] = pkt;
589 RTE_PORT_RING_WRITER_NODROP_STATS_PKTS_IN_ADD(p, 1);
590 if (p->tx_buf_count >= p->tx_burst_sz)
591 send_burst_nodrop(p);
597 rte_port_ring_multi_writer_nodrop_tx(void *port, struct rte_mbuf *pkt)
599 struct rte_port_ring_writer_nodrop *p =
600 (struct rte_port_ring_writer_nodrop *) port;
602 p->tx_buf[p->tx_buf_count++] = pkt;
603 RTE_PORT_RING_WRITER_NODROP_STATS_PKTS_IN_ADD(p, 1);
604 if (p->tx_buf_count >= p->tx_burst_sz)
605 send_burst_mp_nodrop(p);
610 static inline int __attribute__((always_inline))
611 rte_port_ring_writer_nodrop_tx_bulk_internal(void *port,
612 struct rte_mbuf **pkts,
616 struct rte_port_ring_writer_nodrop *p =
617 (struct rte_port_ring_writer_nodrop *) port;
619 uint64_t bsz_mask = p->bsz_mask;
620 uint32_t tx_buf_count = p->tx_buf_count;
621 uint64_t expr = (pkts_mask & (pkts_mask + 1)) |
622 ((pkts_mask & bsz_mask) ^ bsz_mask);
625 uint64_t n_pkts = __builtin_popcountll(pkts_mask);
630 send_burst_mp_nodrop(p);
632 send_burst_nodrop(p);
635 RTE_PORT_RING_WRITER_NODROP_STATS_PKTS_IN_ADD(p, n_pkts);
638 rte_ring_mp_enqueue_burst(p->ring,
639 (void **)pkts, n_pkts, NULL);
642 rte_ring_sp_enqueue_burst(p->ring,
643 (void **)pkts, n_pkts, NULL);
645 if (n_pkts_ok >= n_pkts)
649 * If we didn't manage to send all packets in single burst, move
650 * remaining packets to the buffer and call send burst.
652 for (; n_pkts_ok < n_pkts; n_pkts_ok++) {
653 struct rte_mbuf *pkt = pkts[n_pkts_ok];
655 p->tx_buf[p->tx_buf_count++] = pkt;
658 send_burst_mp_nodrop(p);
660 send_burst_nodrop(p);
662 for ( ; pkts_mask; ) {
663 uint32_t pkt_index = __builtin_ctzll(pkts_mask);
664 uint64_t pkt_mask = 1LLU << pkt_index;
665 struct rte_mbuf *pkt = pkts[pkt_index];
667 p->tx_buf[tx_buf_count++] = pkt;
668 RTE_PORT_RING_WRITER_NODROP_STATS_PKTS_IN_ADD(p, 1);
669 pkts_mask &= ~pkt_mask;
672 p->tx_buf_count = tx_buf_count;
673 if (tx_buf_count >= p->tx_burst_sz) {
675 send_burst_mp_nodrop(p);
677 send_burst_nodrop(p);
/* Nodrop bulk transmit, single-producer flavor. */
static int
rte_port_ring_writer_nodrop_tx_bulk(void *port,
		struct rte_mbuf **pkts,
		uint64_t pkts_mask)
{
	return rte_port_ring_writer_nodrop_tx_bulk_internal(port, pkts,
			pkts_mask, 0);
}
/* Nodrop bulk transmit, multi-producer flavor. */
static int
rte_port_ring_multi_writer_nodrop_tx_bulk(void *port,
		struct rte_mbuf **pkts,
		uint64_t pkts_mask)
{
	return rte_port_ring_writer_nodrop_tx_bulk_internal(port, pkts,
			pkts_mask, 1);
}
703 rte_port_ring_writer_nodrop_flush(void *port)
705 struct rte_port_ring_writer_nodrop *p =
706 (struct rte_port_ring_writer_nodrop *) port;
708 if (p->tx_buf_count > 0)
709 send_burst_nodrop(p);
715 rte_port_ring_multi_writer_nodrop_flush(void *port)
717 struct rte_port_ring_writer_nodrop *p =
718 (struct rte_port_ring_writer_nodrop *) port;
720 if (p->tx_buf_count > 0)
721 send_burst_mp_nodrop(p);
727 rte_port_ring_writer_nodrop_free(void *port)
729 struct rte_port_ring_writer_nodrop *p =
730 (struct rte_port_ring_writer_nodrop *) port;
733 RTE_LOG(ERR, PORT, "%s: Port is NULL\n", __func__);
738 rte_port_ring_multi_writer_nodrop_flush(port);
740 rte_port_ring_writer_nodrop_flush(port);
748 rte_port_ring_writer_nodrop_stats_read(void *port,
749 struct rte_port_out_stats *stats, int clear)
751 struct rte_port_ring_writer_nodrop *p =
752 (struct rte_port_ring_writer_nodrop *) port;
755 memcpy(stats, &p->stats, sizeof(p->stats));
758 memset(&p->stats, 0, sizeof(p->stats));
764 * Summary of port operations
766 struct rte_port_in_ops rte_port_ring_reader_ops = {
767 .f_create = rte_port_ring_reader_create,
768 .f_free = rte_port_ring_reader_free,
769 .f_rx = rte_port_ring_reader_rx,
770 .f_stats = rte_port_ring_reader_stats_read,
773 struct rte_port_out_ops rte_port_ring_writer_ops = {
774 .f_create = rte_port_ring_writer_create,
775 .f_free = rte_port_ring_writer_free,
776 .f_tx = rte_port_ring_writer_tx,
777 .f_tx_bulk = rte_port_ring_writer_tx_bulk,
778 .f_flush = rte_port_ring_writer_flush,
779 .f_stats = rte_port_ring_writer_stats_read,
782 struct rte_port_out_ops rte_port_ring_writer_nodrop_ops = {
783 .f_create = rte_port_ring_writer_nodrop_create,
784 .f_free = rte_port_ring_writer_nodrop_free,
785 .f_tx = rte_port_ring_writer_nodrop_tx,
786 .f_tx_bulk = rte_port_ring_writer_nodrop_tx_bulk,
787 .f_flush = rte_port_ring_writer_nodrop_flush,
788 .f_stats = rte_port_ring_writer_nodrop_stats_read,
791 struct rte_port_in_ops rte_port_ring_multi_reader_ops = {
792 .f_create = rte_port_ring_multi_reader_create,
793 .f_free = rte_port_ring_reader_free,
794 .f_rx = rte_port_ring_multi_reader_rx,
795 .f_stats = rte_port_ring_reader_stats_read,
798 struct rte_port_out_ops rte_port_ring_multi_writer_ops = {
799 .f_create = rte_port_ring_multi_writer_create,
800 .f_free = rte_port_ring_writer_free,
801 .f_tx = rte_port_ring_multi_writer_tx,
802 .f_tx_bulk = rte_port_ring_multi_writer_tx_bulk,
803 .f_flush = rte_port_ring_multi_writer_flush,
804 .f_stats = rte_port_ring_writer_stats_read,
807 struct rte_port_out_ops rte_port_ring_multi_writer_nodrop_ops = {
808 .f_create = rte_port_ring_multi_writer_nodrop_create,
809 .f_free = rte_port_ring_writer_nodrop_free,
810 .f_tx = rte_port_ring_multi_writer_nodrop_tx,
811 .f_tx_bulk = rte_port_ring_multi_writer_nodrop_tx_bulk,
812 .f_flush = rte_port_ring_multi_writer_nodrop_flush,
813 .f_stats = rte_port_ring_writer_nodrop_stats_read,