examples/ip_pipeline: add master pipeline
[dpdk.git] / examples / ip_pipeline / init.c
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright(c) 2010-2015 Intel Corporation. All rights reserved.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33
34 #include <inttypes.h>
35 #include <stdio.h>
36 #include <string.h>
37
38 #include <rte_cycles.h>
39 #include <rte_ethdev.h>
40 #include <rte_ether.h>
41 #include <rte_ip.h>
42 #include <rte_eal.h>
43 #include <rte_malloc.h>
44
45 #include "app.h"
46 #include "pipeline.h"
47 #include "pipeline_common_fe.h"
48 #include "pipeline_master.h"
49
50 #define APP_NAME_SIZE   32
51
52 static void
53 app_init_core_map(struct app_params *app)
54 {
55         APP_LOG(app, HIGH, "Initializing CPU core map ...");
56         app->core_map = cpu_core_map_init(4, 32, 4, 0);
57
58         if (app->core_map == NULL)
59                 rte_panic("Cannot create CPU core map\n");
60
61         if (app->log_level >= APP_LOG_LEVEL_LOW)
62                 cpu_core_map_print(app->core_map);
63 }
64
65 static void
66 app_init_core_mask(struct app_params *app)
67 {
68         uint64_t mask = 0;
69         uint32_t i;
70
71         for (i = 0; i < app->n_pipelines; i++) {
72                 struct app_pipeline_params *p = &app->pipeline_params[i];
73                 int lcore_id;
74
75                 lcore_id = cpu_core_map_get_lcore_id(app->core_map,
76                         p->socket_id,
77                         p->core_id,
78                         p->hyper_th_id);
79
80                 if (lcore_id < 0)
81                         rte_panic("Cannot create CPU core mask\n");
82
83                 mask |= 1LLU << lcore_id;
84         }
85
86         app->core_mask = mask;
87         APP_LOG(app, HIGH, "CPU core mask = 0x%016" PRIx64, app->core_mask);
88 }
89
90 static void
91 app_init_eal(struct app_params *app)
92 {
93         char buffer[32];
94         struct app_eal_params *p = &app->eal_params;
95         uint32_t n_args = 0;
96         int status;
97
98         app->eal_argv[n_args++] = strdup(app->app_name);
99
100         snprintf(buffer, sizeof(buffer), "-c%" PRIx64, app->core_mask);
101         app->eal_argv[n_args++] = strdup(buffer);
102
103         if (p->coremap) {
104                 snprintf(buffer, sizeof(buffer), "--lcores=%s", p->coremap);
105                 app->eal_argv[n_args++] = strdup(buffer);
106         }
107
108         if (p->master_lcore_present) {
109                 snprintf(buffer,
110                         sizeof(buffer),
111                         "--master-lcore=%" PRIu32,
112                         p->master_lcore);
113                 app->eal_argv[n_args++] = strdup(buffer);
114         }
115
116         snprintf(buffer, sizeof(buffer), "-n%" PRIu32, p->channels);
117         app->eal_argv[n_args++] = strdup(buffer);
118
119         if (p->memory_present) {
120                 snprintf(buffer, sizeof(buffer), "-m%" PRIu32, p->memory);
121                 app->eal_argv[n_args++] = strdup(buffer);
122         }
123
124         if (p->ranks_present) {
125                 snprintf(buffer, sizeof(buffer), "-r%" PRIu32, p->ranks);
126                 app->eal_argv[n_args++] = strdup(buffer);
127         }
128
129         if (p->pci_blacklist) {
130                 snprintf(buffer,
131                         sizeof(buffer),
132                         "--pci-blacklist=%s",
133                         p->pci_blacklist);
134                 app->eal_argv[n_args++] = strdup(buffer);
135         }
136
137         if (p->pci_whitelist) {
138                 snprintf(buffer,
139                         sizeof(buffer),
140                         "--pci-whitelist=%s",
141                         p->pci_whitelist);
142                 app->eal_argv[n_args++] = strdup(buffer);
143         }
144
145         if (p->vdev) {
146                 snprintf(buffer, sizeof(buffer), "--vdev=%s", p->vdev);
147                 app->eal_argv[n_args++] = strdup(buffer);
148         }
149
150         if ((p->vmware_tsc_map_present) && p->vmware_tsc_map) {
151                 snprintf(buffer, sizeof(buffer), "--vmware-tsc-map");
152                 app->eal_argv[n_args++] = strdup(buffer);
153         }
154
155         if (p->proc_type) {
156                 snprintf(buffer,
157                         sizeof(buffer),
158                         "--proc-type=%s",
159                         p->proc_type);
160                 app->eal_argv[n_args++] = strdup(buffer);
161         }
162
163         if (p->syslog) {
164                 snprintf(buffer, sizeof(buffer), "--syslog=%s", p->syslog);
165                 app->eal_argv[n_args++] = strdup(buffer);
166         }
167
168         if (p->log_level_present) {
169                 snprintf(buffer,
170                         sizeof(buffer),
171                         "--log-level=%" PRIu32,
172                         p->log_level);
173                 app->eal_argv[n_args++] = strdup(buffer);
174         }
175
176         if ((p->version_present) && p->version) {
177                 snprintf(buffer, sizeof(buffer), "-v");
178                 app->eal_argv[n_args++] = strdup(buffer);
179         }
180
181         if ((p->help_present) && p->help) {
182                 snprintf(buffer, sizeof(buffer), "--help");
183                 app->eal_argv[n_args++] = strdup(buffer);
184         }
185
186         if ((p->no_huge_present) && p->no_huge) {
187                 snprintf(buffer, sizeof(buffer), "--no-huge");
188                 app->eal_argv[n_args++] = strdup(buffer);
189         }
190
191         if ((p->no_pci_present) && p->no_pci) {
192                 snprintf(buffer, sizeof(buffer), "--no-pci");
193                 app->eal_argv[n_args++] = strdup(buffer);
194         }
195
196         if ((p->no_hpet_present) && p->no_hpet) {
197                 snprintf(buffer, sizeof(buffer), "--no-hpet");
198                 app->eal_argv[n_args++] = strdup(buffer);
199         }
200
201         if ((p->no_shconf_present) && p->no_shconf) {
202                 snprintf(buffer, sizeof(buffer), "--no-shconf");
203                 app->eal_argv[n_args++] = strdup(buffer);
204         }
205
206         if (p->add_driver) {
207                 snprintf(buffer, sizeof(buffer), "-d=%s", p->add_driver);
208                 app->eal_argv[n_args++] = strdup(buffer);
209         }
210
211         if (p->socket_mem) {
212                 snprintf(buffer,
213                         sizeof(buffer),
214                         "--socket-mem=%s",
215                         p->socket_mem);
216                 app->eal_argv[n_args++] = strdup(buffer);
217         }
218
219         if (p->huge_dir) {
220                 snprintf(buffer, sizeof(buffer), "--huge-dir=%s", p->huge_dir);
221                 app->eal_argv[n_args++] = strdup(buffer);
222         }
223
224         if (p->file_prefix) {
225                 snprintf(buffer,
226                         sizeof(buffer),
227                         "--file-prefix=%s",
228                         p->file_prefix);
229                 app->eal_argv[n_args++] = strdup(buffer);
230         }
231
232         if (p->base_virtaddr) {
233                 snprintf(buffer,
234                         sizeof(buffer),
235                         "--base-virtaddr=%s",
236                         p->base_virtaddr);
237                 app->eal_argv[n_args++] = strdup(buffer);
238         }
239
240         if ((p->create_uio_dev_present) && p->create_uio_dev) {
241                 snprintf(buffer, sizeof(buffer), "--create-uio-dev");
242                 app->eal_argv[n_args++] = strdup(buffer);
243         }
244
245         if (p->vfio_intr) {
246                 snprintf(buffer,
247                         sizeof(buffer),
248                         "--vfio-intr=%s",
249                         p->vfio_intr);
250                 app->eal_argv[n_args++] = strdup(buffer);
251         }
252
253         if ((p->xen_dom0_present) && (p->xen_dom0)) {
254                 snprintf(buffer, sizeof(buffer), "--xen-dom0");
255                 app->eal_argv[n_args++] = strdup(buffer);
256         }
257
258         snprintf(buffer, sizeof(buffer), "--");
259         app->eal_argv[n_args++] = strdup(buffer);
260
261         app->eal_argc = n_args;
262
263         APP_LOG(app, HIGH, "Initializing EAL ...");
264         status = rte_eal_init(app->eal_argc, app->eal_argv);
265         if (status < 0)
266                 rte_panic("EAL init error\n");
267 }
268
269 static void
270 app_init_mempool(struct app_params *app)
271 {
272         uint32_t i;
273
274         for (i = 0; i < app->n_mempools; i++) {
275                 struct app_mempool_params *p = &app->mempool_params[i];
276
277                 APP_LOG(app, HIGH, "Initializing %s ...", p->name);
278                 app->mempool[i] = rte_mempool_create(
279                                 p->name,
280                                 p->pool_size,
281                                 p->buffer_size,
282                                 p->cache_size,
283                                 sizeof(struct rte_pktmbuf_pool_private),
284                                 rte_pktmbuf_pool_init, NULL,
285                                 rte_pktmbuf_init, NULL,
286                                 p->cpu_socket_id,
287                                 0);
288
289                 if (app->mempool[i] == NULL)
290                         rte_panic("%s init error\n", p->name);
291         }
292 }
293
294 static inline int
295 app_link_filter_arp_add(struct app_link_params *link)
296 {
297         struct rte_eth_ethertype_filter filter = {
298                 .ether_type = ETHER_TYPE_ARP,
299                 .flags = 0,
300                 .queue = link->arp_q,
301         };
302
303         return rte_eth_dev_filter_ctrl(link->pmd_id,
304                 RTE_ETH_FILTER_ETHERTYPE,
305                 RTE_ETH_FILTER_ADD,
306                 &filter);
307 }
308
309 static inline int
310 app_link_filter_tcp_syn_add(struct app_link_params *link)
311 {
312         struct rte_eth_syn_filter filter = {
313                 .hig_pri = 1,
314                 .queue = link->tcp_syn_local_q,
315         };
316
317         return rte_eth_dev_filter_ctrl(link->pmd_id,
318                 RTE_ETH_FILTER_SYN,
319                 RTE_ETH_FILTER_ADD,
320                 &filter);
321 }
322
323 static inline int
324 app_link_filter_ip_add(struct app_link_params *l1, struct app_link_params *l2)
325 {
326         struct rte_eth_ntuple_filter filter = {
327                 .flags = RTE_5TUPLE_FLAGS,
328                 .dst_ip = rte_bswap32(l2->ip),
329                 .dst_ip_mask = UINT32_MAX, /* Enable */
330                 .src_ip = 0,
331                 .src_ip_mask = 0, /* Disable */
332                 .dst_port = 0,
333                 .dst_port_mask = 0, /* Disable */
334                 .src_port = 0,
335                 .src_port_mask = 0, /* Disable */
336                 .proto = 0,
337                 .proto_mask = 0, /* Disable */
338                 .tcp_flags = 0,
339                 .priority = 1, /* Lowest */
340                 .queue = l1->ip_local_q,
341         };
342
343         return rte_eth_dev_filter_ctrl(l1->pmd_id,
344                 RTE_ETH_FILTER_NTUPLE,
345                 RTE_ETH_FILTER_ADD,
346                 &filter);
347 }
348
349 static inline int
350 app_link_filter_ip_del(struct app_link_params *l1, struct app_link_params *l2)
351 {
352         struct rte_eth_ntuple_filter filter = {
353                 .flags = RTE_5TUPLE_FLAGS,
354                 .dst_ip = rte_bswap32(l2->ip),
355                 .dst_ip_mask = UINT32_MAX, /* Enable */
356                 .src_ip = 0,
357                 .src_ip_mask = 0, /* Disable */
358                 .dst_port = 0,
359                 .dst_port_mask = 0, /* Disable */
360                 .src_port = 0,
361                 .src_port_mask = 0, /* Disable */
362                 .proto = 0,
363                 .proto_mask = 0, /* Disable */
364                 .tcp_flags = 0,
365                 .priority = 1, /* Lowest */
366                 .queue = l1->ip_local_q,
367         };
368
369         return rte_eth_dev_filter_ctrl(l1->pmd_id,
370                 RTE_ETH_FILTER_NTUPLE,
371                 RTE_ETH_FILTER_DELETE,
372                 &filter);
373 }
374
375 static inline int
376 app_link_filter_tcp_add(struct app_link_params *l1, struct app_link_params *l2)
377 {
378         struct rte_eth_ntuple_filter filter = {
379                 .flags = RTE_5TUPLE_FLAGS,
380                 .dst_ip = rte_bswap32(l2->ip),
381                 .dst_ip_mask = UINT32_MAX, /* Enable */
382                 .src_ip = 0,
383                 .src_ip_mask = 0, /* Disable */
384                 .dst_port = 0,
385                 .dst_port_mask = 0, /* Disable */
386                 .src_port = 0,
387                 .src_port_mask = 0, /* Disable */
388                 .proto = IPPROTO_TCP,
389                 .proto_mask = UINT8_MAX, /* Enable */
390                 .tcp_flags = 0,
391                 .priority = 2, /* Higher priority than IP */
392                 .queue = l1->tcp_local_q,
393         };
394
395         return rte_eth_dev_filter_ctrl(l1->pmd_id,
396                 RTE_ETH_FILTER_NTUPLE,
397                 RTE_ETH_FILTER_ADD,
398                 &filter);
399 }
400
401 static inline int
402 app_link_filter_tcp_del(struct app_link_params *l1, struct app_link_params *l2)
403 {
404         struct rte_eth_ntuple_filter filter = {
405                 .flags = RTE_5TUPLE_FLAGS,
406                 .dst_ip = rte_bswap32(l2->ip),
407                 .dst_ip_mask = UINT32_MAX, /* Enable */
408                 .src_ip = 0,
409                 .src_ip_mask = 0, /* Disable */
410                 .dst_port = 0,
411                 .dst_port_mask = 0, /* Disable */
412                 .src_port = 0,
413                 .src_port_mask = 0, /* Disable */
414                 .proto = IPPROTO_TCP,
415                 .proto_mask = UINT8_MAX, /* Enable */
416                 .tcp_flags = 0,
417                 .priority = 2, /* Higher priority than IP */
418                 .queue = l1->tcp_local_q,
419         };
420
421         return rte_eth_dev_filter_ctrl(l1->pmd_id,
422                 RTE_ETH_FILTER_NTUPLE,
423                 RTE_ETH_FILTER_DELETE,
424                 &filter);
425 }
426
427 static inline int
428 app_link_filter_udp_add(struct app_link_params *l1, struct app_link_params *l2)
429 {
430         struct rte_eth_ntuple_filter filter = {
431                 .flags = RTE_5TUPLE_FLAGS,
432                 .dst_ip = rte_bswap32(l2->ip),
433                 .dst_ip_mask = UINT32_MAX, /* Enable */
434                 .src_ip = 0,
435                 .src_ip_mask = 0, /* Disable */
436                 .dst_port = 0,
437                 .dst_port_mask = 0, /* Disable */
438                 .src_port = 0,
439                 .src_port_mask = 0, /* Disable */
440                 .proto = IPPROTO_UDP,
441                 .proto_mask = UINT8_MAX, /* Enable */
442                 .tcp_flags = 0,
443                 .priority = 2, /* Higher priority than IP */
444                 .queue = l1->udp_local_q,
445         };
446
447         return rte_eth_dev_filter_ctrl(l1->pmd_id,
448                 RTE_ETH_FILTER_NTUPLE,
449                 RTE_ETH_FILTER_ADD,
450                 &filter);
451 }
452
453 static inline int
454 app_link_filter_udp_del(struct app_link_params *l1, struct app_link_params *l2)
455 {
456         struct rte_eth_ntuple_filter filter = {
457                 .flags = RTE_5TUPLE_FLAGS,
458                 .dst_ip = rte_bswap32(l2->ip),
459                 .dst_ip_mask = UINT32_MAX, /* Enable */
460                 .src_ip = 0,
461                 .src_ip_mask = 0, /* Disable */
462                 .dst_port = 0,
463                 .dst_port_mask = 0, /* Disable */
464                 .src_port = 0,
465                 .src_port_mask = 0, /* Disable */
466                 .proto = IPPROTO_UDP,
467                 .proto_mask = UINT8_MAX, /* Enable */
468                 .tcp_flags = 0,
469                 .priority = 2, /* Higher priority than IP */
470                 .queue = l1->udp_local_q,
471         };
472
473         return rte_eth_dev_filter_ctrl(l1->pmd_id,
474                 RTE_ETH_FILTER_NTUPLE,
475                 RTE_ETH_FILTER_DELETE,
476                 &filter);
477 }
478
479 static inline int
480 app_link_filter_sctp_add(struct app_link_params *l1, struct app_link_params *l2)
481 {
482         struct rte_eth_ntuple_filter filter = {
483                 .flags = RTE_5TUPLE_FLAGS,
484                 .dst_ip = rte_bswap32(l2->ip),
485                 .dst_ip_mask = UINT32_MAX, /* Enable */
486                 .src_ip = 0,
487                 .src_ip_mask = 0, /* Disable */
488                 .dst_port = 0,
489                 .dst_port_mask = 0, /* Disable */
490                 .src_port = 0,
491                 .src_port_mask = 0, /* Disable */
492                 .proto = IPPROTO_SCTP,
493                 .proto_mask = UINT8_MAX, /* Enable */
494                 .tcp_flags = 0,
495                 .priority = 2, /* Higher priority than IP */
496                 .queue = l1->sctp_local_q,
497         };
498
499         return rte_eth_dev_filter_ctrl(l1->pmd_id,
500                 RTE_ETH_FILTER_NTUPLE,
501                 RTE_ETH_FILTER_ADD,
502                 &filter);
503 }
504
505 static inline int
506 app_link_filter_sctp_del(struct app_link_params *l1, struct app_link_params *l2)
507 {
508         struct rte_eth_ntuple_filter filter = {
509                 .flags = RTE_5TUPLE_FLAGS,
510                 .dst_ip = rte_bswap32(l2->ip),
511                 .dst_ip_mask = UINT32_MAX, /* Enable */
512                 .src_ip = 0,
513                 .src_ip_mask = 0, /* Disable */
514                 .dst_port = 0,
515                 .dst_port_mask = 0, /* Disable */
516                 .src_port = 0,
517                 .src_port_mask = 0, /* Disable */
518                 .proto = IPPROTO_SCTP,
519                 .proto_mask = UINT8_MAX, /* Enable */
520                 .tcp_flags = 0,
521                 .priority = 2, /* Higher priority than IP */
522                 .queue = l1->sctp_local_q,
523         };
524
525         return rte_eth_dev_filter_ctrl(l1->pmd_id,
526                 RTE_ETH_FILTER_NTUPLE,
527                 RTE_ETH_FILTER_DELETE,
528                 &filter);
529 }
530
531 static void
532 app_link_set_arp_filter(struct app_params *app, struct app_link_params *cp)
533 {
534         if (cp->arp_q != 0) {
535                 int status = app_link_filter_arp_add(cp);
536
537                 APP_LOG(app, LOW, "%s (%" PRIu32 "): "
538                         "Adding ARP filter (queue = %" PRIu32 ")",
539                         cp->name, cp->pmd_id, cp->arp_q);
540
541                 if (status)
542                         rte_panic("%s (%" PRIu32 "): "
543                                 "Error adding ARP filter "
544                                 "(queue = %" PRIu32 ") (%" PRId32 ")\n",
545                                 cp->name, cp->pmd_id, cp->arp_q, status);
546         }
547 }
548
549 static void
550 app_link_set_tcp_syn_filter(struct app_params *app, struct app_link_params *cp)
551 {
552         if (cp->tcp_syn_local_q != 0) {
553                 int status = app_link_filter_tcp_syn_add(cp);
554
555                 APP_LOG(app, LOW, "%s (%" PRIu32 "): "
556                         "Adding TCP SYN filter (queue = %" PRIu32 ")",
557                         cp->name, cp->pmd_id, cp->tcp_syn_local_q);
558
559                 if (status)
560                         rte_panic("%s (%" PRIu32 "): "
561                                 "Error adding TCP SYN filter "
562                                 "(queue = %" PRIu32 ") (%" PRId32 ")\n",
563                                 cp->name, cp->pmd_id, cp->tcp_syn_local_q,
564                                 status);
565         }
566 }
567
568 void
569 app_link_up_internal(struct app_params *app, struct app_link_params *cp)
570 {
571         uint32_t i;
572         int status;
573
574         /* For each link, add filters for IP of current link */
575         if (cp->ip != 0) {
576                 for (i = 0; i < app->n_links; i++) {
577                         struct app_link_params *p = &app->link_params[i];
578
579                         /* IP */
580                         if (p->ip_local_q != 0) {
581                                 int status = app_link_filter_ip_add(p, cp);
582
583                                 APP_LOG(app, LOW, "%s (%" PRIu32 "): "
584                                         "Adding IP filter (queue= %" PRIu32
585                                         ", IP = 0x%08" PRIx32 ")",
586                                         p->name, p->pmd_id, p->ip_local_q,
587                                         cp->ip);
588
589                                 if (status)
590                                         rte_panic("%s (%" PRIu32 "): "
591                                                 "Error adding IP "
592                                                 "filter (queue= %" PRIu32 ", "
593                                                 "IP = 0x%08" PRIx32
594                                                 ") (%" PRId32 ")\n",
595                                                 p->name, p->pmd_id,
596                                                 p->ip_local_q, cp->ip, status);
597                         }
598
599                         /* TCP */
600                         if (p->tcp_local_q != 0) {
601                                 int status = app_link_filter_tcp_add(p, cp);
602
603                                 APP_LOG(app, LOW, "%s (%" PRIu32 "): "
604                                         "Adding TCP filter "
605                                         "(queue = %" PRIu32
606                                         ", IP = 0x%08" PRIx32 ")",
607                                         p->name, p->pmd_id, p->tcp_local_q,
608                                         cp->ip);
609
610                                 if (status)
611                                         rte_panic("%s (%" PRIu32 "): "
612                                                 "Error adding TCP "
613                                                 "filter (queue = %" PRIu32 ", "
614                                                 "IP = 0x%08" PRIx32
615                                                 ") (%" PRId32 ")\n",
616                                                 p->name, p->pmd_id,
617                                                 p->tcp_local_q, cp->ip, status);
618                         }
619
620                         /* UDP */
621                         if (p->udp_local_q != 0) {
622                                 int status = app_link_filter_udp_add(p, cp);
623
624                                 APP_LOG(app, LOW, "%s (%" PRIu32 "): "
625                                         "Adding UDP filter "
626                                         "(queue = %" PRIu32
627                                         ", IP = 0x%08" PRIx32 ")",
628                                         p->name, p->pmd_id, p->udp_local_q,
629                                         cp->ip);
630
631                                 if (status)
632                                         rte_panic("%s (%" PRIu32 "): "
633                                                 "Error adding UDP "
634                                                 "filter (queue = %" PRIu32 ", "
635                                                 "IP = 0x%08" PRIx32
636                                                 ") (%" PRId32 ")\n",
637                                                 p->name, p->pmd_id,
638                                                 p->udp_local_q, cp->ip, status);
639                         }
640
641                         /* SCTP */
642                         if (p->sctp_local_q != 0) {
643                                 int status = app_link_filter_sctp_add(p, cp);
644
645                                 APP_LOG(app, LOW, "%s (%" PRIu32
646                                         "): Adding SCTP filter "
647                                         "(queue = %" PRIu32
648                                         ", IP = 0x%08" PRIx32 ")",
649                                         p->name, p->pmd_id, p->sctp_local_q,
650                                         cp->ip);
651
652                                 if (status)
653                                         rte_panic("%s (%" PRIu32 "): "
654                                                 "Error adding SCTP "
655                                                 "filter (queue = %" PRIu32 ", "
656                                                 "IP = 0x%08" PRIx32
657                                                 ") (%" PRId32 ")\n",
658                                                 p->name, p->pmd_id,
659                                                 p->sctp_local_q, cp->ip,
660                                                 status);
661                         }
662                 }
663         }
664
665         /* PMD link up */
666         status = rte_eth_dev_set_link_up(cp->pmd_id);
667         if (status < 0)
668                 rte_panic("%s (%" PRIu32 "): PMD set up error %" PRId32 "\n",
669                         cp->name, cp->pmd_id, status);
670
671         /* Mark link as UP */
672         cp->state = 1;
673 }
674
675 void
676 app_link_down_internal(struct app_params *app, struct app_link_params *cp)
677 {
678         uint32_t i;
679
680         /* PMD link down */
681         rte_eth_dev_set_link_down(cp->pmd_id);
682
683         /* Mark link as DOWN */
684         cp->state = 0;
685
686         /* Return if current link IP is not valid */
687         if (cp->ip == 0)
688                 return;
689
690         /* For each link, remove filters for IP of current link */
691         for (i = 0; i < app->n_links; i++) {
692                 struct app_link_params *p = &app->link_params[i];
693
694                 /* IP */
695                 if (p->ip_local_q != 0) {
696                         int status = app_link_filter_ip_del(p, cp);
697
698                         APP_LOG(app, LOW, "%s (%" PRIu32
699                                 "): Deleting IP filter "
700                                 "(queue = %" PRIu32 ", IP = 0x%" PRIx32 ")",
701                                 p->name, p->pmd_id, p->ip_local_q, cp->ip);
702
703                         if (status)
704                                 rte_panic("%s (%" PRIu32
705                                         "): Error deleting IP filter "
706                                         "(queue = %" PRIu32
707                                         ", IP = 0x%" PRIx32
708                                         ") (%" PRId32 ")\n",
709                                         p->name, p->pmd_id, p->ip_local_q,
710                                         cp->ip, status);
711                 }
712
713                 /* TCP */
714                 if (p->tcp_local_q != 0) {
715                         int status = app_link_filter_tcp_del(p, cp);
716
717                         APP_LOG(app, LOW, "%s (%" PRIu32
718                                 "): Deleting TCP filter "
719                                 "(queue = %" PRIu32
720                                 ", IP = 0x%" PRIx32 ")",
721                                 p->name, p->pmd_id, p->tcp_local_q, cp->ip);
722
723                         if (status)
724                                 rte_panic("%s (%" PRIu32
725                                         "): Error deleting TCP filter "
726                                         "(queue = %" PRIu32
727                                         ", IP = 0x%" PRIx32
728                                         ") (%" PRId32 ")\n",
729                                         p->name, p->pmd_id, p->tcp_local_q,
730                                         cp->ip, status);
731                 }
732
733                 /* UDP */
734                 if (p->udp_local_q != 0) {
735                         int status = app_link_filter_udp_del(p, cp);
736
737                         APP_LOG(app, LOW, "%s (%" PRIu32
738                                 "): Deleting UDP filter "
739                                 "(queue = %" PRIu32 ", IP = 0x%" PRIx32 ")",
740                                 p->name, p->pmd_id, p->udp_local_q, cp->ip);
741
742                         if (status)
743                                 rte_panic("%s (%" PRIu32
744                                         "): Error deleting UDP filter "
745                                         "(queue = %" PRIu32
746                                         ", IP = 0x%" PRIx32
747                                         ") (%" PRId32 ")\n",
748                                         p->name, p->pmd_id, p->udp_local_q,
749                                         cp->ip, status);
750                 }
751
752                 /* SCTP */
753                 if (p->sctp_local_q != 0) {
754                         int status = app_link_filter_sctp_del(p, cp);
755
756                         APP_LOG(app, LOW, "%s (%" PRIu32
757                                 "): Deleting SCTP filter "
758                                 "(queue = %" PRIu32
759                                 ", IP = 0x%" PRIx32 ")",
760                                 p->name, p->pmd_id, p->sctp_local_q, cp->ip);
761
762                         if (status)
763                                 rte_panic("%s (%" PRIu32
764                                         "): Error deleting SCTP filter "
765                                         "(queue = %" PRIu32
766                                         ", IP = 0x%" PRIx32
767                                         ") (%" PRId32 ")\n",
768                                         p->name, p->pmd_id, p->sctp_local_q,
769                                         cp->ip, status);
770                 }
771         }
772 }
773
774 static void
775 app_check_link(struct app_params *app)
776 {
777         uint32_t all_links_up, i;
778
779         all_links_up = 1;
780
781         for (i = 0; i < app->n_links; i++) {
782                 struct app_link_params *p = &app->link_params[i];
783                 struct rte_eth_link link_params;
784
785                 memset(&link_params, 0, sizeof(link_params));
786                 rte_eth_link_get(p->pmd_id, &link_params);
787
788                 APP_LOG(app, HIGH, "%s (%" PRIu32 ") (%" PRIu32 " Gbps) %s",
789                         p->name,
790                         p->pmd_id,
791                         link_params.link_speed / 1000,
792                         link_params.link_status ? "UP" : "DOWN");
793
794                 if (link_params.link_status == 0)
795                         all_links_up = 0;
796         }
797
798         if (all_links_up == 0)
799                 rte_panic("Some links are DOWN\n");
800 }
801
802 static void
803 app_init_link(struct app_params *app)
804 {
805         uint32_t i;
806
807         for (i = 0; i < app->n_links; i++) {
808                 struct app_link_params *p_link = &app->link_params[i];
809                 uint32_t link_id, n_hwq_in, n_hwq_out, j;
810                 int status;
811
812                 sscanf(p_link->name, "LINK%" PRIu32, &link_id);
813                 n_hwq_in = app_link_get_n_rxq(app, p_link);
814                 n_hwq_out = app_link_get_n_txq(app, p_link);
815
816                 APP_LOG(app, HIGH, "Initializing %s (%" PRIu32") "
817                         "(%" PRIu32 " RXQ, %" PRIu32 " TXQ) ...",
818                         p_link->name,
819                         p_link->pmd_id,
820                         n_hwq_in,
821                         n_hwq_out);
822
823                 /* LINK */
824                 status = rte_eth_dev_configure(
825                         p_link->pmd_id,
826                         n_hwq_in,
827                         n_hwq_out,
828                         &p_link->conf);
829                 if (status < 0)
830                         rte_panic("%s (%" PRId32 "): "
831                                 "init error (%" PRId32 ")\n",
832                                 p_link->name, p_link->pmd_id, status);
833
834                 rte_eth_macaddr_get(p_link->pmd_id,
835                         (struct ether_addr *) &p_link->mac_addr);
836
837                 if (p_link->promisc)
838                         rte_eth_promiscuous_enable(p_link->pmd_id);
839
840                 /* RXQ */
841                 for (j = 0; j < app->n_pktq_hwq_in; j++) {
842                         struct app_pktq_hwq_in_params *p_rxq =
843                                 &app->hwq_in_params[j];
844                         uint32_t rxq_link_id, rxq_queue_id;
845
846                         sscanf(p_rxq->name, "RXQ%" PRIu32 ".%" PRIu32,
847                                 &rxq_link_id, &rxq_queue_id);
848                         if (rxq_link_id != link_id)
849                                 continue;
850
851                         status = rte_eth_rx_queue_setup(
852                                 p_link->pmd_id,
853                                 rxq_queue_id,
854                                 p_rxq->size,
855                                 rte_eth_dev_socket_id(p_link->pmd_id),
856                                 &p_rxq->conf,
857                                 app->mempool[p_rxq->mempool_id]);
858                         if (status < 0)
859                                 rte_panic("%s (%" PRIu32 "): "
860                                         "%s init error (%" PRId32 ")\n",
861                                         p_link->name,
862                                         p_link->pmd_id,
863                                         p_rxq->name,
864                                         status);
865                 }
866
867                 /* TXQ */
868                 for (j = 0; j < app->n_pktq_hwq_out; j++) {
869                         struct app_pktq_hwq_out_params *p_txq =
870                                 &app->hwq_out_params[j];
871                         uint32_t txq_link_id, txq_queue_id;
872
873                         sscanf(p_txq->name, "TXQ%" PRIu32 ".%" PRIu32,
874                                 &txq_link_id, &txq_queue_id);
875                         if (txq_link_id != link_id)
876                                 continue;
877
878                         status = rte_eth_tx_queue_setup(
879                                 p_link->pmd_id,
880                                 txq_queue_id,
881                                 p_txq->size,
882                                 rte_eth_dev_socket_id(p_link->pmd_id),
883                                 &p_txq->conf);
884                         if (status < 0)
885                                 rte_panic("%s (%" PRIu32 "): "
886                                         "%s init error (%" PRId32 ")\n",
887                                         p_link->name,
888                                         p_link->pmd_id,
889                                         p_txq->name,
890                                         status);
891                 }
892
893                 /* LINK START */
894                 status = rte_eth_dev_start(p_link->pmd_id);
895                 if (status < 0)
896                         rte_panic("Cannot start %s (error %" PRId32 ")\n",
897                                 p_link->name, status);
898
899                 /* LINK UP */
900                 app_link_set_arp_filter(app, p_link);
901                 app_link_set_tcp_syn_filter(app, p_link);
902                 app_link_up_internal(app, p_link);
903         }
904
905         app_check_link(app);
906 }
907
908 static void
909 app_init_swq(struct app_params *app)
910 {
911         uint32_t i;
912
913         for (i = 0; i < app->n_pktq_swq; i++) {
914                 struct app_pktq_swq_params *p = &app->swq_params[i];
915
916                 APP_LOG(app, HIGH, "Initializing %s...", p->name);
917                 app->swq[i] = rte_ring_create(
918                                 p->name,
919                                 p->size,
920                                 p->cpu_socket_id,
921                                 RING_F_SP_ENQ | RING_F_SC_DEQ);
922
923                 if (app->swq[i] == NULL)
924                         rte_panic("%s init error\n", p->name);
925         }
926 }
927
928 static void
929 app_init_tm(struct app_params *app)
930 {
931         uint32_t i;
932
933         for (i = 0; i < app->n_pktq_tm; i++) {
934                 struct app_pktq_tm_params *p_tm = &app->tm_params[i];
935                 struct app_link_params *p_link;
936                 struct rte_eth_link link_eth_params;
937                 struct rte_sched_port *sched;
938                 uint32_t n_subports, subport_id;
939                 int status;
940
941                 p_link = app_get_link_for_tm(app, p_tm);
942                 /* LINK */
943                 rte_eth_link_get(p_link->pmd_id, &link_eth_params);
944
945                 /* TM */
946                 p_tm->sched_port_params.name = p_tm->name;
947                 p_tm->sched_port_params.socket =
948                         rte_eth_dev_socket_id(p_link->pmd_id);
949                 p_tm->sched_port_params.rate =
950                         (uint64_t) link_eth_params.link_speed * 1000 * 1000 / 8;
951
952                 APP_LOG(app, HIGH, "Initializing %s ...", p_tm->name);
953                 sched = rte_sched_port_config(&p_tm->sched_port_params);
954                 if (sched == NULL)
955                         rte_panic("%s init error\n", p_tm->name);
956                 app->tm[i] = sched;
957
958                 /* Subport */
959                 n_subports = p_tm->sched_port_params.n_subports_per_port;
960                 for (subport_id = 0; subport_id < n_subports; subport_id++) {
961                         uint32_t n_pipes_per_subport, pipe_id;
962
963                         status = rte_sched_subport_config(sched,
964                                 subport_id,
965                                 &p_tm->sched_subport_params[subport_id]);
966                         if (status)
967                                 rte_panic("%s subport %" PRIu32
968                                         " init error (%" PRId32 ")\n",
969                                         p_tm->name, subport_id, status);
970
971                         /* Pipe */
972                         n_pipes_per_subport =
973                                 p_tm->sched_port_params.n_pipes_per_subport;
974                         for (pipe_id = 0;
975                                 pipe_id < n_pipes_per_subport;
976                                 pipe_id++) {
977                                 int profile_id = p_tm->sched_pipe_to_profile[
978                                         subport_id * APP_MAX_SCHED_PIPES +
979                                         pipe_id];
980
981                                 if (profile_id == -1)
982                                         continue;
983
984                                 status = rte_sched_pipe_config(sched,
985                                         subport_id,
986                                         pipe_id,
987                                         profile_id);
988                                 if (status)
989                                         rte_panic("%s subport %" PRIu32
990                                                 " pipe %" PRIu32
991                                                 " (profile %" PRId32 ") "
992                                                 "init error (% " PRId32 ")\n",
993                                                 p_tm->name, subport_id, pipe_id,
994                                                 profile_id, status);
995                         }
996                 }
997         }
998 }
999
1000 static void
1001 app_init_msgq(struct app_params *app)
1002 {
1003         uint32_t i;
1004
1005         for (i = 0; i < app->n_msgq; i++) {
1006                 struct app_msgq_params *p = &app->msgq_params[i];
1007
1008                 APP_LOG(app, HIGH, "Initializing %s ...", p->name);
1009                 app->msgq[i] = rte_ring_create(
1010                                 p->name,
1011                                 p->size,
1012                                 p->cpu_socket_id,
1013                                 RING_F_SP_ENQ | RING_F_SC_DEQ);
1014
1015                 if (app->msgq[i] == NULL)
1016                         rte_panic("%s init error\n", p->name);
1017         }
1018 }
1019
1020 static void app_pipeline_params_get(struct app_params *app,
1021         struct app_pipeline_params *p_in,
1022         struct pipeline_params *p_out)
1023 {
1024         uint32_t i;
1025
1026         strcpy(p_out->name, p_in->name);
1027
1028         p_out->socket_id = (int) p_in->socket_id;
1029
1030         p_out->log_level = app->log_level;
1031
1032         /* pktq_in */
1033         p_out->n_ports_in = p_in->n_pktq_in;
1034         for (i = 0; i < p_in->n_pktq_in; i++) {
1035                 struct app_pktq_in_params *in = &p_in->pktq_in[i];
1036                 struct pipeline_port_in_params *out = &p_out->port_in[i];
1037
1038                 switch (in->type) {
1039                 case APP_PKTQ_IN_HWQ:
1040                 {
1041                         struct app_pktq_hwq_in_params *p_hwq_in =
1042                                 &app->hwq_in_params[in->id];
1043                         struct app_link_params *p_link =
1044                                 app_get_link_for_rxq(app, p_hwq_in);
1045                         uint32_t rxq_link_id, rxq_queue_id;
1046
1047                         sscanf(p_hwq_in->name, "RXQ%" SCNu32 ".%" SCNu32,
1048                                 &rxq_link_id,
1049                                 &rxq_queue_id);
1050
1051                         out->type = PIPELINE_PORT_IN_ETHDEV_READER;
1052                         out->params.ethdev.port_id = p_link->pmd_id;
1053                         out->params.ethdev.queue_id = rxq_queue_id;
1054                         out->burst_size = p_hwq_in->burst;
1055                         break;
1056                 }
1057                 case APP_PKTQ_IN_SWQ:
1058                         out->type = PIPELINE_PORT_IN_RING_READER;
1059                         out->params.ring.ring = app->swq[in->id];
1060                         out->burst_size = app->swq_params[in->id].burst_read;
1061                         /* What about frag and ras ports? */
1062                         break;
1063                 case APP_PKTQ_IN_TM:
1064                         out->type = PIPELINE_PORT_IN_SCHED_READER;
1065                         out->params.sched.sched = app->tm[in->id];
1066                         out->burst_size = app->tm_params[in->id].burst_read;
1067                         break;
1068                 case APP_PKTQ_IN_SOURCE:
1069                         out->type = PIPELINE_PORT_IN_SOURCE;
1070                         out->params.source.mempool = app->mempool[in->id];
1071                         out->burst_size = app->source_params[in->id].burst;
1072                         break;
1073                 default:
1074                         break;
1075                 }
1076         }
1077
1078         /* pktq_out */
1079         p_out->n_ports_out = p_in->n_pktq_out;
1080         for (i = 0; i < p_in->n_pktq_out; i++) {
1081                 struct app_pktq_out_params *in = &p_in->pktq_out[i];
1082                 struct pipeline_port_out_params *out = &p_out->port_out[i];
1083
1084                 switch (in->type) {
1085                 case APP_PKTQ_OUT_HWQ:
1086                 {
1087                         struct app_pktq_hwq_out_params *p_hwq_out =
1088                                 &app->hwq_out_params[in->id];
1089                         struct app_link_params *p_link =
1090                                 app_get_link_for_txq(app, p_hwq_out);
1091                         uint32_t txq_link_id, txq_queue_id;
1092
1093                         sscanf(p_hwq_out->name,
1094                                 "TXQ%" SCNu32 ".%" SCNu32,
1095                                 &txq_link_id,
1096                                 &txq_queue_id);
1097
1098                         if (p_hwq_out->dropless == 0) {
1099                                 struct rte_port_ethdev_writer_params *params =
1100                                         &out->params.ethdev;
1101
1102                                 out->type = PIPELINE_PORT_OUT_ETHDEV_WRITER;
1103                                 params->port_id = p_link->pmd_id;
1104                                 params->queue_id = txq_queue_id;
1105                                 params->tx_burst_sz =
1106                                         app->hwq_out_params[in->id].burst;
1107                         } else {
1108                                 struct rte_port_ethdev_writer_nodrop_params
1109                                         *params = &out->params.ethdev_nodrop;
1110
1111                                 out->type =
1112                                         PIPELINE_PORT_OUT_ETHDEV_WRITER_NODROP;
1113                                 params->port_id = p_link->pmd_id;
1114                                 params->queue_id = txq_queue_id;
1115                                 params->tx_burst_sz = p_hwq_out->burst;
1116                                 params->n_retries = p_hwq_out->n_retries;
1117                         }
1118                         break;
1119                 }
1120                 case APP_PKTQ_OUT_SWQ:
1121                         if (app->swq_params[in->id].dropless == 0) {
1122                                 struct rte_port_ring_writer_params *params =
1123                                         &out->params.ring;
1124
1125                                 out->type = PIPELINE_PORT_OUT_RING_WRITER;
1126                                 params->ring = app->swq[in->id];
1127                                 params->tx_burst_sz =
1128                                         app->swq_params[in->id].burst_write;
1129                         } else {
1130                                 struct rte_port_ring_writer_nodrop_params
1131                                         *params = &out->params.ring_nodrop;
1132
1133                                 out->type =
1134                                         PIPELINE_PORT_OUT_RING_WRITER_NODROP;
1135                                 params->ring = app->swq[in->id];
1136                                 params->tx_burst_sz =
1137                                         app->swq_params[in->id].burst_write;
1138                                 params->n_retries =
1139                                         app->swq_params[in->id].n_retries;
1140                         }
1141                         /* What about frag and ras ports? */
1142                         break;
1143                 case APP_PKTQ_OUT_TM: {
1144                         struct rte_port_sched_writer_params *params =
1145                                 &out->params.sched;
1146
1147                         out->type = PIPELINE_PORT_OUT_SCHED_WRITER;
1148                         params->sched = app->tm[in->id];
1149                         params->tx_burst_sz =
1150                                 app->tm_params[in->id].burst_write;
1151                         break;
1152                 }
1153                 case APP_PKTQ_OUT_SINK:
1154                         out->type = PIPELINE_PORT_OUT_SINK;
1155                         break;
1156                 default:
1157                         break;
1158                 }
1159         }
1160
1161         /* msgq */
1162         p_out->n_msgq = p_in->n_msgq_in;
1163
1164         for (i = 0; i < p_in->n_msgq_in; i++)
1165                 p_out->msgq_in[i] = app->msgq[p_in->msgq_in[i]];
1166
1167         for (i = 0; i < p_in->n_msgq_out; i++)
1168                 p_out->msgq_out[i] = app->msgq[p_in->msgq_out[i]];
1169
1170         /* args */
1171         p_out->n_args = p_in->n_args;
1172         for (i = 0; i < p_in->n_args; i++) {
1173                 p_out->args_name[i] = p_in->args_name[i];
1174                 p_out->args_value[i] = p_in->args_value[i];
1175         }
1176 }
1177
1178 static void
1179 app_init_pipelines(struct app_params *app)
1180 {
1181         uint32_t p_id;
1182
1183         for (p_id = 0; p_id < app->n_pipelines; p_id++) {
1184                 struct app_pipeline_params *params =
1185                         &app->pipeline_params[p_id];
1186                 struct app_pipeline_data *data = &app->pipeline_data[p_id];
1187                 struct pipeline_type *ptype;
1188                 struct pipeline_params pp;
1189
1190                 APP_LOG(app, HIGH, "Initializing %s ...", params->name);
1191
1192                 ptype = app_pipeline_type_find(app, params->type);
1193                 if (ptype == NULL)
1194                         rte_panic("Init error: Unknown pipeline type \"%s\"\n",
1195                                 params->type);
1196
1197                 app_pipeline_params_get(app, params, &pp);
1198
1199                 /* Back-end */
1200                 data->be = NULL;
1201                 if (ptype->be_ops->f_init) {
1202                         data->be = ptype->be_ops->f_init(&pp, (void *) app);
1203
1204                         if (data->be == NULL)
1205                                 rte_panic("Pipeline instance \"%s\" back-end "
1206                                         "init error\n", params->name);
1207                 }
1208
1209                 /* Front-end */
1210                 data->fe = NULL;
1211                 if (ptype->fe_ops->f_init) {
1212                         data->fe = ptype->fe_ops->f_init(&pp, (void *) app);
1213
1214                         if (data->fe == NULL)
1215                                 rte_panic("Pipeline instance \"%s\" front-end "
1216                                 "init error\n", params->name);
1217                 }
1218
1219                 data->timer_period = (rte_get_tsc_hz() * params->timer_period)
1220                         / 1000;
1221         }
1222 }
1223
1224 static void
1225 app_init_threads(struct app_params *app)
1226 {
1227         uint64_t time = rte_get_tsc_cycles();
1228         uint32_t p_id;
1229
1230         for (p_id = 0; p_id < app->n_pipelines; p_id++) {
1231                 struct app_pipeline_params *params =
1232                         &app->pipeline_params[p_id];
1233                 struct app_pipeline_data *data = &app->pipeline_data[p_id];
1234                 struct pipeline_type *ptype;
1235                 struct app_thread_data *t;
1236                 struct app_thread_pipeline_data *p;
1237                 int lcore_id;
1238
1239                 lcore_id = cpu_core_map_get_lcore_id(app->core_map,
1240                         params->socket_id,
1241                         params->core_id,
1242                         params->hyper_th_id);
1243
1244                 if (lcore_id < 0)
1245                         rte_panic("Invalid core s%" PRIu32 "c%" PRIu32 "%s\n",
1246                                 params->socket_id,
1247                                 params->core_id,
1248                                 (params->hyper_th_id) ? "h" : "");
1249
1250                 t = &app->thread_data[lcore_id];
1251
1252                 ptype = app_pipeline_type_find(app, params->type);
1253                 if (ptype == NULL)
1254                         rte_panic("Init error: Unknown pipeline "
1255                                 "type \"%s\"\n", params->type);
1256
1257                 p = (ptype->be_ops->f_run == NULL) ?
1258                         &t->regular[t->n_regular] :
1259                         &t->custom[t->n_custom];
1260
1261                 p->be = data->be;
1262                 p->f_run = ptype->be_ops->f_run;
1263                 p->f_timer = ptype->be_ops->f_timer;
1264                 p->timer_period = data->timer_period;
1265                 p->deadline = time + data->timer_period;
1266
1267                 if (ptype->be_ops->f_run == NULL)
1268                         t->n_regular++;
1269                 else
1270                         t->n_custom++;
1271         }
1272 }
1273
1274 int app_init(struct app_params *app)
1275 {
1276         app_init_core_map(app);
1277         app_init_core_mask(app);
1278
1279         app_init_eal(app);
1280         app_init_mempool(app);
1281         app_init_link(app);
1282         app_init_swq(app);
1283         app_init_tm(app);
1284         app_init_msgq(app);
1285
1286         app_pipeline_common_cmd_push(app);
1287         app_pipeline_type_register(app, &pipeline_master);
1288
1289         app_init_pipelines(app);
1290         app_init_threads(app);
1291
1292         return 0;
1293 }
1294
1295 static int
1296 app_pipeline_type_cmd_push(struct app_params *app,
1297         struct pipeline_type *ptype)
1298 {
1299         cmdline_parse_ctx_t *cmds;
1300         uint32_t n_cmds, i;
1301
1302         /* Check input arguments */
1303         if ((app == NULL) ||
1304                 (ptype == NULL))
1305                 return -EINVAL;
1306
1307         n_cmds = pipeline_type_cmds_count(ptype);
1308         if (n_cmds == 0)
1309                 return 0;
1310
1311         cmds = ptype->fe_ops->cmds;
1312
1313         /* Check for available slots in the application commands array */
1314         if (n_cmds > APP_MAX_CMDS - app->n_cmds)
1315                 return -ENOMEM;
1316
1317         /* Push pipeline commands into the application */
1318         memcpy(&app->cmds[app->n_cmds],
1319                 cmds,
1320                 n_cmds * sizeof(cmdline_parse_ctx_t *));
1321
1322         for (i = 0; i < n_cmds; i++)
1323                 app->cmds[app->n_cmds + i]->data = app;
1324
1325         app->n_cmds += n_cmds;
1326         app->cmds[app->n_cmds] = NULL;
1327
1328         return 0;
1329 }
1330
1331 int
1332 app_pipeline_type_register(struct app_params *app, struct pipeline_type *ptype)
1333 {
1334         uint32_t n_cmds, i;
1335
1336         /* Check input arguments */
1337         if ((app == NULL) ||
1338                 (ptype == NULL) ||
1339                 (ptype->name == NULL) ||
1340                 (strlen(ptype->name) == 0) ||
1341                 (ptype->be_ops->f_init == NULL) ||
1342                 (ptype->be_ops->f_timer == NULL))
1343                 return -EINVAL;
1344
1345         /* Check for duplicate entry */
1346         for (i = 0; i < app->n_pipeline_types; i++)
1347                 if (strcmp(app->pipeline_type[i].name, ptype->name) == 0)
1348                         return -EEXIST;
1349
1350         /* Check for resource availability */
1351         n_cmds = pipeline_type_cmds_count(ptype);
1352         if ((app->n_pipeline_types == APP_MAX_PIPELINE_TYPES) ||
1353                 (n_cmds > APP_MAX_CMDS - app->n_cmds))
1354                 return -ENOMEM;
1355
1356         /* Copy pipeline type */
1357         memcpy(&app->pipeline_type[app->n_pipeline_types++],
1358                 ptype,
1359                 sizeof(struct pipeline_type));
1360
1361         /* Copy CLI commands */
1362         if (n_cmds)
1363                 app_pipeline_type_cmd_push(app, ptype);
1364
1365         return 0;
1366 }
1367
1368 struct
1369 pipeline_type *app_pipeline_type_find(struct app_params *app, char *name)
1370 {
1371         uint32_t i;
1372
1373         for (i = 0; i < app->n_pipeline_types; i++)
1374                 if (strcmp(app->pipeline_type[i].name, name) == 0)
1375                         return &app->pipeline_type[i];
1376
1377         return NULL;
1378 }