/* dpdk.git: examples/load_balancer/runtime.c (first public release) */
/*-
 *   BSD LICENSE
 *
 *   Copyright(c) 2010-2012 Intel Corporation. All rights reserved.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 *  version: DPDK.L.1.2.3-3
 */

#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <inttypes.h>
#include <sys/types.h>
#include <string.h>
#include <sys/queue.h>
#include <stdarg.h>
#include <errno.h>
#include <getopt.h>

#include <rte_common.h>
#include <rte_byteorder.h>
#include <rte_log.h>
#include <rte_memory.h>
#include <rte_memcpy.h>
#include <rte_memzone.h>
#include <rte_tailq.h>
#include <rte_eal.h>
#include <rte_per_lcore.h>
#include <rte_launch.h>
#include <rte_atomic.h>
#include <rte_cycles.h>
#include <rte_prefetch.h>
#include <rte_lcore.h>
#include <rte_branch_prediction.h>
#include <rte_interrupts.h>
#include <rte_pci.h>
#include <rte_random.h>
#include <rte_debug.h>
#include <rte_ether.h>
#include <rte_ethdev.h>
#include <rte_ring.h>
#include <rte_mempool.h>
#include <rte_mbuf.h>
#include <rte_ip.h>
#include <rte_tcp.h>
#include <rte_lpm.h>

#include "main.h"

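/*
 * Build-time tunables. The flush thresholds below are measured in
 * main-loop iterations and APP_STATS in burst operations; the
 * DROP_ALL_PACKETS switches short-circuit one stage of the pipeline
 * so that each stage can be benchmarked in isolation.
 */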
#ifndef APP_LCORE_IO_FLUSH
#define APP_LCORE_IO_FLUSH           1000000
#endif

#ifndef APP_LCORE_WORKER_FLUSH
#define APP_LCORE_WORKER_FLUSH       1000000
#endif

#ifndef APP_STATS
#define APP_STATS                    1000000
#endif

#define APP_IO_RX_DROP_ALL_PACKETS   0
#define APP_WORKER_DROP_ALL_PACKETS  0
#define APP_IO_TX_DROP_ALL_PACKETS   0

#ifndef APP_IO_RX_PREFETCH_ENABLE
#define APP_IO_RX_PREFETCH_ENABLE    1
#endif

#ifndef APP_WORKER_PREFETCH_ENABLE
#define APP_WORKER_PREFETCH_ENABLE   1
#endif

#ifndef APP_IO_TX_PREFETCH_ENABLE
#define APP_IO_TX_PREFETCH_ENABLE    1
#endif

#if APP_IO_RX_PREFETCH_ENABLE
#define APP_IO_RX_PREFETCH0(p)       rte_prefetch0(p)
#define APP_IO_RX_PREFETCH1(p)       rte_prefetch1(p)
#else
#define APP_IO_RX_PREFETCH0(p)
#define APP_IO_RX_PREFETCH1(p)
#endif

#if APP_WORKER_PREFETCH_ENABLE
#define APP_WORKER_PREFETCH0(p)      rte_prefetch0(p)
#define APP_WORKER_PREFETCH1(p)      rte_prefetch1(p)
#else
#define APP_WORKER_PREFETCH0(p)
#define APP_WORKER_PREFETCH1(p)
#endif

#if APP_IO_TX_PREFETCH_ENABLE
#define APP_IO_TX_PREFETCH0(p)       rte_prefetch0(p)
#define APP_IO_TX_PREFETCH1(p)       rte_prefetch1(p)
#else
#define APP_IO_TX_PREFETCH0(p)
#define APP_IO_TX_PREFETCH1(p)
#endif

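/*
 * Buffer one mbuf for the given worker. Once the per-worker buffer
 * reaches the write burst size, the whole burst is enqueued onto the
 * worker's single-producer ring; if the ring is full, the burst is
 * dropped and every mbuf in it is freed.
 */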
static inline void
app_lcore_io_rx_buffer_to_send(
	struct app_lcore_params_io *lp,
	uint32_t worker,
	struct rte_mbuf *mbuf,
	uint32_t bsz)
{
	uint32_t pos;
	int ret;

	pos = lp->rx.mbuf_out[worker].n_mbufs;
	lp->rx.mbuf_out[worker].array[pos ++] = mbuf;
	if (likely(pos < bsz)) {
		lp->rx.mbuf_out[worker].n_mbufs = pos;
		return;
	}

	ret = rte_ring_sp_enqueue_bulk(
		lp->rx.rings[worker],
		(void **) lp->rx.mbuf_out[worker].array,
		bsz);

	if (unlikely(ret == -ENOBUFS)) {
		uint32_t k;
		for (k = 0; k < bsz; k ++) {
			struct rte_mbuf *m = lp->rx.mbuf_out[worker].array[k];
			rte_pktmbuf_free(m);
		}
	}

	lp->rx.mbuf_out[worker].n_mbufs = 0;
	lp->rx.mbuf_out_flush[worker] = 0;

#if APP_STATS
	lp->rx.rings_iters[worker] ++;
	if (likely(ret == 0)) {
		lp->rx.rings_count[worker] ++;
	}
	if (unlikely(lp->rx.rings_iters[worker] == APP_STATS)) {
		uint32_t lcore = rte_lcore_id();

		printf("\tI/O RX %u out (worker %u): enq success rate = %.2f\n",
			lcore,
			worker,
			((double) lp->rx.rings_count[worker]) / ((double) lp->rx.rings_iters[worker]));
		lp->rx.rings_iters[worker] = 0;
		lp->rx.rings_count[worker] = 0;
	}
#endif
}

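/*
 * I/O RX: read a burst from each NIC RX queue owned by this lcore and
 * distribute the packets to the workers. The worker is selected from a
 * single byte of packet data at offset pos_lb, masked with
 * (n_workers - 1), which assumes the number of workers is a power of
 * two.
 */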
static inline void
app_lcore_io_rx(
	struct app_lcore_params_io *lp,
	uint32_t n_workers,
	uint32_t bsz_rd,
	uint32_t bsz_wr,
	uint8_t pos_lb)
{
	struct rte_mbuf *mbuf_1_0, *mbuf_1_1, *mbuf_2_0, *mbuf_2_1;
	uint8_t *data_1_0, *data_1_1 = NULL;
	uint32_t i;

	for (i = 0; i < lp->rx.n_nic_queues; i ++) {
		uint8_t port = lp->rx.nic_queues[i].port;
		uint8_t queue = lp->rx.nic_queues[i].queue;
		uint32_t n_mbufs, j;

		n_mbufs = rte_eth_rx_burst(
			port,
			queue,
			lp->rx.mbuf_in.array,
			(uint16_t) bsz_rd);

		if (unlikely(n_mbufs == 0)) {
			continue;
		}

#if APP_STATS
		lp->rx.nic_queues_iters[i] ++;
		lp->rx.nic_queues_count[i] += n_mbufs;
		if (unlikely(lp->rx.nic_queues_iters[i] == APP_STATS)) {
			struct rte_eth_stats stats;
			uint32_t lcore = rte_lcore_id();

			rte_eth_stats_get(port, &stats);

			printf("I/O RX %u in (NIC port %u): NIC drop ratio = %.2f avg burst size = %.2f\n",
				lcore,
				(uint32_t) port,
				(double) stats.ierrors / (double) (stats.ierrors + stats.ipackets),
				((double) lp->rx.nic_queues_count[i]) / ((double) lp->rx.nic_queues_iters[i]));
			lp->rx.nic_queues_iters[i] = 0;
			lp->rx.nic_queues_count[i] = 0;
		}
#endif

#if APP_IO_RX_DROP_ALL_PACKETS
		for (j = 0; j < n_mbufs; j ++) {
			struct rte_mbuf *pkt = lp->rx.mbuf_in.array[j];
			rte_pktmbuf_free(pkt);
		}

		continue;
#endif

		mbuf_1_0 = lp->rx.mbuf_in.array[0];
		mbuf_1_1 = lp->rx.mbuf_in.array[1];
		data_1_0 = rte_pktmbuf_mtod(mbuf_1_0, uint8_t *);
		if (likely(n_mbufs > 1)) {
			data_1_1 = rte_pktmbuf_mtod(mbuf_1_1, uint8_t *);
		}

		mbuf_2_0 = lp->rx.mbuf_in.array[2];
		mbuf_2_1 = lp->rx.mbuf_in.array[3];
		APP_IO_RX_PREFETCH0(mbuf_2_0);
		APP_IO_RX_PREFETCH0(mbuf_2_1);

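		/*
		 * Software pipeline, two packets per stage: packets j and
		 * j+1 are processed while the data of packets j+2/j+3 and
		 * the mbuf headers of packets j+4/j+5 are being prefetched.
		 * Array slots read beyond n_mbufs are only ever prefetched,
		 * never dereferenced.
		 */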
		for (j = 0; j + 3 < n_mbufs; j += 2) {
			struct rte_mbuf *mbuf_0_0, *mbuf_0_1;
			uint8_t *data_0_0, *data_0_1;
			uint32_t worker_0, worker_1;

			mbuf_0_0 = mbuf_1_0;
			mbuf_0_1 = mbuf_1_1;
			data_0_0 = data_1_0;
			data_0_1 = data_1_1;

			mbuf_1_0 = mbuf_2_0;
			mbuf_1_1 = mbuf_2_1;
			data_1_0 = rte_pktmbuf_mtod(mbuf_2_0, uint8_t *);
			data_1_1 = rte_pktmbuf_mtod(mbuf_2_1, uint8_t *);
			APP_IO_RX_PREFETCH0(data_1_0);
			APP_IO_RX_PREFETCH0(data_1_1);

			mbuf_2_0 = lp->rx.mbuf_in.array[j+4];
			mbuf_2_1 = lp->rx.mbuf_in.array[j+5];
			APP_IO_RX_PREFETCH0(mbuf_2_0);
			APP_IO_RX_PREFETCH0(mbuf_2_1);

			worker_0 = data_0_0[pos_lb] & (n_workers - 1);
			worker_1 = data_0_1[pos_lb] & (n_workers - 1);

			app_lcore_io_rx_buffer_to_send(lp, worker_0, mbuf_0_0, bsz_wr);
			app_lcore_io_rx_buffer_to_send(lp, worker_1, mbuf_0_1, bsz_wr);
		}

		/* Handle the last 1, 2 (when n_mbufs is even) or 3 (when n_mbufs is odd) packets */
		for ( ; j < n_mbufs; j += 1) {
			struct rte_mbuf *mbuf;
			uint8_t *data;
			uint32_t worker;

			mbuf = mbuf_1_0;
			mbuf_1_0 = mbuf_1_1;
			mbuf_1_1 = mbuf_2_0;
			mbuf_2_0 = mbuf_2_1;

			data = rte_pktmbuf_mtod(mbuf, uint8_t *);

			APP_IO_RX_PREFETCH0(mbuf_1_0);

			worker = data[pos_lb] & (n_workers - 1);

			app_lcore_io_rx_buffer_to_send(lp, worker, mbuf, bsz_wr);
		}
	}
}

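/*
 * Flush the partially filled per-worker buffers. The flush flag is
 * cleared whenever a full burst is enqueued, so leftovers are pushed
 * out only after a buffer has been quiet for a full flush interval;
 * this drains stale bursts without flushing buffers that are still
 * actively filling.
 */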
static inline void
app_lcore_io_rx_flush(struct app_lcore_params_io *lp, uint32_t n_workers)
{
	uint32_t worker;

	for (worker = 0; worker < n_workers; worker ++) {
		int ret;

		if (likely((lp->rx.mbuf_out_flush[worker] == 0) ||
		           (lp->rx.mbuf_out[worker].n_mbufs == 0))) {
			lp->rx.mbuf_out_flush[worker] = 1;
			continue;
		}

		ret = rte_ring_sp_enqueue_bulk(
			lp->rx.rings[worker],
			(void **) lp->rx.mbuf_out[worker].array,
			lp->rx.mbuf_out[worker].n_mbufs);

		if (unlikely(ret < 0)) {
			uint32_t k;
			for (k = 0; k < lp->rx.mbuf_out[worker].n_mbufs; k ++) {
				struct rte_mbuf *pkt_to_free = lp->rx.mbuf_out[worker].array[k];
				rte_pktmbuf_free(pkt_to_free);
			}
		}

		lp->rx.mbuf_out[worker].n_mbufs = 0;
		lp->rx.mbuf_out_flush[worker] = 1;
	}
}

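/*
 * I/O TX: dequeue bursts from every worker ring feeding this lcore's
 * NIC ports, accumulate them per port, and transmit once at least
 * bsz_wr packets are buffered. Packets the NIC does not accept are
 * freed rather than retried.
 */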
static inline void
app_lcore_io_tx(
	struct app_lcore_params_io *lp,
	uint32_t n_workers,
	uint32_t bsz_rd,
	uint32_t bsz_wr)
{
	uint32_t worker;

	for (worker = 0; worker < n_workers; worker ++) {
		uint32_t i;

		for (i = 0; i < lp->tx.n_nic_ports; i ++) {
			uint8_t port = lp->tx.nic_ports[i];
			struct rte_ring *ring = lp->tx.rings[port][worker];
			uint32_t n_mbufs, n_pkts;
			int ret;

			n_mbufs = lp->tx.mbuf_out[port].n_mbufs;
			ret = rte_ring_sc_dequeue_bulk(
				ring,
				(void **) &lp->tx.mbuf_out[port].array[n_mbufs],
				bsz_rd);

			if (unlikely(ret == -ENOENT)) {
				continue;
			}

			n_mbufs += bsz_rd;

#if APP_IO_TX_DROP_ALL_PACKETS
			{
				uint32_t j;
				APP_IO_TX_PREFETCH0(lp->tx.mbuf_out[port].array[0]);
				APP_IO_TX_PREFETCH0(lp->tx.mbuf_out[port].array[1]);

				for (j = 0; j < n_mbufs; j ++) {
					if (likely(j < n_mbufs - 2)) {
						APP_IO_TX_PREFETCH0(lp->tx.mbuf_out[port].array[j + 2]);
					}

					rte_pktmbuf_free(lp->tx.mbuf_out[port].array[j]);
				}

				lp->tx.mbuf_out[port].n_mbufs = 0;

				continue;
			}
#endif

			if (unlikely(n_mbufs < bsz_wr)) {
				lp->tx.mbuf_out[port].n_mbufs = n_mbufs;
				continue;
			}

			n_pkts = rte_eth_tx_burst(
				port,
				0,
				lp->tx.mbuf_out[port].array,
				(uint16_t) n_mbufs);

#if APP_STATS
			lp->tx.nic_ports_iters[port] ++;
			lp->tx.nic_ports_count[port] += n_pkts;
			if (unlikely(lp->tx.nic_ports_iters[port] == APP_STATS)) {
				uint32_t lcore = rte_lcore_id();

				printf("\t\t\tI/O TX %u out (port %u): avg burst size = %.2f\n",
					lcore,
					(uint32_t) port,
					((double) lp->tx.nic_ports_count[port]) / ((double) lp->tx.nic_ports_iters[port]));
				lp->tx.nic_ports_iters[port] = 0;
				lp->tx.nic_ports_count[port] = 0;
			}
#endif

			if (unlikely(n_pkts < n_mbufs)) {
				uint32_t k;
				for (k = n_pkts; k < n_mbufs; k ++) {
					struct rte_mbuf *pkt_to_free = lp->tx.mbuf_out[port].array[k];
					rte_pktmbuf_free(pkt_to_free);
				}
			}
			lp->tx.mbuf_out[port].n_mbufs = 0;
			lp->tx.mbuf_out_flush[port] = 0;
		}
	}
}

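/*
 * Flush the partially filled per-port TX buffers, using the same
 * two-pass flush flag protocol as the RX flush. Note that mbuf_out is
 * indexed by NIC port id everywhere else, so the loop below resolves
 * the port id through the lp->tx.nic_ports table instead of using the
 * loop index directly.
 */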
static inline void
app_lcore_io_tx_flush(struct app_lcore_params_io *lp)
{
	uint8_t i;

	for (i = 0; i < lp->tx.n_nic_ports; i ++) {
		uint8_t port = lp->tx.nic_ports[i];
		uint32_t n_pkts;

		if (likely((lp->tx.mbuf_out_flush[port] == 0) ||
		           (lp->tx.mbuf_out[port].n_mbufs == 0))) {
			lp->tx.mbuf_out_flush[port] = 1;
			continue;
		}

		n_pkts = rte_eth_tx_burst(
			port,
			0,
			lp->tx.mbuf_out[port].array,
			(uint16_t) lp->tx.mbuf_out[port].n_mbufs);

		if (unlikely(n_pkts < lp->tx.mbuf_out[port].n_mbufs)) {
			uint32_t k;
			for (k = n_pkts; k < lp->tx.mbuf_out[port].n_mbufs; k ++) {
				struct rte_mbuf *pkt_to_free = lp->tx.mbuf_out[port].array[k];
				rte_pktmbuf_free(pkt_to_free);
			}
		}

		lp->tx.mbuf_out[port].n_mbufs = 0;
		lp->tx.mbuf_out_flush[port] = 1;
	}
}

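/*
 * Main loop of an I/O lcore: poll the RX queues and TX rings forever,
 * flushing the partially filled buffers every APP_LCORE_IO_FLUSH
 * iterations so that packets keep moving even at low traffic rates.
 */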
static void
app_lcore_main_loop_io(void)
{
	uint32_t lcore = rte_lcore_id();
	struct app_lcore_params_io *lp = &app.lcore_params[lcore].io;
	uint32_t n_workers = app_get_lcores_worker();
	uint64_t i = 0;

	uint32_t bsz_rx_rd = app.burst_size_io_rx_read;
	uint32_t bsz_rx_wr = app.burst_size_io_rx_write;
	uint32_t bsz_tx_rd = app.burst_size_io_tx_read;
	uint32_t bsz_tx_wr = app.burst_size_io_tx_write;

	uint8_t pos_lb = app.pos_lb;

	for ( ; ; ) {
		if (APP_LCORE_IO_FLUSH && (unlikely(i == APP_LCORE_IO_FLUSH))) {
			if (likely(lp->rx.n_nic_queues > 0)) {
				app_lcore_io_rx_flush(lp, n_workers);
			}

			if (likely(lp->tx.n_nic_ports > 0)) {
				app_lcore_io_tx_flush(lp);
			}

			i = 0;
		}

		if (likely(lp->rx.n_nic_queues > 0)) {
			app_lcore_io_rx(lp, n_workers, bsz_rx_rd, bsz_rx_wr, pos_lb);
		}

		if (likely(lp->tx.n_nic_ports > 0)) {
			app_lcore_io_tx(lp, n_workers, bsz_tx_rd, bsz_tx_wr);
		}

		i ++;
	}
}

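/*
 * Worker: dequeue a burst from each input ring, route every packet
 * with an LPM lookup on its IPv4 destination address, and buffer it
 * towards the output ring of the resulting NIC port. On LPM miss the
 * packet is bounced back out of the port it arrived on. Prefetches are
 * staggered so that mbuf headers and packet data arrive in cache one
 * iteration ahead of use.
 */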
static inline void
app_lcore_worker(
	struct app_lcore_params_worker *lp,
	uint32_t bsz_rd,
	uint32_t bsz_wr)
{
	uint32_t i;

	for (i = 0; i < lp->n_rings_in; i ++) {
		struct rte_ring *ring_in = lp->rings_in[i];
		uint32_t j;
		int ret;

		ret = rte_ring_sc_dequeue_bulk(
			ring_in,
			(void **) lp->mbuf_in.array,
			bsz_rd);

		if (unlikely(ret == -ENOENT)) {
			continue;
		}

#if APP_WORKER_DROP_ALL_PACKETS
		for (j = 0; j < bsz_rd; j ++) {
			struct rte_mbuf *pkt = lp->mbuf_in.array[j];
			rte_pktmbuf_free(pkt);
		}

		continue;
#endif

		APP_WORKER_PREFETCH1(rte_pktmbuf_mtod(lp->mbuf_in.array[0], unsigned char *));
		APP_WORKER_PREFETCH0(lp->mbuf_in.array[1]);

		for (j = 0; j < bsz_rd; j ++) {
			struct rte_mbuf *pkt;
			struct ipv4_hdr *ipv4_hdr;
			uint32_t ipv4_dst, pos;
			uint8_t port;

			if (likely(j < bsz_rd - 1)) {
				APP_WORKER_PREFETCH1(rte_pktmbuf_mtod(lp->mbuf_in.array[j+1], unsigned char *));
			}
			if (likely(j < bsz_rd - 2)) {
				APP_WORKER_PREFETCH0(lp->mbuf_in.array[j+2]);
			}

			pkt = lp->mbuf_in.array[j];
			ipv4_hdr = (struct ipv4_hdr *)(rte_pktmbuf_mtod(pkt, unsigned char *) + sizeof(struct ether_hdr));
			ipv4_dst = rte_be_to_cpu_32(ipv4_hdr->dst_addr);

			if (unlikely(rte_lpm_lookup(lp->lpm_table, ipv4_dst, &port) != 0)) {
				port = pkt->pkt.in_port;
			}

			pos = lp->mbuf_out[port].n_mbufs;

			lp->mbuf_out[port].array[pos ++] = pkt;
			if (likely(pos < bsz_wr)) {
				lp->mbuf_out[port].n_mbufs = pos;
				continue;
			}

			ret = rte_ring_sp_enqueue_bulk(
				lp->rings_out[port],
				(void **) lp->mbuf_out[port].array,
				bsz_wr);

#if APP_STATS
			lp->rings_out_iters[port] ++;
			if (ret == 0) {
				lp->rings_out_count[port] += 1;
			}
			if (lp->rings_out_iters[port] == APP_STATS) {
				printf("\t\tWorker %u out (NIC port %u): enq success rate = %.2f\n",
					lp->worker_id,
					(uint32_t) port,
					((double) lp->rings_out_count[port]) / ((double) lp->rings_out_iters[port]));
				lp->rings_out_iters[port] = 0;
				lp->rings_out_count[port] = 0;
			}
#endif

			if (unlikely(ret == -ENOBUFS)) {
				uint32_t k;
				for (k = 0; k < bsz_wr; k ++) {
					struct rte_mbuf *pkt_to_free = lp->mbuf_out[port].array[k];
					rte_pktmbuf_free(pkt_to_free);
				}
			}

			lp->mbuf_out[port].n_mbufs = 0;
			lp->mbuf_out_flush[port] = 0;
		}
	}
}

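/*
 * Flush the worker's partially filled output buffers towards the I/O
 * TX lcores, again using the two-pass flush flag protocol.
 */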
static inline void
app_lcore_worker_flush(struct app_lcore_params_worker *lp)
{
	uint32_t port;

	for (port = 0; port < APP_MAX_NIC_PORTS; port ++) {
		int ret;

		if (unlikely(lp->rings_out[port] == NULL)) {
			continue;
		}

		if (likely((lp->mbuf_out_flush[port] == 0) ||
		           (lp->mbuf_out[port].n_mbufs == 0))) {
			lp->mbuf_out_flush[port] = 1;
			continue;
		}

		ret = rte_ring_sp_enqueue_bulk(
			lp->rings_out[port],
			(void **) lp->mbuf_out[port].array,
			lp->mbuf_out[port].n_mbufs);

		if (unlikely(ret < 0)) {
			uint32_t k;
			for (k = 0; k < lp->mbuf_out[port].n_mbufs; k ++) {
				struct rte_mbuf *pkt_to_free = lp->mbuf_out[port].array[k];
				rte_pktmbuf_free(pkt_to_free);
			}
		}

		lp->mbuf_out[port].n_mbufs = 0;
		lp->mbuf_out_flush[port] = 1;
	}
}

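/*
 * Main loop of a worker lcore: process bursts forever, flushing the
 * output buffers every APP_LCORE_WORKER_FLUSH iterations.
 */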
static void
app_lcore_main_loop_worker(void)
{
	uint32_t lcore = rte_lcore_id();
	struct app_lcore_params_worker *lp = &app.lcore_params[lcore].worker;
	uint64_t i = 0;

	uint32_t bsz_rd = app.burst_size_worker_read;
	uint32_t bsz_wr = app.burst_size_worker_write;

	for ( ; ; ) {
		if (APP_LCORE_WORKER_FLUSH && (unlikely(i == APP_LCORE_WORKER_FLUSH))) {
			app_lcore_worker_flush(lp);
			i = 0;
		}

		app_lcore_worker(lp, bsz_rd, bsz_wr);

		i ++;
	}
}

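/*
 * Per-lcore entry point handed to the EAL launch call in main.c;
 * dispatches to the I/O or worker loop according to the role assigned
 * to this lcore. Neither loop returns.
 */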
int
app_lcore_main_loop(__attribute__((unused)) void *arg)
{
	struct app_lcore_params *lp;
	uint32_t lcore;

	lcore = rte_lcore_id();
	lp = &app.lcore_params[lcore];

	if (lp->type == e_APP_LCORE_IO) {
		printf("Logical core %u (I/O) main loop.\n", lcore);
		app_lcore_main_loop_io();
	}

	if (lp->type == e_APP_LCORE_WORKER) {
		printf("Logical core %u (worker %u) main loop.\n",
			lcore,
			lp->worker.worker_id);
		app_lcore_main_loop_worker();
	}

	return 0;
}