/*-
 *   BSD LICENSE
 *
 *   Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
#include <string.h>
#include <stdint.h>
#include <errno.h>

#include <rte_mbuf.h>
#include <rte_ring.h>
#include <rte_malloc.h>

#include "rte_port_ring.h"
/*
 * Port RING Reader
 *
 * Input port state: wraps the rte_ring packets are dequeued from.
 */
struct rte_port_ring_reader {
	struct rte_ring *ring;
};
50 rte_port_ring_reader_create(void *params, int socket_id)
52 struct rte_port_ring_reader_params *conf =
53 (struct rte_port_ring_reader_params *) params;
54 struct rte_port_ring_reader *port;
56 /* Check input parameters */
58 RTE_LOG(ERR, PORT, "%s: params is NULL\n", __func__);
62 /* Memory allocation */
63 port = rte_zmalloc_socket("PORT", sizeof(*port),
64 RTE_CACHE_LINE_SIZE, socket_id);
66 RTE_LOG(ERR, PORT, "%s: Failed to allocate port\n", __func__);
71 port->ring = conf->ring;
77 rte_port_ring_reader_rx(void *port, struct rte_mbuf **pkts, uint32_t n_pkts)
79 struct rte_port_ring_reader *p = (struct rte_port_ring_reader *) port;
81 return rte_ring_sc_dequeue_burst(p->ring, (void **) pkts, n_pkts);
85 rte_port_ring_reader_free(void *port)
88 RTE_LOG(ERR, PORT, "%s: port is NULL\n", __func__);
100 struct rte_port_ring_writer {
101 struct rte_mbuf *tx_buf[RTE_PORT_IN_BURST_SIZE_MAX];
102 struct rte_ring *ring;
103 uint32_t tx_burst_sz;
104 uint32_t tx_buf_count;
109 rte_port_ring_writer_create(void *params, int socket_id)
111 struct rte_port_ring_writer_params *conf =
112 (struct rte_port_ring_writer_params *) params;
113 struct rte_port_ring_writer *port;
115 /* Check input parameters */
116 if ((conf == NULL) ||
117 (conf->ring == NULL) ||
118 (conf->tx_burst_sz > RTE_PORT_IN_BURST_SIZE_MAX)) {
119 RTE_LOG(ERR, PORT, "%s: Invalid Parameters\n", __func__);
123 /* Memory allocation */
124 port = rte_zmalloc_socket("PORT", sizeof(*port),
125 RTE_CACHE_LINE_SIZE, socket_id);
127 RTE_LOG(ERR, PORT, "%s: Failed to allocate port\n", __func__);
132 port->ring = conf->ring;
133 port->tx_burst_sz = conf->tx_burst_sz;
134 port->tx_buf_count = 0;
135 port->bsz_mask = 1LLU << (conf->tx_burst_sz - 1);
141 send_burst(struct rte_port_ring_writer *p)
145 nb_tx = rte_ring_sp_enqueue_burst(p->ring, (void **)p->tx_buf,
148 for ( ; nb_tx < p->tx_buf_count; nb_tx++)
149 rte_pktmbuf_free(p->tx_buf[nb_tx]);
155 rte_port_ring_writer_tx(void *port, struct rte_mbuf *pkt)
157 struct rte_port_ring_writer *p = (struct rte_port_ring_writer *) port;
159 p->tx_buf[p->tx_buf_count++] = pkt;
160 if (p->tx_buf_count >= p->tx_burst_sz)
167 rte_port_ring_writer_tx_bulk(void *port,
168 struct rte_mbuf **pkts,
171 struct rte_port_ring_writer *p =
172 (struct rte_port_ring_writer *) port;
174 uint32_t bsz_mask = p->bsz_mask;
175 uint32_t tx_buf_count = p->tx_buf_count;
176 uint64_t expr = (pkts_mask & (pkts_mask + 1)) |
177 ((pkts_mask & bsz_mask) ^ bsz_mask);
180 uint64_t n_pkts = __builtin_popcountll(pkts_mask);
186 n_pkts_ok = rte_ring_sp_enqueue_burst(p->ring, (void **)pkts, n_pkts);
188 for ( ; n_pkts_ok < n_pkts; n_pkts_ok++) {
189 struct rte_mbuf *pkt = pkts[n_pkts_ok];
191 rte_pktmbuf_free(pkt);
194 for ( ; pkts_mask; ) {
195 uint32_t pkt_index = __builtin_ctzll(pkts_mask);
196 uint64_t pkt_mask = 1LLU << pkt_index;
197 struct rte_mbuf *pkt = pkts[pkt_index];
199 p->tx_buf[tx_buf_count++] = pkt;
200 pkts_mask &= ~pkt_mask;
203 p->tx_buf_count = tx_buf_count;
204 if (tx_buf_count >= p->tx_burst_sz)
212 rte_port_ring_writer_flush(void *port)
214 struct rte_port_ring_writer *p = (struct rte_port_ring_writer *) port;
216 if (p->tx_buf_count > 0)
223 rte_port_ring_writer_free(void *port)
226 RTE_LOG(ERR, PORT, "%s: Port is NULL\n", __func__);
230 rte_port_ring_writer_flush(port);
/*
 * Port RING Writer Nodrop
 */
239 struct rte_port_ring_writer_nodrop {
240 struct rte_mbuf *tx_buf[RTE_PORT_IN_BURST_SIZE_MAX];
241 struct rte_ring *ring;
242 uint32_t tx_burst_sz;
243 uint32_t tx_buf_count;
249 rte_port_ring_writer_nodrop_create(void *params, int socket_id)
251 struct rte_port_ring_writer_nodrop_params *conf =
252 (struct rte_port_ring_writer_nodrop_params *) params;
253 struct rte_port_ring_writer_nodrop *port;
255 /* Check input parameters */
256 if ((conf == NULL) ||
257 (conf->ring == NULL) ||
258 (conf->tx_burst_sz > RTE_PORT_IN_BURST_SIZE_MAX)) {
259 RTE_LOG(ERR, PORT, "%s: Invalid Parameters\n", __func__);
263 /* Memory allocation */
264 port = rte_zmalloc_socket("PORT", sizeof(*port),
265 RTE_CACHE_LINE_SIZE, socket_id);
267 RTE_LOG(ERR, PORT, "%s: Failed to allocate port\n", __func__);
272 port->ring = conf->ring;
273 port->tx_burst_sz = conf->tx_burst_sz;
274 port->tx_buf_count = 0;
275 port->bsz_mask = 1LLU << (conf->tx_burst_sz - 1);
278 * When n_retries is 0 it means that we should wait for every packet to
279 * send no matter how many retries should it take. To limit number of
280 * branches in fast path, we use UINT64_MAX instead of branching.
282 port->n_retries = (conf->n_retries == 0) ? UINT64_MAX : conf->n_retries;
288 send_burst_nodrop(struct rte_port_ring_writer_nodrop *p)
290 uint32_t nb_tx = 0, i;
292 nb_tx = rte_ring_sp_enqueue_burst(p->ring, (void **)p->tx_buf,
295 /* We sent all the packets in a first try */
296 if (nb_tx >= p->tx_buf_count)
299 for (i = 0; i < p->n_retries; i++) {
300 nb_tx += rte_ring_sp_enqueue_burst(p->ring,
301 (void **) (p->tx_buf + nb_tx), p->tx_buf_count - nb_tx);
303 /* We sent all the packets in more than one try */
304 if (nb_tx >= p->tx_buf_count)
308 /* We didn't send the packets in maximum allowed attempts */
309 for ( ; nb_tx < p->tx_buf_count; nb_tx++)
310 rte_pktmbuf_free(p->tx_buf[nb_tx]);
316 rte_port_ring_writer_nodrop_tx(void *port, struct rte_mbuf *pkt)
318 struct rte_port_ring_writer_nodrop *p =
319 (struct rte_port_ring_writer_nodrop *) port;
321 p->tx_buf[p->tx_buf_count++] = pkt;
322 if (p->tx_buf_count >= p->tx_burst_sz)
323 send_burst_nodrop(p);
329 rte_port_ring_writer_nodrop_tx_bulk(void *port,
330 struct rte_mbuf **pkts,
333 struct rte_port_ring_writer_nodrop *p =
334 (struct rte_port_ring_writer_nodrop *) port;
336 uint32_t bsz_mask = p->bsz_mask;
337 uint32_t tx_buf_count = p->tx_buf_count;
338 uint64_t expr = (pkts_mask & (pkts_mask + 1)) |
339 ((pkts_mask & bsz_mask) ^ bsz_mask);
342 uint64_t n_pkts = __builtin_popcountll(pkts_mask);
346 send_burst_nodrop(p);
348 n_pkts_ok = rte_ring_sp_enqueue_burst(p->ring, (void **)pkts, n_pkts);
350 if (n_pkts_ok >= n_pkts)
354 * If we didnt manage to send all packets in single burst, move
355 * remaining packets to the buffer and call send burst.
357 for (; n_pkts_ok < n_pkts; n_pkts_ok++) {
358 struct rte_mbuf *pkt = pkts[n_pkts_ok];
359 p->tx_buf[p->tx_buf_count++] = pkt;
361 send_burst_nodrop(p);
363 for ( ; pkts_mask; ) {
364 uint32_t pkt_index = __builtin_ctzll(pkts_mask);
365 uint64_t pkt_mask = 1LLU << pkt_index;
366 struct rte_mbuf *pkt = pkts[pkt_index];
368 p->tx_buf[tx_buf_count++] = pkt;
369 pkts_mask &= ~pkt_mask;
372 p->tx_buf_count = tx_buf_count;
373 if (tx_buf_count >= p->tx_burst_sz)
374 send_burst_nodrop(p);
381 rte_port_ring_writer_nodrop_flush(void *port)
383 struct rte_port_ring_writer_nodrop *p =
384 (struct rte_port_ring_writer_nodrop *) port;
386 if (p->tx_buf_count > 0)
387 send_burst_nodrop(p);
393 rte_port_ring_writer_nodrop_free(void *port)
396 RTE_LOG(ERR, PORT, "%s: Port is NULL\n", __func__);
400 rte_port_ring_writer_nodrop_flush(port);
407 * Summary of port operations
409 struct rte_port_in_ops rte_port_ring_reader_ops = {
410 .f_create = rte_port_ring_reader_create,
411 .f_free = rte_port_ring_reader_free,
412 .f_rx = rte_port_ring_reader_rx,
415 struct rte_port_out_ops rte_port_ring_writer_ops = {
416 .f_create = rte_port_ring_writer_create,
417 .f_free = rte_port_ring_writer_free,
418 .f_tx = rte_port_ring_writer_tx,
419 .f_tx_bulk = rte_port_ring_writer_tx_bulk,
420 .f_flush = rte_port_ring_writer_flush,
423 struct rte_port_out_ops rte_port_ring_writer_nodrop_ops = {
424 .f_create = rte_port_ring_writer_nodrop_create,
425 .f_free = rte_port_ring_writer_nodrop_free,
426 .f_tx = rte_port_ring_writer_nodrop_tx,
427 .f_tx_bulk = rte_port_ring_writer_nodrop_tx_bulk,
428 .f_flush = rte_port_ring_writer_nodrop_flush,