port: add ring writer nodrop
[dpdk.git] / lib / librte_port / rte_port_ring.c
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 #include <string.h>
34 #include <stdint.h>
35
36 #include <rte_mbuf.h>
37 #include <rte_ring.h>
38 #include <rte_malloc.h>
39
40 #include "rte_port_ring.h"
41
42 /*
43  * Port RING Reader
44  */
45 struct rte_port_ring_reader {
46         struct rte_ring *ring;
47 };
48
49 static void *
50 rte_port_ring_reader_create(void *params, int socket_id)
51 {
52         struct rte_port_ring_reader_params *conf =
53                         (struct rte_port_ring_reader_params *) params;
54         struct rte_port_ring_reader *port;
55
56         /* Check input parameters */
57         if (conf == NULL) {
58                 RTE_LOG(ERR, PORT, "%s: params is NULL\n", __func__);
59                 return NULL;
60         }
61
62         /* Memory allocation */
63         port = rte_zmalloc_socket("PORT", sizeof(*port),
64                         RTE_CACHE_LINE_SIZE, socket_id);
65         if (port == NULL) {
66                 RTE_LOG(ERR, PORT, "%s: Failed to allocate port\n", __func__);
67                 return NULL;
68         }
69
70         /* Initialization */
71         port->ring = conf->ring;
72
73         return port;
74 }
75
76 static int
77 rte_port_ring_reader_rx(void *port, struct rte_mbuf **pkts, uint32_t n_pkts)
78 {
79         struct rte_port_ring_reader *p = (struct rte_port_ring_reader *) port;
80
81         return rte_ring_sc_dequeue_burst(p->ring, (void **) pkts, n_pkts);
82 }
83
84 static int
85 rte_port_ring_reader_free(void *port)
86 {
87         if (port == NULL) {
88                 RTE_LOG(ERR, PORT, "%s: port is NULL\n", __func__);
89                 return -EINVAL;
90         }
91
92         rte_free(port);
93
94         return 0;
95 }
96
97 /*
98  * Port RING Writer
99  */
100 struct rte_port_ring_writer {
101         struct rte_mbuf *tx_buf[RTE_PORT_IN_BURST_SIZE_MAX];
102         struct rte_ring *ring;
103         uint32_t tx_burst_sz;
104         uint32_t tx_buf_count;
105         uint64_t bsz_mask;
106 };
107
108 static void *
109 rte_port_ring_writer_create(void *params, int socket_id)
110 {
111         struct rte_port_ring_writer_params *conf =
112                         (struct rte_port_ring_writer_params *) params;
113         struct rte_port_ring_writer *port;
114
115         /* Check input parameters */
116         if ((conf == NULL) ||
117             (conf->ring == NULL) ||
118                 (conf->tx_burst_sz > RTE_PORT_IN_BURST_SIZE_MAX)) {
119                 RTE_LOG(ERR, PORT, "%s: Invalid Parameters\n", __func__);
120                 return NULL;
121         }
122
123         /* Memory allocation */
124         port = rte_zmalloc_socket("PORT", sizeof(*port),
125                         RTE_CACHE_LINE_SIZE, socket_id);
126         if (port == NULL) {
127                 RTE_LOG(ERR, PORT, "%s: Failed to allocate port\n", __func__);
128                 return NULL;
129         }
130
131         /* Initialization */
132         port->ring = conf->ring;
133         port->tx_burst_sz = conf->tx_burst_sz;
134         port->tx_buf_count = 0;
135         port->bsz_mask = 1LLU << (conf->tx_burst_sz - 1);
136
137         return port;
138 }
139
140 static inline void
141 send_burst(struct rte_port_ring_writer *p)
142 {
143         uint32_t nb_tx;
144
145         nb_tx = rte_ring_sp_enqueue_burst(p->ring, (void **)p->tx_buf,
146                         p->tx_buf_count);
147
148         for ( ; nb_tx < p->tx_buf_count; nb_tx++)
149                 rte_pktmbuf_free(p->tx_buf[nb_tx]);
150
151         p->tx_buf_count = 0;
152 }
153
154 static int
155 rte_port_ring_writer_tx(void *port, struct rte_mbuf *pkt)
156 {
157         struct rte_port_ring_writer *p = (struct rte_port_ring_writer *) port;
158
159         p->tx_buf[p->tx_buf_count++] = pkt;
160         if (p->tx_buf_count >= p->tx_burst_sz)
161                 send_burst(p);
162
163         return 0;
164 }
165
166 static int
167 rte_port_ring_writer_tx_bulk(void *port,
168                 struct rte_mbuf **pkts,
169                 uint64_t pkts_mask)
170 {
171         struct rte_port_ring_writer *p =
172                 (struct rte_port_ring_writer *) port;
173
174         uint32_t bsz_mask = p->bsz_mask;
175         uint32_t tx_buf_count = p->tx_buf_count;
176         uint64_t expr = (pkts_mask & (pkts_mask + 1)) |
177                         ((pkts_mask & bsz_mask) ^ bsz_mask);
178
179         if (expr == 0) {
180                 uint64_t n_pkts = __builtin_popcountll(pkts_mask);
181                 uint32_t n_pkts_ok;
182
183                 if (tx_buf_count)
184                         send_burst(p);
185
186                 n_pkts_ok = rte_ring_sp_enqueue_burst(p->ring, (void **)pkts, n_pkts);
187
188                 for ( ; n_pkts_ok < n_pkts; n_pkts_ok++) {
189                         struct rte_mbuf *pkt = pkts[n_pkts_ok];
190
191                         rte_pktmbuf_free(pkt);
192                 }
193         } else {
194                 for ( ; pkts_mask; ) {
195                         uint32_t pkt_index = __builtin_ctzll(pkts_mask);
196                         uint64_t pkt_mask = 1LLU << pkt_index;
197                         struct rte_mbuf *pkt = pkts[pkt_index];
198
199                         p->tx_buf[tx_buf_count++] = pkt;
200                         pkts_mask &= ~pkt_mask;
201                 }
202
203                 p->tx_buf_count = tx_buf_count;
204                 if (tx_buf_count >= p->tx_burst_sz)
205                         send_burst(p);
206         }
207
208         return 0;
209 }
210
211 static int
212 rte_port_ring_writer_flush(void *port)
213 {
214         struct rte_port_ring_writer *p = (struct rte_port_ring_writer *) port;
215
216         if (p->tx_buf_count > 0)
217                 send_burst(p);
218
219         return 0;
220 }
221
222 static int
223 rte_port_ring_writer_free(void *port)
224 {
225         if (port == NULL) {
226                 RTE_LOG(ERR, PORT, "%s: Port is NULL\n", __func__);
227                 return -EINVAL;
228         }
229
230         rte_port_ring_writer_flush(port);
231         rte_free(port);
232
233         return 0;
234 }
235
236 /*
237  * Port RING Writer Nodrop
238  */
239 struct rte_port_ring_writer_nodrop {
240         struct rte_mbuf *tx_buf[RTE_PORT_IN_BURST_SIZE_MAX];
241         struct rte_ring *ring;
242         uint32_t tx_burst_sz;
243         uint32_t tx_buf_count;
244         uint64_t bsz_mask;
245         uint64_t n_retries;
246 };
247
248 static void *
249 rte_port_ring_writer_nodrop_create(void *params, int socket_id)
250 {
251         struct rte_port_ring_writer_nodrop_params *conf =
252                         (struct rte_port_ring_writer_nodrop_params *) params;
253         struct rte_port_ring_writer_nodrop *port;
254
255         /* Check input parameters */
256         if ((conf == NULL) ||
257             (conf->ring == NULL) ||
258                 (conf->tx_burst_sz > RTE_PORT_IN_BURST_SIZE_MAX)) {
259                 RTE_LOG(ERR, PORT, "%s: Invalid Parameters\n", __func__);
260                 return NULL;
261         }
262
263         /* Memory allocation */
264         port = rte_zmalloc_socket("PORT", sizeof(*port),
265                         RTE_CACHE_LINE_SIZE, socket_id);
266         if (port == NULL) {
267                 RTE_LOG(ERR, PORT, "%s: Failed to allocate port\n", __func__);
268                 return NULL;
269         }
270
271         /* Initialization */
272         port->ring = conf->ring;
273         port->tx_burst_sz = conf->tx_burst_sz;
274         port->tx_buf_count = 0;
275         port->bsz_mask = 1LLU << (conf->tx_burst_sz - 1);
276
277         /*
278          * When n_retries is 0 it means that we should wait for every packet to
279          * send no matter how many retries should it take. To limit number of
280          * branches in fast path, we use UINT64_MAX instead of branching.
281          */
282         port->n_retries = (conf->n_retries == 0) ? UINT64_MAX : conf->n_retries;
283
284         return port;
285 }
286
287 static inline void
288 send_burst_nodrop(struct rte_port_ring_writer_nodrop *p)
289 {
290         uint32_t nb_tx = 0, i;
291
292         nb_tx = rte_ring_sp_enqueue_burst(p->ring, (void **)p->tx_buf,
293                                 p->tx_buf_count);
294
295         /* We sent all the packets in a first try */
296         if (nb_tx >= p->tx_buf_count)
297                 return;
298
299         for (i = 0; i < p->n_retries; i++) {
300                 nb_tx += rte_ring_sp_enqueue_burst(p->ring,
301                                 (void **) (p->tx_buf + nb_tx), p->tx_buf_count - nb_tx);
302
303                 /* We sent all the packets in more than one try */
304                 if (nb_tx >= p->tx_buf_count)
305                         return;
306         }
307
308         /* We didn't send the packets in maximum allowed attempts */
309         for ( ; nb_tx < p->tx_buf_count; nb_tx++)
310                 rte_pktmbuf_free(p->tx_buf[nb_tx]);
311
312         p->tx_buf_count = 0;
313 }
314
315 static int
316 rte_port_ring_writer_nodrop_tx(void *port, struct rte_mbuf *pkt)
317 {
318         struct rte_port_ring_writer_nodrop *p =
319                         (struct rte_port_ring_writer_nodrop *) port;
320
321         p->tx_buf[p->tx_buf_count++] = pkt;
322         if (p->tx_buf_count >= p->tx_burst_sz)
323                 send_burst_nodrop(p);
324
325         return 0;
326 }
327
328 static int
329 rte_port_ring_writer_nodrop_tx_bulk(void *port,
330                 struct rte_mbuf **pkts,
331                 uint64_t pkts_mask)
332 {
333         struct rte_port_ring_writer_nodrop *p =
334                 (struct rte_port_ring_writer_nodrop *) port;
335
336         uint32_t bsz_mask = p->bsz_mask;
337         uint32_t tx_buf_count = p->tx_buf_count;
338         uint64_t expr = (pkts_mask & (pkts_mask + 1)) |
339                         ((pkts_mask & bsz_mask) ^ bsz_mask);
340
341         if (expr == 0) {
342                 uint64_t n_pkts = __builtin_popcountll(pkts_mask);
343                 uint32_t n_pkts_ok;
344
345                 if (tx_buf_count)
346                         send_burst_nodrop(p);
347
348                 n_pkts_ok = rte_ring_sp_enqueue_burst(p->ring, (void **)pkts, n_pkts);
349
350                 if (n_pkts_ok >= n_pkts)
351                         return 0;
352
353                 /*
354                  * If we didnt manage to send all packets in single burst, move
355                  * remaining packets to the buffer and call send burst.
356                  */
357                 for (; n_pkts_ok < n_pkts; n_pkts_ok++) {
358                         struct rte_mbuf *pkt = pkts[n_pkts_ok];
359                         p->tx_buf[p->tx_buf_count++] = pkt;
360                 }
361                 send_burst_nodrop(p);
362         } else {
363                 for ( ; pkts_mask; ) {
364                         uint32_t pkt_index = __builtin_ctzll(pkts_mask);
365                         uint64_t pkt_mask = 1LLU << pkt_index;
366                         struct rte_mbuf *pkt = pkts[pkt_index];
367
368                         p->tx_buf[tx_buf_count++] = pkt;
369                         pkts_mask &= ~pkt_mask;
370                 }
371
372                 p->tx_buf_count = tx_buf_count;
373                 if (tx_buf_count >= p->tx_burst_sz)
374                         send_burst_nodrop(p);
375         }
376
377         return 0;
378 }
379
380 static int
381 rte_port_ring_writer_nodrop_flush(void *port)
382 {
383         struct rte_port_ring_writer_nodrop *p =
384                         (struct rte_port_ring_writer_nodrop *) port;
385
386         if (p->tx_buf_count > 0)
387                 send_burst_nodrop(p);
388
389         return 0;
390 }
391
392 static int
393 rte_port_ring_writer_nodrop_free(void *port)
394 {
395         if (port == NULL) {
396                 RTE_LOG(ERR, PORT, "%s: Port is NULL\n", __func__);
397                 return -EINVAL;
398         }
399
400         rte_port_ring_writer_nodrop_flush(port);
401         rte_free(port);
402
403         return 0;
404 }
405
406 /*
407  * Summary of port operations
408  */
409 struct rte_port_in_ops rte_port_ring_reader_ops = {
410         .f_create = rte_port_ring_reader_create,
411         .f_free = rte_port_ring_reader_free,
412         .f_rx = rte_port_ring_reader_rx,
413 };
414
415 struct rte_port_out_ops rte_port_ring_writer_ops = {
416         .f_create = rte_port_ring_writer_create,
417         .f_free = rte_port_ring_writer_free,
418         .f_tx = rte_port_ring_writer_tx,
419         .f_tx_bulk = rte_port_ring_writer_tx_bulk,
420         .f_flush = rte_port_ring_writer_flush,
421 };
422
423 struct rte_port_out_ops rte_port_ring_writer_nodrop_ops = {
424         .f_create = rte_port_ring_writer_nodrop_create,
425         .f_free = rte_port_ring_writer_nodrop_free,
426         .f_tx = rte_port_ring_writer_nodrop_tx,
427         .f_tx_bulk = rte_port_ring_writer_nodrop_tx_bulk,
428         .f_flush = rte_port_ring_writer_nodrop_flush,
429 };