doc: move FAQ
[dpdk.git] / lib / librte_port / rte_port_ring.c
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 #include <string.h>
34 #include <stdint.h>
35
36 #include <rte_mbuf.h>
37 #include <rte_ring.h>
38 #include <rte_malloc.h>
39
40 #include "rte_port_ring.h"
41
42 /*
43  * Port RING Reader
44  */
45 #ifdef RTE_PORT_STATS_COLLECT
46
47 #define RTE_PORT_RING_READER_STATS_PKTS_IN_ADD(port, val) \
48         port->stats.n_pkts_in += val
49 #define RTE_PORT_RING_READER_STATS_PKTS_DROP_ADD(port, val) \
50         port->stats.n_pkts_drop += val
51
52 #else
53
54 #define RTE_PORT_RING_READER_STATS_PKTS_IN_ADD(port, val)
55 #define RTE_PORT_RING_READER_STATS_PKTS_DROP_ADD(port, val)
56
57 #endif
58
59 struct rte_port_ring_reader {
60         struct rte_port_in_stats stats;
61
62         struct rte_ring *ring;
63 };
64
65 static void *
66 rte_port_ring_reader_create(void *params, int socket_id)
67 {
68         struct rte_port_ring_reader_params *conf =
69                         (struct rte_port_ring_reader_params *) params;
70         struct rte_port_ring_reader *port;
71
72         /* Check input parameters */
73         if (conf == NULL) {
74                 RTE_LOG(ERR, PORT, "%s: params is NULL\n", __func__);
75                 return NULL;
76         }
77
78         /* Memory allocation */
79         port = rte_zmalloc_socket("PORT", sizeof(*port),
80                         RTE_CACHE_LINE_SIZE, socket_id);
81         if (port == NULL) {
82                 RTE_LOG(ERR, PORT, "%s: Failed to allocate port\n", __func__);
83                 return NULL;
84         }
85
86         /* Initialization */
87         port->ring = conf->ring;
88
89         return port;
90 }
91
92 static int
93 rte_port_ring_reader_rx(void *port, struct rte_mbuf **pkts, uint32_t n_pkts)
94 {
95         struct rte_port_ring_reader *p = (struct rte_port_ring_reader *) port;
96         uint32_t nb_rx;
97
98         nb_rx = rte_ring_sc_dequeue_burst(p->ring, (void **) pkts, n_pkts);
99         RTE_PORT_RING_READER_STATS_PKTS_IN_ADD(p, nb_rx);
100
101         return nb_rx;
102 }
103
104 static int
105 rte_port_ring_reader_free(void *port)
106 {
107         if (port == NULL) {
108                 RTE_LOG(ERR, PORT, "%s: port is NULL\n", __func__);
109                 return -EINVAL;
110         }
111
112         rte_free(port);
113
114         return 0;
115 }
116
117 static int
118 rte_port_ring_reader_stats_read(void *port,
119                 struct rte_port_in_stats *stats, int clear)
120 {
121         struct rte_port_ring_reader *p =
122                 (struct rte_port_ring_reader *) port;
123
124         if (stats != NULL)
125                 memcpy(stats, &p->stats, sizeof(p->stats));
126
127         if (clear)
128                 memset(&p->stats, 0, sizeof(p->stats));
129
130         return 0;
131 }
132
133 /*
134  * Port RING Writer
135  */
136 #ifdef RTE_PORT_STATS_COLLECT
137
138 #define RTE_PORT_RING_WRITER_STATS_PKTS_IN_ADD(port, val) \
139         port->stats.n_pkts_in += val
140 #define RTE_PORT_RING_WRITER_STATS_PKTS_DROP_ADD(port, val) \
141         port->stats.n_pkts_drop += val
142
143 #else
144
145 #define RTE_PORT_RING_WRITER_STATS_PKTS_IN_ADD(port, val)
146 #define RTE_PORT_RING_WRITER_STATS_PKTS_DROP_ADD(port, val)
147
148 #endif
149
150 struct rte_port_ring_writer {
151         struct rte_port_out_stats stats;
152
153         struct rte_mbuf *tx_buf[RTE_PORT_IN_BURST_SIZE_MAX];
154         struct rte_ring *ring;
155         uint32_t tx_burst_sz;
156         uint32_t tx_buf_count;
157         uint64_t bsz_mask;
158 };
159
160 static void *
161 rte_port_ring_writer_create(void *params, int socket_id)
162 {
163         struct rte_port_ring_writer_params *conf =
164                         (struct rte_port_ring_writer_params *) params;
165         struct rte_port_ring_writer *port;
166
167         /* Check input parameters */
168         if ((conf == NULL) ||
169             (conf->ring == NULL) ||
170                 (conf->tx_burst_sz > RTE_PORT_IN_BURST_SIZE_MAX)) {
171                 RTE_LOG(ERR, PORT, "%s: Invalid Parameters\n", __func__);
172                 return NULL;
173         }
174
175         /* Memory allocation */
176         port = rte_zmalloc_socket("PORT", sizeof(*port),
177                         RTE_CACHE_LINE_SIZE, socket_id);
178         if (port == NULL) {
179                 RTE_LOG(ERR, PORT, "%s: Failed to allocate port\n", __func__);
180                 return NULL;
181         }
182
183         /* Initialization */
184         port->ring = conf->ring;
185         port->tx_burst_sz = conf->tx_burst_sz;
186         port->tx_buf_count = 0;
187         port->bsz_mask = 1LLU << (conf->tx_burst_sz - 1);
188
189         return port;
190 }
191
192 static inline void
193 send_burst(struct rte_port_ring_writer *p)
194 {
195         uint32_t nb_tx;
196
197         nb_tx = rte_ring_sp_enqueue_burst(p->ring, (void **)p->tx_buf,
198                         p->tx_buf_count);
199
200         RTE_PORT_RING_WRITER_STATS_PKTS_DROP_ADD(p, p->tx_buf_count - nb_tx);
201         for ( ; nb_tx < p->tx_buf_count; nb_tx++)
202                 rte_pktmbuf_free(p->tx_buf[nb_tx]);
203
204         p->tx_buf_count = 0;
205 }
206
207 static int
208 rte_port_ring_writer_tx(void *port, struct rte_mbuf *pkt)
209 {
210         struct rte_port_ring_writer *p = (struct rte_port_ring_writer *) port;
211
212         p->tx_buf[p->tx_buf_count++] = pkt;
213         RTE_PORT_RING_WRITER_STATS_PKTS_IN_ADD(p, 1);
214         if (p->tx_buf_count >= p->tx_burst_sz)
215                 send_burst(p);
216
217         return 0;
218 }
219
220 static int
221 rte_port_ring_writer_tx_bulk(void *port,
222                 struct rte_mbuf **pkts,
223                 uint64_t pkts_mask)
224 {
225         struct rte_port_ring_writer *p =
226                 (struct rte_port_ring_writer *) port;
227
228         uint32_t bsz_mask = p->bsz_mask;
229         uint32_t tx_buf_count = p->tx_buf_count;
230         uint64_t expr = (pkts_mask & (pkts_mask + 1)) |
231                         ((pkts_mask & bsz_mask) ^ bsz_mask);
232
233         if (expr == 0) {
234                 uint64_t n_pkts = __builtin_popcountll(pkts_mask);
235                 uint32_t n_pkts_ok;
236
237                 if (tx_buf_count)
238                         send_burst(p);
239
240                 RTE_PORT_RING_WRITER_STATS_PKTS_IN_ADD(p, n_pkts);
241                 n_pkts_ok = rte_ring_sp_enqueue_burst(p->ring, (void **)pkts, n_pkts);
242
243                 RTE_PORT_RING_WRITER_STATS_PKTS_DROP_ADD(p, n_pkts - n_pkts_ok);
244                 for ( ; n_pkts_ok < n_pkts; n_pkts_ok++) {
245                         struct rte_mbuf *pkt = pkts[n_pkts_ok];
246
247                         rte_pktmbuf_free(pkt);
248                 }
249         } else {
250                 for ( ; pkts_mask; ) {
251                         uint32_t pkt_index = __builtin_ctzll(pkts_mask);
252                         uint64_t pkt_mask = 1LLU << pkt_index;
253                         struct rte_mbuf *pkt = pkts[pkt_index];
254
255                         p->tx_buf[tx_buf_count++] = pkt;
256                         RTE_PORT_RING_WRITER_STATS_PKTS_IN_ADD(p, 1);
257                         pkts_mask &= ~pkt_mask;
258                 }
259
260                 p->tx_buf_count = tx_buf_count;
261                 if (tx_buf_count >= p->tx_burst_sz)
262                         send_burst(p);
263         }
264
265         return 0;
266 }
267
268 static int
269 rte_port_ring_writer_flush(void *port)
270 {
271         struct rte_port_ring_writer *p = (struct rte_port_ring_writer *) port;
272
273         if (p->tx_buf_count > 0)
274                 send_burst(p);
275
276         return 0;
277 }
278
279 static int
280 rte_port_ring_writer_free(void *port)
281 {
282         if (port == NULL) {
283                 RTE_LOG(ERR, PORT, "%s: Port is NULL\n", __func__);
284                 return -EINVAL;
285         }
286
287         rte_port_ring_writer_flush(port);
288         rte_free(port);
289
290         return 0;
291 }
292
293 static int
294 rte_port_ring_writer_stats_read(void *port,
295                 struct rte_port_out_stats *stats, int clear)
296 {
297         struct rte_port_ring_writer *p =
298                 (struct rte_port_ring_writer *) port;
299
300         if (stats != NULL)
301                 memcpy(stats, &p->stats, sizeof(p->stats));
302
303         if (clear)
304                 memset(&p->stats, 0, sizeof(p->stats));
305
306         return 0;
307 }
308
309 /*
310  * Port RING Writer Nodrop
311  */
312 #ifdef RTE_PORT_STATS_COLLECT
313
314 #define RTE_PORT_RING_WRITER_NODROP_STATS_PKTS_IN_ADD(port, val) \
315         port->stats.n_pkts_in += val
316 #define RTE_PORT_RING_WRITER_NODROP_STATS_PKTS_DROP_ADD(port, val) \
317         port->stats.n_pkts_drop += val
318
319 #else
320
321 #define RTE_PORT_RING_WRITER_NODROP_STATS_PKTS_IN_ADD(port, val)
322 #define RTE_PORT_RING_WRITER_NODROP_STATS_PKTS_DROP_ADD(port, val)
323
324 #endif
325
326 struct rte_port_ring_writer_nodrop {
327         struct rte_port_out_stats stats;
328
329         struct rte_mbuf *tx_buf[RTE_PORT_IN_BURST_SIZE_MAX];
330         struct rte_ring *ring;
331         uint32_t tx_burst_sz;
332         uint32_t tx_buf_count;
333         uint64_t bsz_mask;
334         uint64_t n_retries;
335 };
336
337 static void *
338 rte_port_ring_writer_nodrop_create(void *params, int socket_id)
339 {
340         struct rte_port_ring_writer_nodrop_params *conf =
341                         (struct rte_port_ring_writer_nodrop_params *) params;
342         struct rte_port_ring_writer_nodrop *port;
343
344         /* Check input parameters */
345         if ((conf == NULL) ||
346             (conf->ring == NULL) ||
347                 (conf->tx_burst_sz > RTE_PORT_IN_BURST_SIZE_MAX)) {
348                 RTE_LOG(ERR, PORT, "%s: Invalid Parameters\n", __func__);
349                 return NULL;
350         }
351
352         /* Memory allocation */
353         port = rte_zmalloc_socket("PORT", sizeof(*port),
354                         RTE_CACHE_LINE_SIZE, socket_id);
355         if (port == NULL) {
356                 RTE_LOG(ERR, PORT, "%s: Failed to allocate port\n", __func__);
357                 return NULL;
358         }
359
360         /* Initialization */
361         port->ring = conf->ring;
362         port->tx_burst_sz = conf->tx_burst_sz;
363         port->tx_buf_count = 0;
364         port->bsz_mask = 1LLU << (conf->tx_burst_sz - 1);
365
366         /*
367          * When n_retries is 0 it means that we should wait for every packet to
368          * send no matter how many retries should it take. To limit number of
369          * branches in fast path, we use UINT64_MAX instead of branching.
370          */
371         port->n_retries = (conf->n_retries == 0) ? UINT64_MAX : conf->n_retries;
372
373         return port;
374 }
375
376 static inline void
377 send_burst_nodrop(struct rte_port_ring_writer_nodrop *p)
378 {
379         uint32_t nb_tx = 0, i;
380
381         nb_tx = rte_ring_sp_enqueue_burst(p->ring, (void **)p->tx_buf,
382                                 p->tx_buf_count);
383
384         /* We sent all the packets in a first try */
385         if (nb_tx >= p->tx_buf_count)
386                 return;
387
388         for (i = 0; i < p->n_retries; i++) {
389                 nb_tx += rte_ring_sp_enqueue_burst(p->ring,
390                                 (void **) (p->tx_buf + nb_tx), p->tx_buf_count - nb_tx);
391
392                 /* We sent all the packets in more than one try */
393                 if (nb_tx >= p->tx_buf_count)
394                         return;
395         }
396
397         /* We didn't send the packets in maximum allowed attempts */
398         RTE_PORT_RING_WRITER_NODROP_STATS_PKTS_DROP_ADD(p, p->tx_buf_count - nb_tx);
399         for ( ; nb_tx < p->tx_buf_count; nb_tx++)
400                 rte_pktmbuf_free(p->tx_buf[nb_tx]);
401
402         p->tx_buf_count = 0;
403 }
404
405 static int
406 rte_port_ring_writer_nodrop_tx(void *port, struct rte_mbuf *pkt)
407 {
408         struct rte_port_ring_writer_nodrop *p =
409                         (struct rte_port_ring_writer_nodrop *) port;
410
411         p->tx_buf[p->tx_buf_count++] = pkt;
412         RTE_PORT_RING_WRITER_NODROP_STATS_PKTS_IN_ADD(p, 1);
413         if (p->tx_buf_count >= p->tx_burst_sz)
414                 send_burst_nodrop(p);
415
416         return 0;
417 }
418
419 static int
420 rte_port_ring_writer_nodrop_tx_bulk(void *port,
421                 struct rte_mbuf **pkts,
422                 uint64_t pkts_mask)
423 {
424         struct rte_port_ring_writer_nodrop *p =
425                 (struct rte_port_ring_writer_nodrop *) port;
426
427         uint32_t bsz_mask = p->bsz_mask;
428         uint32_t tx_buf_count = p->tx_buf_count;
429         uint64_t expr = (pkts_mask & (pkts_mask + 1)) |
430                         ((pkts_mask & bsz_mask) ^ bsz_mask);
431
432         if (expr == 0) {
433                 uint64_t n_pkts = __builtin_popcountll(pkts_mask);
434                 uint32_t n_pkts_ok;
435
436                 if (tx_buf_count)
437                         send_burst_nodrop(p);
438
439                 RTE_PORT_RING_WRITER_NODROP_STATS_PKTS_IN_ADD(p, n_pkts);
440                 n_pkts_ok = rte_ring_sp_enqueue_burst(p->ring, (void **)pkts, n_pkts);
441
442                 if (n_pkts_ok >= n_pkts)
443                         return 0;
444
445                 /*
446                  * If we didnt manage to send all packets in single burst, move
447                  * remaining packets to the buffer and call send burst.
448                  */
449                 for (; n_pkts_ok < n_pkts; n_pkts_ok++) {
450                         struct rte_mbuf *pkt = pkts[n_pkts_ok];
451                         p->tx_buf[p->tx_buf_count++] = pkt;
452                 }
453                 send_burst_nodrop(p);
454         } else {
455                 for ( ; pkts_mask; ) {
456                         uint32_t pkt_index = __builtin_ctzll(pkts_mask);
457                         uint64_t pkt_mask = 1LLU << pkt_index;
458                         struct rte_mbuf *pkt = pkts[pkt_index];
459
460                         p->tx_buf[tx_buf_count++] = pkt;
461                         RTE_PORT_RING_WRITER_NODROP_STATS_PKTS_IN_ADD(p, 1);
462                         pkts_mask &= ~pkt_mask;
463                 }
464
465                 p->tx_buf_count = tx_buf_count;
466                 if (tx_buf_count >= p->tx_burst_sz)
467                         send_burst_nodrop(p);
468         }
469
470         return 0;
471 }
472
473 static int
474 rte_port_ring_writer_nodrop_flush(void *port)
475 {
476         struct rte_port_ring_writer_nodrop *p =
477                         (struct rte_port_ring_writer_nodrop *) port;
478
479         if (p->tx_buf_count > 0)
480                 send_burst_nodrop(p);
481
482         return 0;
483 }
484
485 static int
486 rte_port_ring_writer_nodrop_free(void *port)
487 {
488         if (port == NULL) {
489                 RTE_LOG(ERR, PORT, "%s: Port is NULL\n", __func__);
490                 return -EINVAL;
491         }
492
493         rte_port_ring_writer_nodrop_flush(port);
494         rte_free(port);
495
496         return 0;
497 }
498
499 static int
500 rte_port_ring_writer_nodrop_stats_read(void *port,
501                 struct rte_port_out_stats *stats, int clear)
502 {
503         struct rte_port_ring_writer_nodrop *p =
504                 (struct rte_port_ring_writer_nodrop *) port;
505
506         if (stats != NULL)
507                 memcpy(stats, &p->stats, sizeof(p->stats));
508
509         if (clear)
510                 memset(&p->stats, 0, sizeof(p->stats));
511
512         return 0;
513 }
514
515 /*
516  * Summary of port operations
517  */
518 struct rte_port_in_ops rte_port_ring_reader_ops = {
519         .f_create = rte_port_ring_reader_create,
520         .f_free = rte_port_ring_reader_free,
521         .f_rx = rte_port_ring_reader_rx,
522         .f_stats = rte_port_ring_reader_stats_read,
523 };
524
525 struct rte_port_out_ops rte_port_ring_writer_ops = {
526         .f_create = rte_port_ring_writer_create,
527         .f_free = rte_port_ring_writer_free,
528         .f_tx = rte_port_ring_writer_tx,
529         .f_tx_bulk = rte_port_ring_writer_tx_bulk,
530         .f_flush = rte_port_ring_writer_flush,
531         .f_stats = rte_port_ring_writer_stats_read,
532 };
533
534 struct rte_port_out_ops rte_port_ring_writer_nodrop_ops = {
535         .f_create = rte_port_ring_writer_nodrop_create,
536         .f_free = rte_port_ring_writer_nodrop_free,
537         .f_tx = rte_port_ring_writer_nodrop_tx,
538         .f_tx_bulk = rte_port_ring_writer_nodrop_tx_bulk,
539         .f_flush = rte_port_ring_writer_nodrop_flush,
540         .f_stats = rte_port_ring_writer_nodrop_stats_read,
541 };