examples/ip_pipeline: track rules on delete default
[dpdk.git] / examples / ip_pipeline / thread.c
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2010-2018 Intel Corporation
3  */
4
5 #include <stdlib.h>
6
7 #include <rte_common.h>
8 #include <rte_cycles.h>
9 #include <rte_lcore.h>
10 #include <rte_ring.h>
11
12 #include <rte_table_acl.h>
13 #include <rte_table_array.h>
14 #include <rte_table_hash.h>
15 #include <rte_table_lpm.h>
16 #include <rte_table_lpm_ipv6.h>
17
18 #include "common.h"
19 #include "thread.h"
20 #include "pipeline.h"
21
22 #ifndef THREAD_PIPELINES_MAX
23 #define THREAD_PIPELINES_MAX                               256
24 #endif
25
26 #ifndef THREAD_MSGQ_SIZE
27 #define THREAD_MSGQ_SIZE                                   64
28 #endif
29
30 #ifndef THREAD_TIMER_PERIOD_MS
31 #define THREAD_TIMER_PERIOD_MS                             100
32 #endif
33
34 /**
35  * Master thead: data plane thread context
36  */
37 struct thread {
38         struct rte_ring *msgq_req;
39         struct rte_ring *msgq_rsp;
40
41         uint32_t enabled;
42 };
43
44 static struct thread thread[RTE_MAX_LCORE];
45
46 /**
47  * Data plane threads: context
48  */
49 struct table_data {
50         struct rte_table_action *a;
51 };
52
53 struct pipeline_data {
54         struct rte_pipeline *p;
55         struct table_data table_data[RTE_PIPELINE_TABLE_MAX];
56         uint32_t n_tables;
57
58         struct rte_ring *msgq_req;
59         struct rte_ring *msgq_rsp;
60         uint64_t timer_period; /* Measured in CPU cycles. */
61         uint64_t time_next;
62
63         uint8_t buffer[TABLE_RULE_ACTION_SIZE_MAX];
64 };
65
66 struct thread_data {
67         struct rte_pipeline *p[THREAD_PIPELINES_MAX];
68         uint32_t n_pipelines;
69
70         struct pipeline_data pipeline_data[THREAD_PIPELINES_MAX];
71         struct rte_ring *msgq_req;
72         struct rte_ring *msgq_rsp;
73         uint64_t timer_period; /* Measured in CPU cycles. */
74         uint64_t time_next;
75         uint64_t time_next_min;
76 } __rte_cache_aligned;
77
78 static struct thread_data thread_data[RTE_MAX_LCORE];
79
80 /**
81  * Master thread: data plane thread init
82  */
83 static void
84 thread_free(void)
85 {
86         uint32_t i;
87
88         for (i = 0; i < RTE_MAX_LCORE; i++) {
89                 struct thread *t = &thread[i];
90
91                 if (!rte_lcore_is_enabled(i))
92                         continue;
93
94                 /* MSGQs */
95                 if (t->msgq_req)
96                         rte_ring_free(t->msgq_req);
97
98                 if (t->msgq_rsp)
99                         rte_ring_free(t->msgq_rsp);
100         }
101 }
102
103 int
104 thread_init(void)
105 {
106         uint32_t i;
107
108         RTE_LCORE_FOREACH_SLAVE(i) {
109                 char name[NAME_MAX];
110                 struct rte_ring *msgq_req, *msgq_rsp;
111                 struct thread *t = &thread[i];
112                 struct thread_data *t_data = &thread_data[i];
113                 uint32_t cpu_id = rte_lcore_to_socket_id(i);
114
115                 /* MSGQs */
116                 snprintf(name, sizeof(name), "THREAD-%04x-MSGQ-REQ", i);
117
118                 msgq_req = rte_ring_create(name,
119                         THREAD_MSGQ_SIZE,
120                         cpu_id,
121                         RING_F_SP_ENQ | RING_F_SC_DEQ);
122
123                 if (msgq_req == NULL) {
124                         thread_free();
125                         return -1;
126                 }
127
128                 snprintf(name, sizeof(name), "THREAD-%04x-MSGQ-RSP", i);
129
130                 msgq_rsp = rte_ring_create(name,
131                         THREAD_MSGQ_SIZE,
132                         cpu_id,
133                         RING_F_SP_ENQ | RING_F_SC_DEQ);
134
135                 if (msgq_rsp == NULL) {
136                         thread_free();
137                         return -1;
138                 }
139
140                 /* Master thread records */
141                 t->msgq_req = msgq_req;
142                 t->msgq_rsp = msgq_rsp;
143                 t->enabled = 1;
144
145                 /* Data plane thread records */
146                 t_data->n_pipelines = 0;
147                 t_data->msgq_req = msgq_req;
148                 t_data->msgq_rsp = msgq_rsp;
149                 t_data->timer_period =
150                         (rte_get_tsc_hz() * THREAD_TIMER_PERIOD_MS) / 1000;
151                 t_data->time_next = rte_get_tsc_cycles() + t_data->timer_period;
152                 t_data->time_next_min = t_data->time_next;
153         }
154
155         return 0;
156 }
157
158 static inline int
159 thread_is_running(uint32_t thread_id)
160 {
161         enum rte_lcore_state_t thread_state;
162
163         thread_state = rte_eal_get_lcore_state(thread_id);
164         return (thread_state == RUNNING) ? 1 : 0;
165 }
166
167 /**
168  * Pipeline is running when:
169  *    (A) Pipeline is mapped to a data plane thread AND
170  *    (B) Its data plane thread is in RUNNING state.
171  */
172 static inline int
173 pipeline_is_running(struct pipeline *p)
174 {
175         if (p->enabled == 0)
176                 return 0;
177
178         return thread_is_running(p->thread_id);
179 }
180
181 /**
182  * Master thread & data plane threads: message passing
183  */
184 enum thread_req_type {
185         THREAD_REQ_PIPELINE_ENABLE = 0,
186         THREAD_REQ_PIPELINE_DISABLE,
187         THREAD_REQ_MAX
188 };
189
190 struct thread_msg_req {
191         enum thread_req_type type;
192
193         union {
194                 struct {
195                         struct rte_pipeline *p;
196                         struct {
197                                 struct rte_table_action *a;
198                         } table[RTE_PIPELINE_TABLE_MAX];
199                         struct rte_ring *msgq_req;
200                         struct rte_ring *msgq_rsp;
201                         uint32_t timer_period_ms;
202                         uint32_t n_tables;
203                 } pipeline_enable;
204
205                 struct {
206                         struct rte_pipeline *p;
207                 } pipeline_disable;
208         };
209 };
210
211 struct thread_msg_rsp {
212         int status;
213 };
214
215 /**
216  * Master thread
217  */
218 static struct thread_msg_req *
219 thread_msg_alloc(void)
220 {
221         size_t size = RTE_MAX(sizeof(struct thread_msg_req),
222                 sizeof(struct thread_msg_rsp));
223
224         return calloc(1, size);
225 }
226
227 static void
228 thread_msg_free(struct thread_msg_rsp *rsp)
229 {
230         free(rsp);
231 }
232
233 static struct thread_msg_rsp *
234 thread_msg_send_recv(uint32_t thread_id,
235         struct thread_msg_req *req)
236 {
237         struct thread *t = &thread[thread_id];
238         struct rte_ring *msgq_req = t->msgq_req;
239         struct rte_ring *msgq_rsp = t->msgq_rsp;
240         struct thread_msg_rsp *rsp;
241         int status;
242
243         /* send */
244         do {
245                 status = rte_ring_sp_enqueue(msgq_req, req);
246         } while (status == -ENOBUFS);
247
248         /* recv */
249         do {
250                 status = rte_ring_sc_dequeue(msgq_rsp, (void **) &rsp);
251         } while (status != 0);
252
253         return rsp;
254 }
255
256 int
257 thread_pipeline_enable(uint32_t thread_id,
258         const char *pipeline_name)
259 {
260         struct pipeline *p = pipeline_find(pipeline_name);
261         struct thread *t;
262         struct thread_msg_req *req;
263         struct thread_msg_rsp *rsp;
264         uint32_t i;
265         int status;
266
267         /* Check input params */
268         if ((thread_id >= RTE_MAX_LCORE) ||
269                 (p == NULL) ||
270                 (p->n_ports_in == 0) ||
271                 (p->n_ports_out == 0) ||
272                 (p->n_tables == 0))
273                 return -1;
274
275         t = &thread[thread_id];
276         if ((t->enabled == 0) ||
277                 p->enabled)
278                 return -1;
279
280         if (!thread_is_running(thread_id)) {
281                 struct thread_data *td = &thread_data[thread_id];
282                 struct pipeline_data *tdp = &td->pipeline_data[td->n_pipelines];
283
284                 if (td->n_pipelines >= THREAD_PIPELINES_MAX)
285                         return -1;
286
287                 /* Data plane thread */
288                 td->p[td->n_pipelines] = p->p;
289
290                 tdp->p = p->p;
291                 for (i = 0; i < p->n_tables; i++)
292                         tdp->table_data[i].a = p->table[i].a;
293
294                 tdp->n_tables = p->n_tables;
295
296                 tdp->msgq_req = p->msgq_req;
297                 tdp->msgq_rsp = p->msgq_rsp;
298                 tdp->timer_period = (rte_get_tsc_hz() * p->timer_period_ms) / 1000;
299                 tdp->time_next = rte_get_tsc_cycles() + tdp->timer_period;
300
301                 td->n_pipelines++;
302
303                 /* Pipeline */
304                 p->thread_id = thread_id;
305                 p->enabled = 1;
306
307                 return 0;
308         }
309
310         /* Allocate request */
311         req = thread_msg_alloc();
312         if (req == NULL)
313                 return -1;
314
315         /* Write request */
316         req->type = THREAD_REQ_PIPELINE_ENABLE;
317         req->pipeline_enable.p = p->p;
318         for (i = 0; i < p->n_tables; i++)
319                 req->pipeline_enable.table[i].a =
320                         p->table[i].a;
321         req->pipeline_enable.msgq_req = p->msgq_req;
322         req->pipeline_enable.msgq_rsp = p->msgq_rsp;
323         req->pipeline_enable.timer_period_ms = p->timer_period_ms;
324         req->pipeline_enable.n_tables = p->n_tables;
325
326         /* Send request and wait for response */
327         rsp = thread_msg_send_recv(thread_id, req);
328         if (rsp == NULL)
329                 return -1;
330
331         /* Read response */
332         status = rsp->status;
333
334         /* Free response */
335         thread_msg_free(rsp);
336
337         /* Request completion */
338         if (status)
339                 return status;
340
341         p->thread_id = thread_id;
342         p->enabled = 1;
343
344         return 0;
345 }
346
347 int
348 thread_pipeline_disable(uint32_t thread_id,
349         const char *pipeline_name)
350 {
351         struct pipeline *p = pipeline_find(pipeline_name);
352         struct thread *t;
353         struct thread_msg_req *req;
354         struct thread_msg_rsp *rsp;
355         int status;
356
357         /* Check input params */
358         if ((thread_id >= RTE_MAX_LCORE) ||
359                 (p == NULL))
360                 return -1;
361
362         t = &thread[thread_id];
363         if (t->enabled == 0)
364                 return -1;
365
366         if (p->enabled == 0)
367                 return 0;
368
369         if (p->thread_id != thread_id)
370                 return -1;
371
372         if (!thread_is_running(thread_id)) {
373                 struct thread_data *td = &thread_data[thread_id];
374                 uint32_t i;
375
376                 for (i = 0; i < td->n_pipelines; i++) {
377                         struct pipeline_data *tdp = &td->pipeline_data[i];
378
379                         if (tdp->p != p->p)
380                                 continue;
381
382                         /* Data plane thread */
383                         if (i < td->n_pipelines - 1) {
384                                 struct rte_pipeline *pipeline_last =
385                                         td->p[td->n_pipelines - 1];
386                                 struct pipeline_data *tdp_last =
387                                         &td->pipeline_data[td->n_pipelines - 1];
388
389                                 td->p[i] = pipeline_last;
390                                 memcpy(tdp, tdp_last, sizeof(*tdp));
391                         }
392
393                         td->n_pipelines--;
394
395                         /* Pipeline */
396                         p->enabled = 0;
397
398                         break;
399                 }
400
401                 return 0;
402         }
403
404         /* Allocate request */
405         req = thread_msg_alloc();
406         if (req == NULL)
407                 return -1;
408
409         /* Write request */
410         req->type = THREAD_REQ_PIPELINE_DISABLE;
411         req->pipeline_disable.p = p->p;
412
413         /* Send request and wait for response */
414         rsp = thread_msg_send_recv(thread_id, req);
415         if (rsp == NULL)
416                 return -1;
417
418         /* Read response */
419         status = rsp->status;
420
421         /* Free response */
422         thread_msg_free(rsp);
423
424         /* Request completion */
425         if (status)
426                 return status;
427
428         p->enabled = 0;
429
430         return 0;
431 }
432
433 /**
434  * Data plane threads: message handling
435  */
436 static inline struct thread_msg_req *
437 thread_msg_recv(struct rte_ring *msgq_req)
438 {
439         struct thread_msg_req *req;
440
441         int status = rte_ring_sc_dequeue(msgq_req, (void **) &req);
442
443         if (status != 0)
444                 return NULL;
445
446         return req;
447 }
448
449 static inline void
450 thread_msg_send(struct rte_ring *msgq_rsp,
451         struct thread_msg_rsp *rsp)
452 {
453         int status;
454
455         do {
456                 status = rte_ring_sp_enqueue(msgq_rsp, rsp);
457         } while (status == -ENOBUFS);
458 }
459
460 static struct thread_msg_rsp *
461 thread_msg_handle_pipeline_enable(struct thread_data *t,
462         struct thread_msg_req *req)
463 {
464         struct thread_msg_rsp *rsp = (struct thread_msg_rsp *) req;
465         struct pipeline_data *p = &t->pipeline_data[t->n_pipelines];
466         uint32_t i;
467
468         /* Request */
469         if (t->n_pipelines >= THREAD_PIPELINES_MAX) {
470                 rsp->status = -1;
471                 return rsp;
472         }
473
474         t->p[t->n_pipelines] = req->pipeline_enable.p;
475
476         p->p = req->pipeline_enable.p;
477         for (i = 0; i < req->pipeline_enable.n_tables; i++)
478                 p->table_data[i].a =
479                         req->pipeline_enable.table[i].a;
480
481         p->n_tables = req->pipeline_enable.n_tables;
482
483         p->msgq_req = req->pipeline_enable.msgq_req;
484         p->msgq_rsp = req->pipeline_enable.msgq_rsp;
485         p->timer_period =
486                 (rte_get_tsc_hz() * req->pipeline_enable.timer_period_ms) / 1000;
487         p->time_next = rte_get_tsc_cycles() + p->timer_period;
488
489         t->n_pipelines++;
490
491         /* Response */
492         rsp->status = 0;
493         return rsp;
494 }
495
496 static struct thread_msg_rsp *
497 thread_msg_handle_pipeline_disable(struct thread_data *t,
498         struct thread_msg_req *req)
499 {
500         struct thread_msg_rsp *rsp = (struct thread_msg_rsp *) req;
501         uint32_t n_pipelines = t->n_pipelines;
502         struct rte_pipeline *pipeline = req->pipeline_disable.p;
503         uint32_t i;
504
505         /* find pipeline */
506         for (i = 0; i < n_pipelines; i++) {
507                 struct pipeline_data *p = &t->pipeline_data[i];
508
509                 if (p->p != pipeline)
510                         continue;
511
512                 if (i < n_pipelines - 1) {
513                         struct rte_pipeline *pipeline_last =
514                                 t->p[n_pipelines - 1];
515                         struct pipeline_data *p_last =
516                                 &t->pipeline_data[n_pipelines - 1];
517
518                         t->p[i] = pipeline_last;
519                         memcpy(p, p_last, sizeof(*p));
520                 }
521
522                 t->n_pipelines--;
523
524                 rsp->status = 0;
525                 return rsp;
526         }
527
528         /* should not get here */
529         rsp->status = 0;
530         return rsp;
531 }
532
533 static void
534 thread_msg_handle(struct thread_data *t)
535 {
536         for ( ; ; ) {
537                 struct thread_msg_req *req;
538                 struct thread_msg_rsp *rsp;
539
540                 req = thread_msg_recv(t->msgq_req);
541                 if (req == NULL)
542                         break;
543
544                 switch (req->type) {
545                 case THREAD_REQ_PIPELINE_ENABLE:
546                         rsp = thread_msg_handle_pipeline_enable(t, req);
547                         break;
548
549                 case THREAD_REQ_PIPELINE_DISABLE:
550                         rsp = thread_msg_handle_pipeline_disable(t, req);
551                         break;
552
553                 default:
554                         rsp = (struct thread_msg_rsp *) req;
555                         rsp->status = -1;
556                 }
557
558                 thread_msg_send(t->msgq_rsp, rsp);
559         }
560 }
561
562 /**
563  * Master thread & data plane threads: message passing
564  */
565 enum pipeline_req_type {
566         /* Port IN */
567         PIPELINE_REQ_PORT_IN_STATS_READ,
568         PIPELINE_REQ_PORT_IN_ENABLE,
569         PIPELINE_REQ_PORT_IN_DISABLE,
570
571         /* Port OUT */
572         PIPELINE_REQ_PORT_OUT_STATS_READ,
573
574         /* Table */
575         PIPELINE_REQ_TABLE_STATS_READ,
576         PIPELINE_REQ_TABLE_RULE_ADD,
577         PIPELINE_REQ_TABLE_RULE_ADD_DEFAULT,
578         PIPELINE_REQ_TABLE_RULE_ADD_BULK,
579         PIPELINE_REQ_TABLE_RULE_DELETE,
580         PIPELINE_REQ_TABLE_RULE_DELETE_DEFAULT,
581         PIPELINE_REQ_TABLE_RULE_STATS_READ,
582         PIPELINE_REQ_TABLE_MTR_PROFILE_ADD,
583         PIPELINE_REQ_TABLE_MTR_PROFILE_DELETE,
584         PIPELINE_REQ_TABLE_RULE_MTR_READ,
585         PIPELINE_REQ_TABLE_DSCP_TABLE_UPDATE,
586         PIPELINE_REQ_TABLE_RULE_TTL_READ,
587         PIPELINE_REQ_MAX
588 };
589
590 struct pipeline_msg_req_port_in_stats_read {
591         int clear;
592 };
593
594 struct pipeline_msg_req_port_out_stats_read {
595         int clear;
596 };
597
598 struct pipeline_msg_req_table_stats_read {
599         int clear;
600 };
601
602 struct pipeline_msg_req_table_rule_add {
603         struct table_rule_match match;
604         struct table_rule_action action;
605 };
606
607 struct pipeline_msg_req_table_rule_add_default {
608         struct table_rule_action action;
609 };
610
611 struct pipeline_msg_req_table_rule_add_bulk {
612         struct table_rule_list *list;
613         int bulk;
614 };
615
616 struct pipeline_msg_req_table_rule_delete {
617         struct table_rule_match match;
618 };
619
620 struct pipeline_msg_req_table_rule_stats_read {
621         void *data;
622         int clear;
623 };
624
625 struct pipeline_msg_req_table_mtr_profile_add {
626         uint32_t meter_profile_id;
627         struct rte_table_action_meter_profile profile;
628 };
629
630 struct pipeline_msg_req_table_mtr_profile_delete {
631         uint32_t meter_profile_id;
632 };
633
634 struct pipeline_msg_req_table_rule_mtr_read {
635         void *data;
636         uint32_t tc_mask;
637         int clear;
638 };
639
640 struct pipeline_msg_req_table_dscp_table_update {
641         uint64_t dscp_mask;
642         struct rte_table_action_dscp_table dscp_table;
643 };
644
645 struct pipeline_msg_req_table_rule_ttl_read {
646         void *data;
647         int clear;
648 };
649
650 struct pipeline_msg_req {
651         enum pipeline_req_type type;
652         uint32_t id; /* Port IN, port OUT or table ID */
653
654         RTE_STD_C11
655         union {
656                 struct pipeline_msg_req_port_in_stats_read port_in_stats_read;
657                 struct pipeline_msg_req_port_out_stats_read port_out_stats_read;
658                 struct pipeline_msg_req_table_stats_read table_stats_read;
659                 struct pipeline_msg_req_table_rule_add table_rule_add;
660                 struct pipeline_msg_req_table_rule_add_default table_rule_add_default;
661                 struct pipeline_msg_req_table_rule_add_bulk table_rule_add_bulk;
662                 struct pipeline_msg_req_table_rule_delete table_rule_delete;
663                 struct pipeline_msg_req_table_rule_stats_read table_rule_stats_read;
664                 struct pipeline_msg_req_table_mtr_profile_add table_mtr_profile_add;
665                 struct pipeline_msg_req_table_mtr_profile_delete table_mtr_profile_delete;
666                 struct pipeline_msg_req_table_rule_mtr_read table_rule_mtr_read;
667                 struct pipeline_msg_req_table_dscp_table_update table_dscp_table_update;
668                 struct pipeline_msg_req_table_rule_ttl_read table_rule_ttl_read;
669         };
670 };
671
672 struct pipeline_msg_rsp_port_in_stats_read {
673         struct rte_pipeline_port_in_stats stats;
674 };
675
676 struct pipeline_msg_rsp_port_out_stats_read {
677         struct rte_pipeline_port_out_stats stats;
678 };
679
680 struct pipeline_msg_rsp_table_stats_read {
681         struct rte_pipeline_table_stats stats;
682 };
683
684 struct pipeline_msg_rsp_table_rule_add {
685         void *data;
686 };
687
688 struct pipeline_msg_rsp_table_rule_add_default {
689         void *data;
690 };
691
692 struct pipeline_msg_rsp_table_rule_add_bulk {
693         uint32_t n_rules;
694 };
695
696 struct pipeline_msg_rsp_table_rule_stats_read {
697         struct rte_table_action_stats_counters stats;
698 };
699
700 struct pipeline_msg_rsp_table_rule_mtr_read {
701         struct rte_table_action_mtr_counters stats;
702 };
703
704 struct pipeline_msg_rsp_table_rule_ttl_read {
705         struct rte_table_action_ttl_counters stats;
706 };
707
708 struct pipeline_msg_rsp {
709         int status;
710
711         RTE_STD_C11
712         union {
713                 struct pipeline_msg_rsp_port_in_stats_read port_in_stats_read;
714                 struct pipeline_msg_rsp_port_out_stats_read port_out_stats_read;
715                 struct pipeline_msg_rsp_table_stats_read table_stats_read;
716                 struct pipeline_msg_rsp_table_rule_add table_rule_add;
717                 struct pipeline_msg_rsp_table_rule_add_default table_rule_add_default;
718                 struct pipeline_msg_rsp_table_rule_add_bulk table_rule_add_bulk;
719                 struct pipeline_msg_rsp_table_rule_stats_read table_rule_stats_read;
720                 struct pipeline_msg_rsp_table_rule_mtr_read table_rule_mtr_read;
721                 struct pipeline_msg_rsp_table_rule_ttl_read table_rule_ttl_read;
722         };
723 };
724
725 /**
726  * Master thread
727  */
728 static struct pipeline_msg_req *
729 pipeline_msg_alloc(void)
730 {
731         size_t size = RTE_MAX(sizeof(struct pipeline_msg_req),
732                 sizeof(struct pipeline_msg_rsp));
733
734         return calloc(1, size);
735 }
736
737 static void
738 pipeline_msg_free(struct pipeline_msg_rsp *rsp)
739 {
740         free(rsp);
741 }
742
743 static struct pipeline_msg_rsp *
744 pipeline_msg_send_recv(struct pipeline *p,
745         struct pipeline_msg_req *req)
746 {
747         struct rte_ring *msgq_req = p->msgq_req;
748         struct rte_ring *msgq_rsp = p->msgq_rsp;
749         struct pipeline_msg_rsp *rsp;
750         int status;
751
752         /* send */
753         do {
754                 status = rte_ring_sp_enqueue(msgq_req, req);
755         } while (status == -ENOBUFS);
756
757         /* recv */
758         do {
759                 status = rte_ring_sc_dequeue(msgq_rsp, (void **) &rsp);
760         } while (status != 0);
761
762         return rsp;
763 }
764
765 int
766 pipeline_port_in_stats_read(const char *pipeline_name,
767         uint32_t port_id,
768         struct rte_pipeline_port_in_stats *stats,
769         int clear)
770 {
771         struct pipeline *p;
772         struct pipeline_msg_req *req;
773         struct pipeline_msg_rsp *rsp;
774         int status;
775
776         /* Check input params */
777         if ((pipeline_name == NULL) ||
778                 (stats == NULL))
779                 return -1;
780
781         p = pipeline_find(pipeline_name);
782         if ((p == NULL) ||
783                 (port_id >= p->n_ports_in))
784                 return -1;
785
786         if (!pipeline_is_running(p)) {
787                 status = rte_pipeline_port_in_stats_read(p->p,
788                         port_id,
789                         stats,
790                         clear);
791
792                 return status;
793         }
794
795         /* Allocate request */
796         req = pipeline_msg_alloc();
797         if (req == NULL)
798                 return -1;
799
800         /* Write request */
801         req->type = PIPELINE_REQ_PORT_IN_STATS_READ;
802         req->id = port_id;
803         req->port_in_stats_read.clear = clear;
804
805         /* Send request and wait for response */
806         rsp = pipeline_msg_send_recv(p, req);
807         if (rsp == NULL)
808                 return -1;
809
810         /* Read response */
811         status = rsp->status;
812         if (status)
813                 memcpy(stats, &rsp->port_in_stats_read.stats, sizeof(*stats));
814
815         /* Free response */
816         pipeline_msg_free(rsp);
817
818         return status;
819 }
820
821 int
822 pipeline_port_in_enable(const char *pipeline_name,
823         uint32_t port_id)
824 {
825         struct pipeline *p;
826         struct pipeline_msg_req *req;
827         struct pipeline_msg_rsp *rsp;
828         int status;
829
830         /* Check input params */
831         if (pipeline_name == NULL)
832                 return -1;
833
834         p = pipeline_find(pipeline_name);
835         if ((p == NULL) ||
836                 (port_id >= p->n_ports_in))
837                 return -1;
838
839         if (!pipeline_is_running(p)) {
840                 status = rte_pipeline_port_in_enable(p->p, port_id);
841                 return status;
842         }
843
844         /* Allocate request */
845         req = pipeline_msg_alloc();
846         if (req == NULL)
847                 return -1;
848
849         /* Write request */
850         req->type = PIPELINE_REQ_PORT_IN_ENABLE;
851         req->id = port_id;
852
853         /* Send request and wait for response */
854         rsp = pipeline_msg_send_recv(p, req);
855         if (rsp == NULL)
856                 return -1;
857
858         /* Read response */
859         status = rsp->status;
860
861         /* Free response */
862         pipeline_msg_free(rsp);
863
864         return status;
865 }
866
867 int
868 pipeline_port_in_disable(const char *pipeline_name,
869         uint32_t port_id)
870 {
871         struct pipeline *p;
872         struct pipeline_msg_req *req;
873         struct pipeline_msg_rsp *rsp;
874         int status;
875
876         /* Check input params */
877         if (pipeline_name == NULL)
878                 return -1;
879
880         p = pipeline_find(pipeline_name);
881         if ((p == NULL) ||
882                 (port_id >= p->n_ports_in))
883                 return -1;
884
885         if (!pipeline_is_running(p)) {
886                 status = rte_pipeline_port_in_disable(p->p, port_id);
887                 return status;
888         }
889
890         /* Allocate request */
891         req = pipeline_msg_alloc();
892         if (req == NULL)
893                 return -1;
894
895         /* Write request */
896         req->type = PIPELINE_REQ_PORT_IN_DISABLE;
897         req->id = port_id;
898
899         /* Send request and wait for response */
900         rsp = pipeline_msg_send_recv(p, req);
901         if (rsp == NULL)
902                 return -1;
903
904         /* Read response */
905         status = rsp->status;
906
907         /* Free response */
908         pipeline_msg_free(rsp);
909
910         return status;
911 }
912
913 int
914 pipeline_port_out_stats_read(const char *pipeline_name,
915         uint32_t port_id,
916         struct rte_pipeline_port_out_stats *stats,
917         int clear)
918 {
919         struct pipeline *p;
920         struct pipeline_msg_req *req;
921         struct pipeline_msg_rsp *rsp;
922         int status;
923
924         /* Check input params */
925         if ((pipeline_name == NULL) ||
926                 (stats == NULL))
927                 return -1;
928
929         p = pipeline_find(pipeline_name);
930         if ((p == NULL) ||
931                 (port_id >= p->n_ports_out))
932                 return -1;
933
934         if (!pipeline_is_running(p)) {
935                 status = rte_pipeline_port_out_stats_read(p->p,
936                         port_id,
937                         stats,
938                         clear);
939
940                 return status;
941         }
942
943         /* Allocate request */
944         req = pipeline_msg_alloc();
945         if (req == NULL)
946                 return -1;
947
948         /* Write request */
949         req->type = PIPELINE_REQ_PORT_OUT_STATS_READ;
950         req->id = port_id;
951         req->port_out_stats_read.clear = clear;
952
953         /* Send request and wait for response */
954         rsp = pipeline_msg_send_recv(p, req);
955         if (rsp == NULL)
956                 return -1;
957
958         /* Read response */
959         status = rsp->status;
960         if (status)
961                 memcpy(stats, &rsp->port_out_stats_read.stats, sizeof(*stats));
962
963         /* Free response */
964         pipeline_msg_free(rsp);
965
966         return status;
967 }
968
969 int
970 pipeline_table_stats_read(const char *pipeline_name,
971         uint32_t table_id,
972         struct rte_pipeline_table_stats *stats,
973         int clear)
974 {
975         struct pipeline *p;
976         struct pipeline_msg_req *req;
977         struct pipeline_msg_rsp *rsp;
978         int status;
979
980         /* Check input params */
981         if ((pipeline_name == NULL) ||
982                 (stats == NULL))
983                 return -1;
984
985         p = pipeline_find(pipeline_name);
986         if ((p == NULL) ||
987                 (table_id >= p->n_tables))
988                 return -1;
989
990         if (!pipeline_is_running(p)) {
991                 status = rte_pipeline_table_stats_read(p->p,
992                         table_id,
993                         stats,
994                         clear);
995
996                 return status;
997         }
998
999         /* Allocate request */
1000         req = pipeline_msg_alloc();
1001         if (req == NULL)
1002                 return -1;
1003
1004         /* Write request */
1005         req->type = PIPELINE_REQ_TABLE_STATS_READ;
1006         req->id = table_id;
1007         req->table_stats_read.clear = clear;
1008
1009         /* Send request and wait for response */
1010         rsp = pipeline_msg_send_recv(p, req);
1011         if (rsp == NULL)
1012                 return -1;
1013
1014         /* Read response */
1015         status = rsp->status;
1016         if (status)
1017                 memcpy(stats, &rsp->table_stats_read.stats, sizeof(*stats));
1018
1019         /* Free response */
1020         pipeline_msg_free(rsp);
1021
1022         return status;
1023 }
1024
1025 static int
1026 match_check(struct table_rule_match *match,
1027         struct pipeline *p,
1028         uint32_t table_id)
1029 {
1030         struct table *table;
1031
1032         if ((match == NULL) ||
1033                 (p == NULL) ||
1034                 (table_id >= p->n_tables))
1035                 return -1;
1036
1037         table = &p->table[table_id];
1038         if (match->match_type != table->params.match_type)
1039                 return -1;
1040
1041         switch (match->match_type) {
1042         case TABLE_ACL:
1043         {
1044                 struct table_acl_params *t = &table->params.match.acl;
1045                 struct table_rule_match_acl *r = &match->match.acl;
1046
1047                 if ((r->ip_version && (t->ip_version == 0)) ||
1048                         ((r->ip_version == 0) && t->ip_version))
1049                         return -1;
1050
1051                 if (r->ip_version) {
1052                         if ((r->sa_depth > 32) ||
1053                                 (r->da_depth > 32))
1054                                 return -1;
1055                 } else {
1056                         if ((r->sa_depth > 128) ||
1057                                 (r->da_depth > 128))
1058                                 return -1;
1059                 }
1060                 return 0;
1061         }
1062
1063         case TABLE_ARRAY:
1064                 return 0;
1065
1066         case TABLE_HASH:
1067                 return 0;
1068
1069         case TABLE_LPM:
1070         {
1071                 struct table_lpm_params *t = &table->params.match.lpm;
1072                 struct table_rule_match_lpm *r = &match->match.lpm;
1073
1074                 if ((r->ip_version && (t->key_size != 4)) ||
1075                         ((r->ip_version == 0) && (t->key_size != 16)))
1076                         return -1;
1077
1078                 if (r->ip_version) {
1079                         if (r->depth > 32)
1080                                 return -1;
1081                 } else {
1082                         if (r->depth > 128)
1083                                 return -1;
1084                 }
1085                 return 0;
1086         }
1087
1088         case TABLE_STUB:
1089                 return -1;
1090
1091         default:
1092                 return -1;
1093         }
1094 }
1095
1096 static int
1097 action_check(struct table_rule_action *action,
1098         struct pipeline *p,
1099         uint32_t table_id)
1100 {
1101         struct table_action_profile *ap;
1102
1103         if ((action == NULL) ||
1104                 (p == NULL) ||
1105                 (table_id >= p->n_tables))
1106                 return -1;
1107
1108         ap = p->table[table_id].ap;
1109         if (action->action_mask != ap->params.action_mask)
1110                 return -1;
1111
1112         if (action->action_mask & (1LLU << RTE_TABLE_ACTION_FWD)) {
1113                 if ((action->fwd.action == RTE_PIPELINE_ACTION_PORT) &&
1114                         (action->fwd.id >= p->n_ports_out))
1115                         return -1;
1116
1117                 if ((action->fwd.action == RTE_PIPELINE_ACTION_TABLE) &&
1118                         (action->fwd.id >= p->n_tables))
1119                         return -1;
1120         }
1121
1122         if (action->action_mask & (1LLU << RTE_TABLE_ACTION_MTR)) {
1123                 uint32_t tc_mask0 = (1 << ap->params.mtr.n_tc) - 1;
1124                 uint32_t tc_mask1 = action->mtr.tc_mask;
1125
1126                 if (tc_mask1 != tc_mask0)
1127                         return -1;
1128         }
1129
1130         if (action->action_mask & (1LLU << RTE_TABLE_ACTION_TM)) {
1131                 uint32_t n_subports_per_port =
1132                         ap->params.tm.n_subports_per_port;
1133                 uint32_t n_pipes_per_subport =
1134                         ap->params.tm.n_pipes_per_subport;
1135                 uint32_t subport_id = action->tm.subport_id;
1136                 uint32_t pipe_id = action->tm.pipe_id;
1137
1138                 if ((subport_id >= n_subports_per_port) ||
1139                         (pipe_id >= n_pipes_per_subport))
1140                         return -1;
1141         }
1142
1143         if (action->action_mask & (1LLU << RTE_TABLE_ACTION_ENCAP)) {
1144                 uint64_t encap_mask = ap->params.encap.encap_mask;
1145                 enum rte_table_action_encap_type type = action->encap.type;
1146
1147                 if ((encap_mask & (1LLU << type)) == 0)
1148                         return -1;
1149         }
1150
1151         if (action->action_mask & (1LLU << RTE_TABLE_ACTION_NAT)) {
1152                 int ip_version0 = ap->params.common.ip_version;
1153                 int ip_version1 = action->nat.ip_version;
1154
1155                 if ((ip_version1 && (ip_version0 == 0)) ||
1156                         ((ip_version1 == 0) && ip_version0))
1157                         return -1;
1158         }
1159
1160         return 0;
1161 }
1162
1163 static int
1164 action_default_check(struct table_rule_action *action,
1165         struct pipeline *p,
1166         uint32_t table_id)
1167 {
1168         if ((action == NULL) ||
1169                 (action->action_mask != (1LLU << RTE_TABLE_ACTION_FWD)) ||
1170                 (p == NULL) ||
1171                 (table_id >= p->n_tables))
1172                 return -1;
1173
1174         if (action->action_mask & (1LLU << RTE_TABLE_ACTION_FWD)) {
1175                 if ((action->fwd.action == RTE_PIPELINE_ACTION_PORT) &&
1176                         (action->fwd.id >= p->n_ports_out))
1177                         return -1;
1178
1179                 if ((action->fwd.action == RTE_PIPELINE_ACTION_TABLE) &&
1180                         (action->fwd.id >= p->n_tables))
1181                         return -1;
1182         }
1183
1184         return 0;
1185 }
1186
1187 union table_rule_match_low_level {
1188         struct rte_table_acl_rule_add_params acl_add;
1189         struct rte_table_acl_rule_delete_params acl_delete;
1190         struct rte_table_array_key array;
1191         uint8_t hash[TABLE_RULE_MATCH_SIZE_MAX];
1192         struct rte_table_lpm_key lpm_ipv4;
1193         struct rte_table_lpm_ipv6_key lpm_ipv6;
1194 };
1195
1196 static int
1197 match_convert(struct table_rule_match *mh,
1198         union table_rule_match_low_level *ml,
1199         int add);
1200
1201 static int
1202 action_convert(struct rte_table_action *a,
1203         struct table_rule_action *action,
1204         struct rte_pipeline_table_entry *data);
1205
1206 struct table_ll {
1207         struct rte_pipeline *p;
1208         int table_id;
1209         struct rte_table_action *a;
1210         int bulk_supported;
1211 };
1212
1213 static int
1214 table_rule_add_bulk_ll(struct table_ll *table,
1215         struct table_rule_list *list,
1216         uint32_t *n_rules)
1217 {
1218         union table_rule_match_low_level *match_ll = NULL;
1219         uint8_t *action_ll = NULL;
1220         void **match_ll_ptr = NULL;
1221         struct rte_pipeline_table_entry **action_ll_ptr = NULL;
1222         struct rte_pipeline_table_entry **entries_ptr = NULL;
1223         int *found = NULL;
1224         struct table_rule *rule;
1225         uint32_t n, i;
1226         int status = 0;
1227
1228         n = 0;
1229         TAILQ_FOREACH(rule, list, node)
1230                 n++;
1231
1232         /* Memory allocation */
1233         match_ll = calloc(n, sizeof(union table_rule_match_low_level));
1234         action_ll = calloc(n, TABLE_RULE_ACTION_SIZE_MAX);
1235
1236         match_ll_ptr = calloc(n, sizeof(void *));
1237         action_ll_ptr = calloc(n, sizeof(struct rte_pipeline_table_entry *));
1238
1239         entries_ptr = calloc(n, sizeof(struct rte_pipeline_table_entry *));
1240         found = calloc(n, sizeof(int));
1241
1242         if (match_ll == NULL ||
1243                 action_ll == NULL ||
1244                 match_ll_ptr == NULL ||
1245                 action_ll_ptr == NULL ||
1246                 entries_ptr == NULL ||
1247                 found == NULL) {
1248                         status = -ENOMEM;
1249                         goto table_rule_add_bulk_ll_free;
1250         }
1251
1252         /* Init */
1253         for (i = 0; i < n; i++) {
1254                 match_ll_ptr[i] = (void *)&match_ll[i];
1255                 action_ll_ptr[i] = (struct rte_pipeline_table_entry *)
1256                         &action_ll[i * TABLE_RULE_ACTION_SIZE_MAX];
1257         }
1258
1259         /* Rule (match, action) conversion */
1260         i = 0;
1261         TAILQ_FOREACH(rule, list, node) {
1262                 status = match_convert(&rule->match, match_ll_ptr[i], 1);
1263                 if (status)
1264                         goto table_rule_add_bulk_ll_free;
1265
1266                 status = action_convert(table->a, &rule->action, action_ll_ptr[i]);
1267                 if (status)
1268                         goto table_rule_add_bulk_ll_free;
1269
1270                 i++;
1271         }
1272
1273         /* Add rule (match, action) to table */
1274         if (table->bulk_supported) {
1275                 status = rte_pipeline_table_entry_add_bulk(table->p,
1276                         table->table_id,
1277                         match_ll_ptr,
1278                         action_ll_ptr,
1279                         n,
1280                         found,
1281                         entries_ptr);
1282                 if (status)
1283                         goto table_rule_add_bulk_ll_free;
1284         } else
1285                 for (i = 0; i < n; i++) {
1286                         status = rte_pipeline_table_entry_add(table->p,
1287                                 table->table_id,
1288                                 match_ll_ptr[i],
1289                                 action_ll_ptr[i],
1290                                 &found[i],
1291                                 &entries_ptr[i]);
1292                         if (status) {
1293                                 if (i == 0)
1294                                         goto table_rule_add_bulk_ll_free;
1295
1296                                 /* No roll-back. */
1297                                 status = 0;
1298                                 n = i;
1299                                 break;
1300                         }
1301                 }
1302
1303         /* Write back to the rule list. */
1304         i = 0;
1305         TAILQ_FOREACH(rule, list, node) {
1306                 if (i >= n)
1307                         break;
1308
1309                 rule->data = entries_ptr[i];
1310
1311                 i++;
1312         }
1313
1314         *n_rules = n;
1315
1316         /* Free */
1317 table_rule_add_bulk_ll_free:
1318         free(found);
1319         free(entries_ptr);
1320         free(action_ll_ptr);
1321         free(match_ll_ptr);
1322         free(action_ll);
1323         free(match_ll);
1324
1325         return status;
1326 }
1327
1328 int
1329 pipeline_table_rule_add(const char *pipeline_name,
1330         uint32_t table_id,
1331         struct table_rule_match *match,
1332         struct table_rule_action *action)
1333 {
1334         struct pipeline *p;
1335         struct table *table;
1336         struct pipeline_msg_req *req;
1337         struct pipeline_msg_rsp *rsp;
1338         struct table_rule *rule;
1339         int status;
1340
1341         /* Check input params */
1342         if ((pipeline_name == NULL) ||
1343                 (match == NULL) ||
1344                 (action == NULL))
1345                 return -1;
1346
1347         p = pipeline_find(pipeline_name);
1348         if ((p == NULL) ||
1349                 (table_id >= p->n_tables) ||
1350                 match_check(match, p, table_id) ||
1351                 action_check(action, p, table_id))
1352                 return -1;
1353
1354         table = &p->table[table_id];
1355
1356         rule = calloc(1, sizeof(struct table_rule));
1357         if (rule == NULL)
1358                 return -1;
1359
1360         memcpy(&rule->match, match, sizeof(*match));
1361         memcpy(&rule->action, action, sizeof(*action));
1362
1363         if (!pipeline_is_running(p)) {
1364                 union table_rule_match_low_level match_ll;
1365                 struct rte_pipeline_table_entry *data_in, *data_out;
1366                 int key_found;
1367                 uint8_t *buffer;
1368
1369                 buffer = calloc(TABLE_RULE_ACTION_SIZE_MAX, sizeof(uint8_t));
1370                 if (buffer == NULL) {
1371                         free(rule);
1372                         return -1;
1373                 }
1374
1375                 /* Table match-action rule conversion */
1376                 data_in = (struct rte_pipeline_table_entry *)buffer;
1377
1378                 status = match_convert(match, &match_ll, 1);
1379                 if (status) {
1380                         free(buffer);
1381                         free(rule);
1382                         return -1;
1383                 }
1384
1385                 status = action_convert(table->a, action, data_in);
1386                 if (status) {
1387                         free(buffer);
1388                         free(rule);
1389                         return -1;
1390                 }
1391
1392                 /* Add rule (match, action) to table */
1393                 status = rte_pipeline_table_entry_add(p->p,
1394                                 table_id,
1395                                 &match_ll,
1396                                 data_in,
1397                                 &key_found,
1398                                 &data_out);
1399                 if (status) {
1400                         free(buffer);
1401                         free(rule);
1402                         return -1;
1403                 }
1404
1405                 /* Write Response */
1406                 rule->data = data_out;
1407                 table_rule_add(table, rule);
1408
1409                 free(buffer);
1410                 return 0;
1411         }
1412
1413         /* Allocate request */
1414         req = pipeline_msg_alloc();
1415         if (req == NULL) {
1416                 free(rule);
1417                 return -1;
1418         }
1419
1420         /* Write request */
1421         req->type = PIPELINE_REQ_TABLE_RULE_ADD;
1422         req->id = table_id;
1423         memcpy(&req->table_rule_add.match, match, sizeof(*match));
1424         memcpy(&req->table_rule_add.action, action, sizeof(*action));
1425
1426         /* Send request and wait for response */
1427         rsp = pipeline_msg_send_recv(p, req);
1428         if (rsp == NULL) {
1429                 free(rule);
1430                 return -1;
1431         }
1432
1433         /* Read response */
1434         status = rsp->status;
1435         if (status == 0) {
1436                 rule->data = rsp->table_rule_add.data;
1437                 table_rule_add(table, rule);
1438         } else
1439                 free(rule);
1440
1441         /* Free response */
1442         pipeline_msg_free(rsp);
1443
1444         return status;
1445 }
1446
1447 int
1448 pipeline_table_rule_add_default(const char *pipeline_name,
1449         uint32_t table_id,
1450         struct table_rule_action *action)
1451 {
1452         struct pipeline *p;
1453         struct table *table;
1454         struct pipeline_msg_req *req;
1455         struct pipeline_msg_rsp *rsp;
1456         struct table_rule *rule;
1457         int status;
1458
1459         /* Check input params */
1460         if ((pipeline_name == NULL) ||
1461                 (action == NULL))
1462                 return -1;
1463
1464         p = pipeline_find(pipeline_name);
1465         if ((p == NULL) ||
1466                 (table_id >= p->n_tables) ||
1467                 action_default_check(action, p, table_id))
1468                 return -1;
1469
1470         table = &p->table[table_id];
1471
1472         rule = calloc(1, sizeof(struct table_rule));
1473         if (rule == NULL)
1474                 return -1;
1475
1476         memcpy(&rule->action, action, sizeof(*action));
1477
1478         if (!pipeline_is_running(p)) {
1479                 struct rte_pipeline_table_entry *data_in, *data_out;
1480                 uint8_t *buffer;
1481
1482                 buffer = calloc(TABLE_RULE_ACTION_SIZE_MAX, sizeof(uint8_t));
1483                 if (buffer == NULL) {
1484                         free(rule);
1485                         return -1;
1486                 }
1487
1488                 /* Apply actions */
1489                 data_in = (struct rte_pipeline_table_entry *)buffer;
1490
1491                 data_in->action = action->fwd.action;
1492                 if (action->fwd.action == RTE_PIPELINE_ACTION_PORT)
1493                         data_in->port_id = action->fwd.id;
1494                 if (action->fwd.action == RTE_PIPELINE_ACTION_TABLE)
1495                         data_in->table_id = action->fwd.id;
1496
1497                 /* Add default rule to table */
1498                 status = rte_pipeline_table_default_entry_add(p->p,
1499                                 table_id,
1500                                 data_in,
1501                                 &data_out);
1502                 if (status) {
1503                         free(buffer);
1504                         free(rule);
1505                         return -1;
1506                 }
1507
1508                 /* Write Response */
1509                 rule->data = data_out;
1510                 table_rule_default_add(table, rule);
1511
1512                 free(buffer);
1513                 return 0;
1514         }
1515
1516         /* Allocate request */
1517         req = pipeline_msg_alloc();
1518         if (req == NULL) {
1519                 free(rule);
1520                 return -1;
1521         }
1522
1523         /* Write request */
1524         req->type = PIPELINE_REQ_TABLE_RULE_ADD_DEFAULT;
1525         req->id = table_id;
1526         memcpy(&req->table_rule_add_default.action, action, sizeof(*action));
1527
1528         /* Send request and wait for response */
1529         rsp = pipeline_msg_send_recv(p, req);
1530         if (rsp == NULL) {
1531                 free(rule);
1532                 return -1;
1533         }
1534
1535         /* Read response */
1536         status = rsp->status;
1537         if (status == 0) {
1538                 rule->data = rsp->table_rule_add_default.data;
1539                 table_rule_default_add(table, rule);
1540         } else
1541                 free(rule);
1542
1543         /* Free response */
1544         pipeline_msg_free(rsp);
1545
1546         return status;
1547 }
1548
1549 static uint32_t
1550 table_rule_list_free(struct table_rule_list *list)
1551 {
1552         uint32_t n = 0;
1553
1554         if (!list)
1555                 return 0;
1556
1557         for ( ; ; ) {
1558                 struct table_rule *rule;
1559
1560                 rule = TAILQ_FIRST(list);
1561                 if (rule == NULL)
1562                         break;
1563
1564                 TAILQ_REMOVE(list, rule, node);
1565                 free(rule);
1566                 n++;
1567         }
1568
1569         free(list);
1570         return n;
1571 }
1572
1573 int
1574 pipeline_table_rule_add_bulk(const char *pipeline_name,
1575         uint32_t table_id,
1576         struct table_rule_list *list,
1577         uint32_t *n_rules_added,
1578         uint32_t *n_rules_not_added)
1579 {
1580         struct pipeline *p;
1581         struct table *table;
1582         struct pipeline_msg_req *req;
1583         struct pipeline_msg_rsp *rsp;
1584         struct table_rule *rule;
1585         int status = 0;
1586
1587         /* Check input params */
1588         if ((pipeline_name == NULL) ||
1589                 (list == NULL) ||
1590                 TAILQ_EMPTY(list) ||
1591                 (n_rules_added == NULL) ||
1592                 (n_rules_not_added == NULL)) {
1593                 table_rule_list_free(list);
1594                 return -EINVAL;
1595         }
1596
1597         p = pipeline_find(pipeline_name);
1598         if ((p == NULL) ||
1599                 (table_id >= p->n_tables)) {
1600                 table_rule_list_free(list);
1601                 return -EINVAL;
1602         }
1603
1604         table = &p->table[table_id];
1605
1606         TAILQ_FOREACH(rule, list, node)
1607                 if (match_check(&rule->match, p, table_id) ||
1608                         action_check(&rule->action, p, table_id)) {
1609                         table_rule_list_free(list);
1610                         return -EINVAL;
1611                 }
1612
1613         if (!pipeline_is_running(p)) {
1614                 struct table_ll table_ll = {
1615                         .p = p->p,
1616                         .table_id = table_id,
1617                         .a = table->a,
1618                         .bulk_supported = table->params.match_type == TABLE_ACL,
1619                 };
1620
1621                 status = table_rule_add_bulk_ll(&table_ll, list, n_rules_added);
1622                 if (status) {
1623                         table_rule_list_free(list);
1624                         return status;
1625                 }
1626
1627                 table_rule_add_bulk(table, list, *n_rules_added);
1628                 *n_rules_not_added = table_rule_list_free(list);
1629                 return 0;
1630         }
1631
1632         /* Allocate request */
1633         req = pipeline_msg_alloc();
1634         if (req == NULL) {
1635                 table_rule_list_free(list);
1636                 return -ENOMEM;
1637         }
1638
1639         /* Write request */
1640         req->type = PIPELINE_REQ_TABLE_RULE_ADD_BULK;
1641         req->id = table_id;
1642         req->table_rule_add_bulk.list = list;
1643         req->table_rule_add_bulk.bulk = table->params.match_type == TABLE_ACL;
1644
1645         /* Send request and wait for response */
1646         rsp = pipeline_msg_send_recv(p, req);
1647         if (rsp == NULL) {
1648                 table_rule_list_free(list);
1649                 return -ENOMEM;
1650         }
1651
1652         /* Read response */
1653         status = rsp->status;
1654         if (status == 0) {
1655                 *n_rules_added = rsp->table_rule_add_bulk.n_rules;
1656
1657                 table_rule_add_bulk(table, list, *n_rules_added);
1658                 *n_rules_not_added = table_rule_list_free(list);
1659         } else
1660                 table_rule_list_free(list);
1661
1662
1663         /* Free response */
1664         pipeline_msg_free(rsp);
1665
1666         return status;
1667 }
1668
1669 int
1670 pipeline_table_rule_delete(const char *pipeline_name,
1671         uint32_t table_id,
1672         struct table_rule_match *match)
1673 {
1674         struct pipeline *p;
1675         struct table *table;
1676         struct pipeline_msg_req *req;
1677         struct pipeline_msg_rsp *rsp;
1678         int status;
1679
1680         /* Check input params */
1681         if ((pipeline_name == NULL) ||
1682                 (match == NULL))
1683                 return -1;
1684
1685         p = pipeline_find(pipeline_name);
1686         if ((p == NULL) ||
1687                 (table_id >= p->n_tables) ||
1688                 match_check(match, p, table_id))
1689                 return -1;
1690
1691         table = &p->table[table_id];
1692
1693         if (!pipeline_is_running(p)) {
1694                 union table_rule_match_low_level match_ll;
1695                 int key_found;
1696
1697                 status = match_convert(match, &match_ll, 0);
1698                 if (status)
1699                         return -1;
1700
1701                 status = rte_pipeline_table_entry_delete(p->p,
1702                                 table_id,
1703                                 &match_ll,
1704                                 &key_found,
1705                                 NULL);
1706
1707                 if (status == 0)
1708                         table_rule_delete(table, match);
1709
1710                 return status;
1711         }
1712
1713         /* Allocate request */
1714         req = pipeline_msg_alloc();
1715         if (req == NULL)
1716                 return -1;
1717
1718         /* Write request */
1719         req->type = PIPELINE_REQ_TABLE_RULE_DELETE;
1720         req->id = table_id;
1721         memcpy(&req->table_rule_delete.match, match, sizeof(*match));
1722
1723         /* Send request and wait for response */
1724         rsp = pipeline_msg_send_recv(p, req);
1725         if (rsp == NULL)
1726                 return -1;
1727
1728         /* Read response */
1729         status = rsp->status;
1730         if (status == 0)
1731                 table_rule_delete(table, match);
1732
1733         /* Free response */
1734         pipeline_msg_free(rsp);
1735
1736         return status;
1737 }
1738
1739 int
1740 pipeline_table_rule_delete_default(const char *pipeline_name,
1741         uint32_t table_id)
1742 {
1743         struct pipeline *p;
1744         struct table *table;
1745         struct pipeline_msg_req *req;
1746         struct pipeline_msg_rsp *rsp;
1747         int status;
1748
1749         /* Check input params */
1750         if (pipeline_name == NULL)
1751                 return -1;
1752
1753         p = pipeline_find(pipeline_name);
1754         if ((p == NULL) ||
1755                 (table_id >= p->n_tables))
1756                 return -1;
1757
1758         table = &p->table[table_id];
1759
1760         if (!pipeline_is_running(p)) {
1761                 status = rte_pipeline_table_default_entry_delete(p->p,
1762                         table_id,
1763                         NULL);
1764
1765                 if (status == 0)
1766                         table_rule_default_delete(table);
1767
1768                 return status;
1769         }
1770
1771         /* Allocate request */
1772         req = pipeline_msg_alloc();
1773         if (req == NULL)
1774                 return -1;
1775
1776         /* Write request */
1777         req->type = PIPELINE_REQ_TABLE_RULE_DELETE_DEFAULT;
1778         req->id = table_id;
1779
1780         /* Send request and wait for response */
1781         rsp = pipeline_msg_send_recv(p, req);
1782         if (rsp == NULL)
1783                 return -1;
1784
1785         /* Read response */
1786         status = rsp->status;
1787         if (status == 0)
1788                 table_rule_default_delete(table);
1789
1790         /* Free response */
1791         pipeline_msg_free(rsp);
1792
1793         return status;
1794 }
1795
1796 int
1797 pipeline_table_rule_stats_read(const char *pipeline_name,
1798         uint32_t table_id,
1799         void *data,
1800         struct rte_table_action_stats_counters *stats,
1801         int clear)
1802 {
1803         struct pipeline *p;
1804         struct pipeline_msg_req *req;
1805         struct pipeline_msg_rsp *rsp;
1806         int status;
1807
1808         /* Check input params */
1809         if ((pipeline_name == NULL) ||
1810                 (data == NULL) ||
1811                 (stats == NULL))
1812                 return -1;
1813
1814         p = pipeline_find(pipeline_name);
1815         if ((p == NULL) ||
1816                 (table_id >= p->n_tables))
1817                 return -1;
1818
1819         if (!pipeline_is_running(p)) {
1820                 struct rte_table_action *a = p->table[table_id].a;
1821
1822                 status = rte_table_action_stats_read(a,
1823                         data,
1824                         stats,
1825                         clear);
1826
1827                 return status;
1828         }
1829
1830         /* Allocate request */
1831         req = pipeline_msg_alloc();
1832         if (req == NULL)
1833                 return -1;
1834
1835         /* Write request */
1836         req->type = PIPELINE_REQ_TABLE_RULE_STATS_READ;
1837         req->id = table_id;
1838         req->table_rule_stats_read.data = data;
1839         req->table_rule_stats_read.clear = clear;
1840
1841         /* Send request and wait for response */
1842         rsp = pipeline_msg_send_recv(p, req);
1843         if (rsp == NULL)
1844                 return -1;
1845
1846         /* Read response */
1847         status = rsp->status;
1848         if (status)
1849                 memcpy(stats, &rsp->table_rule_stats_read.stats, sizeof(*stats));
1850
1851         /* Free response */
1852         pipeline_msg_free(rsp);
1853
1854         return status;
1855 }
1856
1857 int
1858 pipeline_table_mtr_profile_add(const char *pipeline_name,
1859         uint32_t table_id,
1860         uint32_t meter_profile_id,
1861         struct rte_table_action_meter_profile *profile)
1862 {
1863         struct pipeline *p;
1864         struct pipeline_msg_req *req;
1865         struct pipeline_msg_rsp *rsp;
1866         int status;
1867
1868         /* Check input params */
1869         if ((pipeline_name == NULL) ||
1870                 (profile == NULL))
1871                 return -1;
1872
1873         p = pipeline_find(pipeline_name);
1874         if ((p == NULL) ||
1875                 (table_id >= p->n_tables))
1876                 return -1;
1877
1878         if (!pipeline_is_running(p)) {
1879                 struct rte_table_action *a = p->table[table_id].a;
1880
1881                 status = rte_table_action_meter_profile_add(a,
1882                         meter_profile_id,
1883                         profile);
1884
1885                 return status;
1886         }
1887
1888         /* Allocate request */
1889         req = pipeline_msg_alloc();
1890         if (req == NULL)
1891                 return -1;
1892
1893         /* Write request */
1894         req->type = PIPELINE_REQ_TABLE_MTR_PROFILE_ADD;
1895         req->id = table_id;
1896         req->table_mtr_profile_add.meter_profile_id = meter_profile_id;
1897         memcpy(&req->table_mtr_profile_add.profile, profile, sizeof(*profile));
1898
1899         /* Send request and wait for response */
1900         rsp = pipeline_msg_send_recv(p, req);
1901         if (rsp == NULL)
1902                 return -1;
1903
1904         /* Read response */
1905         status = rsp->status;
1906
1907         /* Free response */
1908         pipeline_msg_free(rsp);
1909
1910         return status;
1911 }
1912
1913 int
1914 pipeline_table_mtr_profile_delete(const char *pipeline_name,
1915         uint32_t table_id,
1916         uint32_t meter_profile_id)
1917 {
1918         struct pipeline *p;
1919         struct pipeline_msg_req *req;
1920         struct pipeline_msg_rsp *rsp;
1921         int status;
1922
1923         /* Check input params */
1924         if (pipeline_name == NULL)
1925                 return -1;
1926
1927         p = pipeline_find(pipeline_name);
1928         if ((p == NULL) ||
1929                 (table_id >= p->n_tables))
1930                 return -1;
1931
1932         if (!pipeline_is_running(p)) {
1933                 struct rte_table_action *a = p->table[table_id].a;
1934
1935                 status = rte_table_action_meter_profile_delete(a,
1936                                 meter_profile_id);
1937
1938                 return status;
1939         }
1940
1941         /* Allocate request */
1942         req = pipeline_msg_alloc();
1943         if (req == NULL)
1944                 return -1;
1945
1946         /* Write request */
1947         req->type = PIPELINE_REQ_TABLE_MTR_PROFILE_DELETE;
1948         req->id = table_id;
1949         req->table_mtr_profile_delete.meter_profile_id = meter_profile_id;
1950
1951         /* Send request and wait for response */
1952         rsp = pipeline_msg_send_recv(p, req);
1953         if (rsp == NULL)
1954                 return -1;
1955
1956         /* Read response */
1957         status = rsp->status;
1958
1959         /* Free response */
1960         pipeline_msg_free(rsp);
1961
1962         return status;
1963 }
1964
1965 int
1966 pipeline_table_rule_mtr_read(const char *pipeline_name,
1967         uint32_t table_id,
1968         void *data,
1969         uint32_t tc_mask,
1970         struct rte_table_action_mtr_counters *stats,
1971         int clear)
1972 {
1973         struct pipeline *p;
1974         struct pipeline_msg_req *req;
1975         struct pipeline_msg_rsp *rsp;
1976         int status;
1977
1978         /* Check input params */
1979         if ((pipeline_name == NULL) ||
1980                 (data == NULL) ||
1981                 (stats == NULL))
1982                 return -1;
1983
1984         p = pipeline_find(pipeline_name);
1985         if ((p == NULL) ||
1986                 (table_id >= p->n_tables))
1987                 return -1;
1988
1989         if (!pipeline_is_running(p)) {
1990                 struct rte_table_action *a = p->table[table_id].a;
1991
1992                 status = rte_table_action_meter_read(a,
1993                                 data,
1994                                 tc_mask,
1995                                 stats,
1996                                 clear);
1997
1998                 return status;
1999         }
2000
2001         /* Allocate request */
2002         req = pipeline_msg_alloc();
2003         if (req == NULL)
2004                 return -1;
2005
2006         /* Write request */
2007         req->type = PIPELINE_REQ_TABLE_RULE_MTR_READ;
2008         req->id = table_id;
2009         req->table_rule_mtr_read.data = data;
2010         req->table_rule_mtr_read.tc_mask = tc_mask;
2011         req->table_rule_mtr_read.clear = clear;
2012
2013         /* Send request and wait for response */
2014         rsp = pipeline_msg_send_recv(p, req);
2015         if (rsp == NULL)
2016                 return -1;
2017
2018         /* Read response */
2019         status = rsp->status;
2020         if (status)
2021                 memcpy(stats, &rsp->table_rule_mtr_read.stats, sizeof(*stats));
2022
2023         /* Free response */
2024         pipeline_msg_free(rsp);
2025
2026         return status;
2027 }
2028
2029 int
2030 pipeline_table_dscp_table_update(const char *pipeline_name,
2031         uint32_t table_id,
2032         uint64_t dscp_mask,
2033         struct rte_table_action_dscp_table *dscp_table)
2034 {
2035         struct pipeline *p;
2036         struct pipeline_msg_req *req;
2037         struct pipeline_msg_rsp *rsp;
2038         int status;
2039
2040         /* Check input params */
2041         if ((pipeline_name == NULL) ||
2042                 (dscp_table == NULL))
2043                 return -1;
2044
2045         p = pipeline_find(pipeline_name);
2046         if ((p == NULL) ||
2047                 (table_id >= p->n_tables))
2048                 return -1;
2049
2050         if (!pipeline_is_running(p)) {
2051                 struct rte_table_action *a = p->table[table_id].a;
2052
2053                 status = rte_table_action_dscp_table_update(a,
2054                                 dscp_mask,
2055                                 dscp_table);
2056
2057                 return status;
2058         }
2059
2060         /* Allocate request */
2061         req = pipeline_msg_alloc();
2062         if (req == NULL)
2063                 return -1;
2064
2065         /* Write request */
2066         req->type = PIPELINE_REQ_TABLE_DSCP_TABLE_UPDATE;
2067         req->id = table_id;
2068         req->table_dscp_table_update.dscp_mask = dscp_mask;
2069         memcpy(&req->table_dscp_table_update.dscp_table,
2070                 dscp_table, sizeof(*dscp_table));
2071
2072         /* Send request and wait for response */
2073         rsp = pipeline_msg_send_recv(p, req);
2074         if (rsp == NULL)
2075                 return -1;
2076
2077         /* Read response */
2078         status = rsp->status;
2079
2080         /* Free response */
2081         pipeline_msg_free(rsp);
2082
2083         return status;
2084 }
2085
2086 int
2087 pipeline_table_rule_ttl_read(const char *pipeline_name,
2088         uint32_t table_id,
2089         void *data,
2090         struct rte_table_action_ttl_counters *stats,
2091         int clear)
2092 {
2093         struct pipeline *p;
2094         struct pipeline_msg_req *req;
2095         struct pipeline_msg_rsp *rsp;
2096         int status;
2097
2098         /* Check input params */
2099         if ((pipeline_name == NULL) ||
2100                 (data == NULL) ||
2101                 (stats == NULL))
2102                 return -1;
2103
2104         p = pipeline_find(pipeline_name);
2105         if ((p == NULL) ||
2106                 (table_id >= p->n_tables))
2107                 return -1;
2108
2109         if (!pipeline_is_running(p)) {
2110                 struct rte_table_action *a = p->table[table_id].a;
2111
2112                 status = rte_table_action_ttl_read(a,
2113                                 data,
2114                                 stats,
2115                                 clear);
2116
2117                 return status;
2118         }
2119
2120         /* Allocate request */
2121         req = pipeline_msg_alloc();
2122         if (req == NULL)
2123                 return -1;
2124
2125         /* Write request */
2126         req->type = PIPELINE_REQ_TABLE_RULE_TTL_READ;
2127         req->id = table_id;
2128         req->table_rule_ttl_read.data = data;
2129         req->table_rule_ttl_read.clear = clear;
2130
2131         /* Send request and wait for response */
2132         rsp = pipeline_msg_send_recv(p, req);
2133         if (rsp == NULL)
2134                 return -1;
2135
2136         /* Read response */
2137         status = rsp->status;
2138         if (status)
2139                 memcpy(stats, &rsp->table_rule_ttl_read.stats, sizeof(*stats));
2140
2141         /* Free response */
2142         pipeline_msg_free(rsp);
2143
2144         return status;
2145 }
2146
2147 /**
2148  * Data plane threads: message handling
2149  */
2150 static inline struct pipeline_msg_req *
2151 pipeline_msg_recv(struct rte_ring *msgq_req)
2152 {
2153         struct pipeline_msg_req *req;
2154
2155         int status = rte_ring_sc_dequeue(msgq_req, (void **) &req);
2156
2157         if (status != 0)
2158                 return NULL;
2159
2160         return req;
2161 }
2162
2163 static inline void
2164 pipeline_msg_send(struct rte_ring *msgq_rsp,
2165         struct pipeline_msg_rsp *rsp)
2166 {
2167         int status;
2168
2169         do {
2170                 status = rte_ring_sp_enqueue(msgq_rsp, rsp);
2171         } while (status == -ENOBUFS);
2172 }
2173
2174 static struct pipeline_msg_rsp *
2175 pipeline_msg_handle_port_in_stats_read(struct pipeline_data *p,
2176         struct pipeline_msg_req *req)
2177 {
2178         struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2179         uint32_t port_id = req->id;
2180         int clear = req->port_in_stats_read.clear;
2181
2182         rsp->status = rte_pipeline_port_in_stats_read(p->p,
2183                 port_id,
2184                 &rsp->port_in_stats_read.stats,
2185                 clear);
2186
2187         return rsp;
2188 }
2189
2190 static struct pipeline_msg_rsp *
2191 pipeline_msg_handle_port_in_enable(struct pipeline_data *p,
2192         struct pipeline_msg_req *req)
2193 {
2194         struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2195         uint32_t port_id = req->id;
2196
2197         rsp->status = rte_pipeline_port_in_enable(p->p,
2198                 port_id);
2199
2200         return rsp;
2201 }
2202
2203 static struct pipeline_msg_rsp *
2204 pipeline_msg_handle_port_in_disable(struct pipeline_data *p,
2205         struct pipeline_msg_req *req)
2206 {
2207         struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2208         uint32_t port_id = req->id;
2209
2210         rsp->status = rte_pipeline_port_in_disable(p->p,
2211                 port_id);
2212
2213         return rsp;
2214 }
2215
2216 static struct pipeline_msg_rsp *
2217 pipeline_msg_handle_port_out_stats_read(struct pipeline_data *p,
2218         struct pipeline_msg_req *req)
2219 {
2220         struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2221         uint32_t port_id = req->id;
2222         int clear = req->port_out_stats_read.clear;
2223
2224         rsp->status = rte_pipeline_port_out_stats_read(p->p,
2225                 port_id,
2226                 &rsp->port_out_stats_read.stats,
2227                 clear);
2228
2229         return rsp;
2230 }
2231
2232 static struct pipeline_msg_rsp *
2233 pipeline_msg_handle_table_stats_read(struct pipeline_data *p,
2234         struct pipeline_msg_req *req)
2235 {
2236         struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2237         uint32_t port_id = req->id;
2238         int clear = req->table_stats_read.clear;
2239
2240         rsp->status = rte_pipeline_table_stats_read(p->p,
2241                 port_id,
2242                 &rsp->table_stats_read.stats,
2243                 clear);
2244
2245         return rsp;
2246 }
2247
2248 static int
2249 match_convert_ipv6_depth(uint32_t depth, uint32_t *depth32)
2250 {
2251         if (depth > 128)
2252                 return -1;
2253
2254         switch (depth / 32) {
2255         case 0:
2256                 depth32[0] = depth;
2257                 depth32[1] = 0;
2258                 depth32[2] = 0;
2259                 depth32[3] = 0;
2260                 return 0;
2261
2262         case 1:
2263                 depth32[0] = 32;
2264                 depth32[1] = depth - 32;
2265                 depth32[2] = 0;
2266                 depth32[3] = 0;
2267                 return 0;
2268
2269         case 2:
2270                 depth32[0] = 32;
2271                 depth32[1] = 32;
2272                 depth32[2] = depth - 64;
2273                 depth32[3] = 0;
2274                 return 0;
2275
2276         case 3:
2277                 depth32[0] = 32;
2278                 depth32[1] = 32;
2279                 depth32[2] = 32;
2280                 depth32[3] = depth - 96;
2281                 return 0;
2282
2283         case 4:
2284                 depth32[0] = 32;
2285                 depth32[1] = 32;
2286                 depth32[2] = 32;
2287                 depth32[3] = 32;
2288                 return 0;
2289
2290         default:
2291                 return -1;
2292         }
2293 }
2294
2295 static int
2296 match_convert(struct table_rule_match *mh,
2297         union table_rule_match_low_level *ml,
2298         int add)
2299 {
2300         memset(ml, 0, sizeof(*ml));
2301
2302         switch (mh->match_type) {
2303         case TABLE_ACL:
2304                 if (mh->match.acl.ip_version)
2305                         if (add) {
2306                                 ml->acl_add.field_value[0].value.u8 =
2307                                         mh->match.acl.proto;
2308                                 ml->acl_add.field_value[0].mask_range.u8 =
2309                                         mh->match.acl.proto_mask;
2310
2311                                 ml->acl_add.field_value[1].value.u32 =
2312                                         mh->match.acl.ipv4.sa;
2313                                 ml->acl_add.field_value[1].mask_range.u32 =
2314                                         mh->match.acl.sa_depth;
2315
2316                                 ml->acl_add.field_value[2].value.u32 =
2317                                         mh->match.acl.ipv4.da;
2318                                 ml->acl_add.field_value[2].mask_range.u32 =
2319                                         mh->match.acl.da_depth;
2320
2321                                 ml->acl_add.field_value[3].value.u16 =
2322                                         mh->match.acl.sp0;
2323                                 ml->acl_add.field_value[3].mask_range.u16 =
2324                                         mh->match.acl.sp1;
2325
2326                                 ml->acl_add.field_value[4].value.u16 =
2327                                         mh->match.acl.dp0;
2328                                 ml->acl_add.field_value[4].mask_range.u16 =
2329                                         mh->match.acl.dp1;
2330
2331                                 ml->acl_add.priority =
2332                                         (int32_t) mh->match.acl.priority;
2333                         } else {
2334                                 ml->acl_delete.field_value[0].value.u8 =
2335                                         mh->match.acl.proto;
2336                                 ml->acl_delete.field_value[0].mask_range.u8 =
2337                                         mh->match.acl.proto_mask;
2338
2339                                 ml->acl_delete.field_value[1].value.u32 =
2340                                         mh->match.acl.ipv4.sa;
2341                                 ml->acl_delete.field_value[1].mask_range.u32 =
2342                                         mh->match.acl.sa_depth;
2343
2344                                 ml->acl_delete.field_value[2].value.u32 =
2345                                         mh->match.acl.ipv4.da;
2346                                 ml->acl_delete.field_value[2].mask_range.u32 =
2347                                         mh->match.acl.da_depth;
2348
2349                                 ml->acl_delete.field_value[3].value.u16 =
2350                                         mh->match.acl.sp0;
2351                                 ml->acl_delete.field_value[3].mask_range.u16 =
2352                                         mh->match.acl.sp1;
2353
2354                                 ml->acl_delete.field_value[4].value.u16 =
2355                                         mh->match.acl.dp0;
2356                                 ml->acl_delete.field_value[4].mask_range.u16 =
2357                                         mh->match.acl.dp1;
2358                         }
2359                 else
2360                         if (add) {
2361                                 uint32_t *sa32 =
2362                                         (uint32_t *) mh->match.acl.ipv6.sa;
2363                                 uint32_t *da32 =
2364                                         (uint32_t *) mh->match.acl.ipv6.da;
2365                                 uint32_t sa32_depth[4], da32_depth[4];
2366                                 int status;
2367
2368                                 status = match_convert_ipv6_depth(
2369                                         mh->match.acl.sa_depth,
2370                                         sa32_depth);
2371                                 if (status)
2372                                         return status;
2373
2374                                 status = match_convert_ipv6_depth(
2375                                         mh->match.acl.da_depth,
2376                                         da32_depth);
2377                                 if (status)
2378                                         return status;
2379
2380                                 ml->acl_add.field_value[0].value.u8 =
2381                                         mh->match.acl.proto;
2382                                 ml->acl_add.field_value[0].mask_range.u8 =
2383                                         mh->match.acl.proto_mask;
2384
2385                                 ml->acl_add.field_value[1].value.u32 =
2386                                         rte_be_to_cpu_32(sa32[0]);
2387                                 ml->acl_add.field_value[1].mask_range.u32 =
2388                                         sa32_depth[0];
2389                                 ml->acl_add.field_value[2].value.u32 =
2390                                         rte_be_to_cpu_32(sa32[1]);
2391                                 ml->acl_add.field_value[2].mask_range.u32 =
2392                                         sa32_depth[1];
2393                                 ml->acl_add.field_value[3].value.u32 =
2394                                         rte_be_to_cpu_32(sa32[2]);
2395                                 ml->acl_add.field_value[3].mask_range.u32 =
2396                                         sa32_depth[2];
2397                                 ml->acl_add.field_value[4].value.u32 =
2398                                         rte_be_to_cpu_32(sa32[3]);
2399                                 ml->acl_add.field_value[4].mask_range.u32 =
2400                                         sa32_depth[3];
2401
2402                                 ml->acl_add.field_value[5].value.u32 =
2403                                         rte_be_to_cpu_32(da32[0]);
2404                                 ml->acl_add.field_value[5].mask_range.u32 =
2405                                         da32_depth[0];
2406                                 ml->acl_add.field_value[6].value.u32 =
2407                                         rte_be_to_cpu_32(da32[1]);
2408                                 ml->acl_add.field_value[6].mask_range.u32 =
2409                                         da32_depth[1];
2410                                 ml->acl_add.field_value[7].value.u32 =
2411                                         rte_be_to_cpu_32(da32[2]);
2412                                 ml->acl_add.field_value[7].mask_range.u32 =
2413                                         da32_depth[2];
2414                                 ml->acl_add.field_value[8].value.u32 =
2415                                         rte_be_to_cpu_32(da32[3]);
2416                                 ml->acl_add.field_value[8].mask_range.u32 =
2417                                         da32_depth[3];
2418
2419                                 ml->acl_add.field_value[9].value.u16 =
2420                                         mh->match.acl.sp0;
2421                                 ml->acl_add.field_value[9].mask_range.u16 =
2422                                         mh->match.acl.sp1;
2423
2424                                 ml->acl_add.field_value[10].value.u16 =
2425                                         mh->match.acl.dp0;
2426                                 ml->acl_add.field_value[10].mask_range.u16 =
2427                                         mh->match.acl.dp1;
2428
2429                                 ml->acl_add.priority =
2430                                         (int32_t) mh->match.acl.priority;
2431                         } else {
2432                                 uint32_t *sa32 =
2433                                         (uint32_t *) mh->match.acl.ipv6.sa;
2434                                 uint32_t *da32 =
2435                                         (uint32_t *) mh->match.acl.ipv6.da;
2436                                 uint32_t sa32_depth[4], da32_depth[4];
2437                                 int status;
2438
2439                                 status = match_convert_ipv6_depth(
2440                                         mh->match.acl.sa_depth,
2441                                         sa32_depth);
2442                                 if (status)
2443                                         return status;
2444
2445                                 status = match_convert_ipv6_depth(
2446                                         mh->match.acl.da_depth,
2447                                         da32_depth);
2448                                 if (status)
2449                                         return status;
2450
2451                                 ml->acl_delete.field_value[0].value.u8 =
2452                                         mh->match.acl.proto;
2453                                 ml->acl_delete.field_value[0].mask_range.u8 =
2454                                         mh->match.acl.proto_mask;
2455
2456                                 ml->acl_delete.field_value[1].value.u32 =
2457                                         rte_be_to_cpu_32(sa32[0]);
2458                                 ml->acl_delete.field_value[1].mask_range.u32 =
2459                                         sa32_depth[0];
2460                                 ml->acl_delete.field_value[2].value.u32 =
2461                                         rte_be_to_cpu_32(sa32[1]);
2462                                 ml->acl_delete.field_value[2].mask_range.u32 =
2463                                         sa32_depth[1];
2464                                 ml->acl_delete.field_value[3].value.u32 =
2465                                         rte_be_to_cpu_32(sa32[2]);
2466                                 ml->acl_delete.field_value[3].mask_range.u32 =
2467                                         sa32_depth[2];
2468                                 ml->acl_delete.field_value[4].value.u32 =
2469                                         rte_be_to_cpu_32(sa32[3]);
2470                                 ml->acl_delete.field_value[4].mask_range.u32 =
2471                                         sa32_depth[3];
2472
2473                                 ml->acl_delete.field_value[5].value.u32 =
2474                                         rte_be_to_cpu_32(da32[0]);
2475                                 ml->acl_delete.field_value[5].mask_range.u32 =
2476                                         da32_depth[0];
2477                                 ml->acl_delete.field_value[6].value.u32 =
2478                                         rte_be_to_cpu_32(da32[1]);
2479                                 ml->acl_delete.field_value[6].mask_range.u32 =
2480                                         da32_depth[1];
2481                                 ml->acl_delete.field_value[7].value.u32 =
2482                                         rte_be_to_cpu_32(da32[2]);
2483                                 ml->acl_delete.field_value[7].mask_range.u32 =
2484                                         da32_depth[2];
2485                                 ml->acl_delete.field_value[8].value.u32 =
2486                                         rte_be_to_cpu_32(da32[3]);
2487                                 ml->acl_delete.field_value[8].mask_range.u32 =
2488                                         da32_depth[3];
2489
2490                                 ml->acl_delete.field_value[9].value.u16 =
2491                                         mh->match.acl.sp0;
2492                                 ml->acl_delete.field_value[9].mask_range.u16 =
2493                                         mh->match.acl.sp1;
2494
2495                                 ml->acl_delete.field_value[10].value.u16 =
2496                                         mh->match.acl.dp0;
2497                                 ml->acl_delete.field_value[10].mask_range.u16 =
2498                                         mh->match.acl.dp1;
2499                         }
2500                 return 0;
2501
2502         case TABLE_ARRAY:
2503                 ml->array.pos = mh->match.array.pos;
2504                 return 0;
2505
2506         case TABLE_HASH:
2507                 memcpy(ml->hash, mh->match.hash.key, sizeof(ml->hash));
2508                 return 0;
2509
2510         case TABLE_LPM:
2511                 if (mh->match.lpm.ip_version) {
2512                         ml->lpm_ipv4.ip = mh->match.lpm.ipv4;
2513                         ml->lpm_ipv4.depth = mh->match.lpm.depth;
2514                 } else {
2515                         memcpy(ml->lpm_ipv6.ip,
2516                                 mh->match.lpm.ipv6, sizeof(ml->lpm_ipv6.ip));
2517                         ml->lpm_ipv6.depth = mh->match.lpm.depth;
2518                 }
2519
2520                 return 0;
2521
2522         default:
2523                 return -1;
2524         }
2525 }
2526
2527 static int
2528 action_convert(struct rte_table_action *a,
2529         struct table_rule_action *action,
2530         struct rte_pipeline_table_entry *data)
2531 {
2532         int status;
2533
2534         /* Apply actions */
2535         if (action->action_mask & (1LLU << RTE_TABLE_ACTION_FWD)) {
2536                 status = rte_table_action_apply(a,
2537                         data,
2538                         RTE_TABLE_ACTION_FWD,
2539                         &action->fwd);
2540
2541                 if (status)
2542                         return status;
2543         }
2544
2545         if (action->action_mask & (1LLU << RTE_TABLE_ACTION_LB)) {
2546                 status = rte_table_action_apply(a,
2547                         data,
2548                         RTE_TABLE_ACTION_LB,
2549                         &action->lb);
2550
2551                 if (status)
2552                         return status;
2553         }
2554
2555         if (action->action_mask & (1LLU << RTE_TABLE_ACTION_MTR)) {
2556                 status = rte_table_action_apply(a,
2557                         data,
2558                         RTE_TABLE_ACTION_MTR,
2559                         &action->mtr);
2560
2561                 if (status)
2562                         return status;
2563         }
2564
2565         if (action->action_mask & (1LLU << RTE_TABLE_ACTION_TM)) {
2566                 status = rte_table_action_apply(a,
2567                         data,
2568                         RTE_TABLE_ACTION_TM,
2569                         &action->tm);
2570
2571                 if (status)
2572                         return status;
2573         }
2574
2575         if (action->action_mask & (1LLU << RTE_TABLE_ACTION_ENCAP)) {
2576                 status = rte_table_action_apply(a,
2577                         data,
2578                         RTE_TABLE_ACTION_ENCAP,
2579                         &action->encap);
2580
2581                 if (status)
2582                         return status;
2583         }
2584
2585         if (action->action_mask & (1LLU << RTE_TABLE_ACTION_NAT)) {
2586                 status = rte_table_action_apply(a,
2587                         data,
2588                         RTE_TABLE_ACTION_NAT,
2589                         &action->nat);
2590
2591                 if (status)
2592                         return status;
2593         }
2594
2595         if (action->action_mask & (1LLU << RTE_TABLE_ACTION_TTL)) {
2596                 status = rte_table_action_apply(a,
2597                         data,
2598                         RTE_TABLE_ACTION_TTL,
2599                         &action->ttl);
2600
2601                 if (status)
2602                         return status;
2603         }
2604
2605         if (action->action_mask & (1LLU << RTE_TABLE_ACTION_STATS)) {
2606                 status = rte_table_action_apply(a,
2607                         data,
2608                         RTE_TABLE_ACTION_STATS,
2609                         &action->stats);
2610
2611                 if (status)
2612                         return status;
2613         }
2614
2615         if (action->action_mask & (1LLU << RTE_TABLE_ACTION_TIME)) {
2616                 status = rte_table_action_apply(a,
2617                         data,
2618                         RTE_TABLE_ACTION_TIME,
2619                         &action->time);
2620
2621                 if (status)
2622                         return status;
2623         }
2624
2625         if (action->action_mask & (1LLU << RTE_TABLE_ACTION_SYM_CRYPTO)) {
2626                 status = rte_table_action_apply(a,
2627                         data,
2628                         RTE_TABLE_ACTION_SYM_CRYPTO,
2629                         &action->sym_crypto);
2630
2631                 if (status)
2632                         return status;
2633         }
2634
2635         if (action->action_mask & (1LLU << RTE_TABLE_ACTION_TAG)) {
2636                 status = rte_table_action_apply(a,
2637                         data,
2638                         RTE_TABLE_ACTION_TAG,
2639                         &action->tag);
2640
2641                 if (status)
2642                         return status;
2643         }
2644
2645         if (action->action_mask & (1LLU << RTE_TABLE_ACTION_DECAP)) {
2646                 status = rte_table_action_apply(a,
2647                         data,
2648                         RTE_TABLE_ACTION_DECAP,
2649                         &action->decap);
2650
2651                 if (status)
2652                         return status;
2653         }
2654
2655         return 0;
2656 }
2657
2658 static struct pipeline_msg_rsp *
2659 pipeline_msg_handle_table_rule_add(struct pipeline_data *p,
2660         struct pipeline_msg_req *req)
2661 {
2662         union table_rule_match_low_level match_ll;
2663         struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2664         struct table_rule_match *match = &req->table_rule_add.match;
2665         struct table_rule_action *action = &req->table_rule_add.action;
2666         struct rte_pipeline_table_entry *data_in, *data_out;
2667         uint32_t table_id = req->id;
2668         int key_found, status;
2669         struct rte_table_action *a = p->table_data[table_id].a;
2670
2671         /* Apply actions */
2672         memset(p->buffer, 0, sizeof(p->buffer));
2673         data_in = (struct rte_pipeline_table_entry *) p->buffer;
2674
2675         status = match_convert(match, &match_ll, 1);
2676         if (status) {
2677                 rsp->status = -1;
2678                 return rsp;
2679         }
2680
2681         status = action_convert(a, action, data_in);
2682         if (status) {
2683                 rsp->status = -1;
2684                 return rsp;
2685         }
2686
2687         status = rte_pipeline_table_entry_add(p->p,
2688                 table_id,
2689                 &match_ll,
2690                 data_in,
2691                 &key_found,
2692                 &data_out);
2693         if (status) {
2694                 rsp->status = -1;
2695                 return rsp;
2696         }
2697
2698         /* Write response */
2699         rsp->status = 0;
2700         rsp->table_rule_add.data = data_out;
2701
2702         return rsp;
2703 }
2704
2705 static struct pipeline_msg_rsp *
2706 pipeline_msg_handle_table_rule_add_default(struct pipeline_data *p,
2707         struct pipeline_msg_req *req)
2708 {
2709         struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2710         struct table_rule_action *action = &req->table_rule_add_default.action;
2711         struct rte_pipeline_table_entry *data_in, *data_out;
2712         uint32_t table_id = req->id;
2713         int status;
2714
2715         /* Apply actions */
2716         memset(p->buffer, 0, sizeof(p->buffer));
2717         data_in = (struct rte_pipeline_table_entry *) p->buffer;
2718
2719         data_in->action = action->fwd.action;
2720         if (action->fwd.action == RTE_PIPELINE_ACTION_PORT)
2721                 data_in->port_id = action->fwd.id;
2722         if (action->fwd.action == RTE_PIPELINE_ACTION_TABLE)
2723                 data_in->table_id = action->fwd.id;
2724
2725         /* Add default rule to table */
2726         status = rte_pipeline_table_default_entry_add(p->p,
2727                 table_id,
2728                 data_in,
2729                 &data_out);
2730         if (status) {
2731                 rsp->status = -1;
2732                 return rsp;
2733         }
2734
2735         /* Write response */
2736         rsp->status = 0;
2737         rsp->table_rule_add_default.data = data_out;
2738
2739         return rsp;
2740 }
2741
2742 static struct pipeline_msg_rsp *
2743 pipeline_msg_handle_table_rule_add_bulk(struct pipeline_data *p,
2744         struct pipeline_msg_req *req)
2745 {
2746         struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2747
2748         uint32_t table_id = req->id;
2749         struct table_rule_list *list = req->table_rule_add_bulk.list;
2750         uint32_t bulk = req->table_rule_add_bulk.bulk;
2751
2752         uint32_t n_rules_added;
2753         int status;
2754
2755         struct table_ll table_ll = {
2756                 .p = p->p,
2757                 .table_id = table_id,
2758                 .a = p->table_data[table_id].a,
2759                 .bulk_supported = bulk,
2760         };
2761
2762         status = table_rule_add_bulk_ll(&table_ll, list, &n_rules_added);
2763         if (status) {
2764                 rsp->status = -1;
2765                 rsp->table_rule_add_bulk.n_rules = 0;
2766                 return rsp;
2767         }
2768
2769         /* Write response */
2770         rsp->status = 0;
2771         rsp->table_rule_add_bulk.n_rules = n_rules_added;
2772         return rsp;
2773 }
2774
2775 static struct pipeline_msg_rsp *
2776 pipeline_msg_handle_table_rule_delete(struct pipeline_data *p,
2777         struct pipeline_msg_req *req)
2778 {
2779         union table_rule_match_low_level match_ll;
2780         struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2781         struct table_rule_match *match = &req->table_rule_delete.match;
2782         uint32_t table_id = req->id;
2783         int key_found, status;
2784
2785         status = match_convert(match, &match_ll, 0);
2786         if (status) {
2787                 rsp->status = -1;
2788                 return rsp;
2789         }
2790
2791         rsp->status = rte_pipeline_table_entry_delete(p->p,
2792                 table_id,
2793                 &match_ll,
2794                 &key_found,
2795                 NULL);
2796
2797         return rsp;
2798 }
2799
2800 static struct pipeline_msg_rsp *
2801 pipeline_msg_handle_table_rule_delete_default(struct pipeline_data *p,
2802         struct pipeline_msg_req *req)
2803 {
2804         struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2805         uint32_t table_id = req->id;
2806
2807         rsp->status = rte_pipeline_table_default_entry_delete(p->p,
2808                 table_id,
2809                 NULL);
2810
2811         return rsp;
2812 }
2813
2814 static struct pipeline_msg_rsp *
2815 pipeline_msg_handle_table_rule_stats_read(struct pipeline_data *p,
2816         struct pipeline_msg_req *req)
2817 {
2818         struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2819         uint32_t table_id = req->id;
2820         void *data = req->table_rule_stats_read.data;
2821         int clear = req->table_rule_stats_read.clear;
2822         struct rte_table_action *a = p->table_data[table_id].a;
2823
2824         rsp->status = rte_table_action_stats_read(a,
2825                 data,
2826                 &rsp->table_rule_stats_read.stats,
2827                 clear);
2828
2829         return rsp;
2830 }
2831
2832 static struct pipeline_msg_rsp *
2833 pipeline_msg_handle_table_mtr_profile_add(struct pipeline_data *p,
2834         struct pipeline_msg_req *req)
2835 {
2836         struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2837         uint32_t table_id = req->id;
2838         uint32_t meter_profile_id = req->table_mtr_profile_add.meter_profile_id;
2839         struct rte_table_action_meter_profile *profile =
2840                 &req->table_mtr_profile_add.profile;
2841         struct rte_table_action *a = p->table_data[table_id].a;
2842
2843         rsp->status = rte_table_action_meter_profile_add(a,
2844                 meter_profile_id,
2845                 profile);
2846
2847         return rsp;
2848 }
2849
2850 static struct pipeline_msg_rsp *
2851 pipeline_msg_handle_table_mtr_profile_delete(struct pipeline_data *p,
2852         struct pipeline_msg_req *req)
2853 {
2854         struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2855         uint32_t table_id = req->id;
2856         uint32_t meter_profile_id =
2857                 req->table_mtr_profile_delete.meter_profile_id;
2858         struct rte_table_action *a = p->table_data[table_id].a;
2859
2860         rsp->status = rte_table_action_meter_profile_delete(a,
2861                 meter_profile_id);
2862
2863         return rsp;
2864 }
2865
2866 static struct pipeline_msg_rsp *
2867 pipeline_msg_handle_table_rule_mtr_read(struct pipeline_data *p,
2868         struct pipeline_msg_req *req)
2869 {
2870         struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2871         uint32_t table_id = req->id;
2872         void *data = req->table_rule_mtr_read.data;
2873         uint32_t tc_mask = req->table_rule_mtr_read.tc_mask;
2874         int clear = req->table_rule_mtr_read.clear;
2875         struct rte_table_action *a = p->table_data[table_id].a;
2876
2877         rsp->status = rte_table_action_meter_read(a,
2878                 data,
2879                 tc_mask,
2880                 &rsp->table_rule_mtr_read.stats,
2881                 clear);
2882
2883         return rsp;
2884 }
2885
2886 static struct pipeline_msg_rsp *
2887 pipeline_msg_handle_table_dscp_table_update(struct pipeline_data *p,
2888         struct pipeline_msg_req *req)
2889 {
2890         struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2891         uint32_t table_id = req->id;
2892         uint64_t dscp_mask = req->table_dscp_table_update.dscp_mask;
2893         struct rte_table_action_dscp_table *dscp_table =
2894                 &req->table_dscp_table_update.dscp_table;
2895         struct rte_table_action *a = p->table_data[table_id].a;
2896
2897         rsp->status = rte_table_action_dscp_table_update(a,
2898                 dscp_mask,
2899                 dscp_table);
2900
2901         return rsp;
2902 }
2903
2904 static struct pipeline_msg_rsp *
2905 pipeline_msg_handle_table_rule_ttl_read(struct pipeline_data *p,
2906         struct pipeline_msg_req *req)
2907 {
2908         struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2909         uint32_t table_id = req->id;
2910         void *data = req->table_rule_ttl_read.data;
2911         int clear = req->table_rule_ttl_read.clear;
2912         struct rte_table_action *a = p->table_data[table_id].a;
2913
2914         rsp->status = rte_table_action_ttl_read(a,
2915                 data,
2916                 &rsp->table_rule_ttl_read.stats,
2917                 clear);
2918
2919         return rsp;
2920 }
2921
2922 static void
2923 pipeline_msg_handle(struct pipeline_data *p)
2924 {
2925         for ( ; ; ) {
2926                 struct pipeline_msg_req *req;
2927                 struct pipeline_msg_rsp *rsp;
2928
2929                 req = pipeline_msg_recv(p->msgq_req);
2930                 if (req == NULL)
2931                         break;
2932
2933                 switch (req->type) {
2934                 case PIPELINE_REQ_PORT_IN_STATS_READ:
2935                         rsp = pipeline_msg_handle_port_in_stats_read(p, req);
2936                         break;
2937
2938                 case PIPELINE_REQ_PORT_IN_ENABLE:
2939                         rsp = pipeline_msg_handle_port_in_enable(p, req);
2940                         break;
2941
2942                 case PIPELINE_REQ_PORT_IN_DISABLE:
2943                         rsp = pipeline_msg_handle_port_in_disable(p, req);
2944                         break;
2945
2946                 case PIPELINE_REQ_PORT_OUT_STATS_READ:
2947                         rsp = pipeline_msg_handle_port_out_stats_read(p, req);
2948                         break;
2949
2950                 case PIPELINE_REQ_TABLE_STATS_READ:
2951                         rsp = pipeline_msg_handle_table_stats_read(p, req);
2952                         break;
2953
2954                 case PIPELINE_REQ_TABLE_RULE_ADD:
2955                         rsp = pipeline_msg_handle_table_rule_add(p, req);
2956                         break;
2957
2958                 case PIPELINE_REQ_TABLE_RULE_ADD_DEFAULT:
2959                         rsp = pipeline_msg_handle_table_rule_add_default(p,     req);
2960                         break;
2961
2962                 case PIPELINE_REQ_TABLE_RULE_ADD_BULK:
2963                         rsp = pipeline_msg_handle_table_rule_add_bulk(p, req);
2964                         break;
2965
2966                 case PIPELINE_REQ_TABLE_RULE_DELETE:
2967                         rsp = pipeline_msg_handle_table_rule_delete(p, req);
2968                         break;
2969
2970                 case PIPELINE_REQ_TABLE_RULE_DELETE_DEFAULT:
2971                         rsp = pipeline_msg_handle_table_rule_delete_default(p, req);
2972                         break;
2973
2974                 case PIPELINE_REQ_TABLE_RULE_STATS_READ:
2975                         rsp = pipeline_msg_handle_table_rule_stats_read(p, req);
2976                         break;
2977
2978                 case PIPELINE_REQ_TABLE_MTR_PROFILE_ADD:
2979                         rsp = pipeline_msg_handle_table_mtr_profile_add(p, req);
2980                         break;
2981
2982                 case PIPELINE_REQ_TABLE_MTR_PROFILE_DELETE:
2983                         rsp = pipeline_msg_handle_table_mtr_profile_delete(p, req);
2984                         break;
2985
2986                 case PIPELINE_REQ_TABLE_RULE_MTR_READ:
2987                         rsp = pipeline_msg_handle_table_rule_mtr_read(p, req);
2988                         break;
2989
2990                 case PIPELINE_REQ_TABLE_DSCP_TABLE_UPDATE:
2991                         rsp = pipeline_msg_handle_table_dscp_table_update(p, req);
2992                         break;
2993
2994                 case PIPELINE_REQ_TABLE_RULE_TTL_READ:
2995                         rsp = pipeline_msg_handle_table_rule_ttl_read(p, req);
2996                         break;
2997
2998                 default:
2999                         rsp = (struct pipeline_msg_rsp *) req;
3000                         rsp->status = -1;
3001                 }
3002
3003                 pipeline_msg_send(p->msgq_rsp, rsp);
3004         }
3005 }
3006
3007 /**
3008  * Data plane threads: main
3009  */
3010 int
3011 thread_main(void *arg __rte_unused)
3012 {
3013         struct thread_data *t;
3014         uint32_t thread_id, i;
3015
3016         thread_id = rte_lcore_id();
3017         t = &thread_data[thread_id];
3018
3019         /* Dispatch loop */
3020         for (i = 0; ; i++) {
3021                 uint32_t j;
3022
3023                 /* Data Plane */
3024                 for (j = 0; j < t->n_pipelines; j++)
3025                         rte_pipeline_run(t->p[j]);
3026
3027                 /* Control Plane */
3028                 if ((i & 0xF) == 0) {
3029                         uint64_t time = rte_get_tsc_cycles();
3030                         uint64_t time_next_min = UINT64_MAX;
3031
3032                         if (time < t->time_next_min)
3033                                 continue;
3034
3035                         /* Pipeline message queues */
3036                         for (j = 0; j < t->n_pipelines; j++) {
3037                                 struct pipeline_data *p =
3038                                         &t->pipeline_data[j];
3039                                 uint64_t time_next = p->time_next;
3040
3041                                 if (time_next <= time) {
3042                                         pipeline_msg_handle(p);
3043                                         rte_pipeline_flush(p->p);
3044                                         time_next = time + p->timer_period;
3045                                         p->time_next = time_next;
3046                                 }
3047
3048                                 if (time_next < time_next_min)
3049                                         time_next_min = time_next;
3050                         }
3051
3052                         /* Thread message queues */
3053                         {
3054                                 uint64_t time_next = t->time_next;
3055
3056                                 if (time_next <= time) {
3057                                         thread_msg_handle(t);
3058                                         time_next = time + t->timer_period;
3059                                         t->time_next = time_next;
3060                                 }
3061
3062                                 if (time_next < time_next_min)
3063                                         time_next_min = time_next;
3064                         }
3065
3066                         t->time_next_min = time_next_min;
3067                 }
3068         }
3069
3070         return 0;
3071 }