examples/ip_pipeline: track table rules on delete
[dpdk.git] / examples / ip_pipeline / thread.c
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2010-2018 Intel Corporation
3  */
4
5 #include <stdlib.h>
6
7 #include <rte_common.h>
8 #include <rte_cycles.h>
9 #include <rte_lcore.h>
10 #include <rte_ring.h>
11
12 #include <rte_table_acl.h>
13 #include <rte_table_array.h>
14 #include <rte_table_hash.h>
15 #include <rte_table_lpm.h>
16 #include <rte_table_lpm_ipv6.h>
17
18 #include "common.h"
19 #include "thread.h"
20 #include "pipeline.h"
21
/* Capacity of the per-thread pipeline arrays in struct thread_data. */
#if !defined(THREAD_PIPELINES_MAX)
#define THREAD_PIPELINES_MAX 256
#endif

/* Size handed to rte_ring_create() for each per-thread message queue. */
#if !defined(THREAD_MSGQ_SIZE)
#define THREAD_MSGQ_SIZE 64
#endif

/* Thread timer period in milliseconds; thread_init() converts it to cycles. */
#if !defined(THREAD_TIMER_PERIOD_MS)
#define THREAD_TIMER_PERIOD_MS 100
#endif
33
34 /**
35  * Master thead: data plane thread context
36  */
37 struct thread {
38         struct rte_ring *msgq_req;
39         struct rte_ring *msgq_rsp;
40
41         uint32_t enabled;
42 };
43
44 static struct thread thread[RTE_MAX_LCORE];
45
46 /**
47  * Data plane threads: context
48  */
49 struct table_data {
50         struct rte_table_action *a;
51 };
52
53 struct pipeline_data {
54         struct rte_pipeline *p;
55         struct table_data table_data[RTE_PIPELINE_TABLE_MAX];
56         uint32_t n_tables;
57
58         struct rte_ring *msgq_req;
59         struct rte_ring *msgq_rsp;
60         uint64_t timer_period; /* Measured in CPU cycles. */
61         uint64_t time_next;
62
63         uint8_t buffer[TABLE_RULE_ACTION_SIZE_MAX];
64 };
65
66 struct thread_data {
67         struct rte_pipeline *p[THREAD_PIPELINES_MAX];
68         uint32_t n_pipelines;
69
70         struct pipeline_data pipeline_data[THREAD_PIPELINES_MAX];
71         struct rte_ring *msgq_req;
72         struct rte_ring *msgq_rsp;
73         uint64_t timer_period; /* Measured in CPU cycles. */
74         uint64_t time_next;
75         uint64_t time_next_min;
76 } __rte_cache_aligned;
77
78 static struct thread_data thread_data[RTE_MAX_LCORE];
79
80 /**
81  * Master thread: data plane thread init
82  */
83 static void
84 thread_free(void)
85 {
86         uint32_t i;
87
88         for (i = 0; i < RTE_MAX_LCORE; i++) {
89                 struct thread *t = &thread[i];
90
91                 if (!rte_lcore_is_enabled(i))
92                         continue;
93
94                 /* MSGQs */
95                 if (t->msgq_req)
96                         rte_ring_free(t->msgq_req);
97
98                 if (t->msgq_rsp)
99                         rte_ring_free(t->msgq_rsp);
100         }
101 }
102
103 int
104 thread_init(void)
105 {
106         uint32_t i;
107
108         RTE_LCORE_FOREACH_SLAVE(i) {
109                 char name[NAME_MAX];
110                 struct rte_ring *msgq_req, *msgq_rsp;
111                 struct thread *t = &thread[i];
112                 struct thread_data *t_data = &thread_data[i];
113                 uint32_t cpu_id = rte_lcore_to_socket_id(i);
114
115                 /* MSGQs */
116                 snprintf(name, sizeof(name), "THREAD-%04x-MSGQ-REQ", i);
117
118                 msgq_req = rte_ring_create(name,
119                         THREAD_MSGQ_SIZE,
120                         cpu_id,
121                         RING_F_SP_ENQ | RING_F_SC_DEQ);
122
123                 if (msgq_req == NULL) {
124                         thread_free();
125                         return -1;
126                 }
127
128                 snprintf(name, sizeof(name), "THREAD-%04x-MSGQ-RSP", i);
129
130                 msgq_rsp = rte_ring_create(name,
131                         THREAD_MSGQ_SIZE,
132                         cpu_id,
133                         RING_F_SP_ENQ | RING_F_SC_DEQ);
134
135                 if (msgq_rsp == NULL) {
136                         thread_free();
137                         return -1;
138                 }
139
140                 /* Master thread records */
141                 t->msgq_req = msgq_req;
142                 t->msgq_rsp = msgq_rsp;
143                 t->enabled = 1;
144
145                 /* Data plane thread records */
146                 t_data->n_pipelines = 0;
147                 t_data->msgq_req = msgq_req;
148                 t_data->msgq_rsp = msgq_rsp;
149                 t_data->timer_period =
150                         (rte_get_tsc_hz() * THREAD_TIMER_PERIOD_MS) / 1000;
151                 t_data->time_next = rte_get_tsc_cycles() + t_data->timer_period;
152                 t_data->time_next_min = t_data->time_next;
153         }
154
155         return 0;
156 }
157
158 static inline int
159 thread_is_running(uint32_t thread_id)
160 {
161         enum rte_lcore_state_t thread_state;
162
163         thread_state = rte_eal_get_lcore_state(thread_id);
164         return (thread_state == RUNNING) ? 1 : 0;
165 }
166
167 /**
168  * Pipeline is running when:
169  *    (A) Pipeline is mapped to a data plane thread AND
170  *    (B) Its data plane thread is in RUNNING state.
171  */
172 static inline int
173 pipeline_is_running(struct pipeline *p)
174 {
175         if (p->enabled == 0)
176                 return 0;
177
178         return thread_is_running(p->thread_id);
179 }
180
181 /**
182  * Master thread & data plane threads: message passing
183  */
/* Request types a data plane thread accepts on its thread-level queue. */
enum thread_req_type {
        THREAD_REQ_PIPELINE_ENABLE = 0,  /* add a pipeline to the thread */
        THREAD_REQ_PIPELINE_DISABLE = 1, /* remove a pipeline from the thread */
        THREAD_REQ_MAX = 2               /* count of request types above */
};
189
190 struct thread_msg_req {
191         enum thread_req_type type;
192
193         union {
194                 struct {
195                         struct rte_pipeline *p;
196                         struct {
197                                 struct rte_table_action *a;
198                         } table[RTE_PIPELINE_TABLE_MAX];
199                         struct rte_ring *msgq_req;
200                         struct rte_ring *msgq_rsp;
201                         uint32_t timer_period_ms;
202                         uint32_t n_tables;
203                 } pipeline_enable;
204
205                 struct {
206                         struct rte_pipeline *p;
207                 } pipeline_disable;
208         };
209 };
210
211 struct thread_msg_rsp {
212         int status;
213 };
214
215 /**
216  * Master thread
217  */
218 static struct thread_msg_req *
219 thread_msg_alloc(void)
220 {
221         size_t size = RTE_MAX(sizeof(struct thread_msg_req),
222                 sizeof(struct thread_msg_rsp));
223
224         return calloc(1, size);
225 }
226
227 static void
228 thread_msg_free(struct thread_msg_rsp *rsp)
229 {
230         free(rsp);
231 }
232
233 static struct thread_msg_rsp *
234 thread_msg_send_recv(uint32_t thread_id,
235         struct thread_msg_req *req)
236 {
237         struct thread *t = &thread[thread_id];
238         struct rte_ring *msgq_req = t->msgq_req;
239         struct rte_ring *msgq_rsp = t->msgq_rsp;
240         struct thread_msg_rsp *rsp;
241         int status;
242
243         /* send */
244         do {
245                 status = rte_ring_sp_enqueue(msgq_req, req);
246         } while (status == -ENOBUFS);
247
248         /* recv */
249         do {
250                 status = rte_ring_sc_dequeue(msgq_rsp, (void **) &rsp);
251         } while (status != 0);
252
253         return rsp;
254 }
255
256 int
257 thread_pipeline_enable(uint32_t thread_id,
258         const char *pipeline_name)
259 {
260         struct pipeline *p = pipeline_find(pipeline_name);
261         struct thread *t;
262         struct thread_msg_req *req;
263         struct thread_msg_rsp *rsp;
264         uint32_t i;
265         int status;
266
267         /* Check input params */
268         if ((thread_id >= RTE_MAX_LCORE) ||
269                 (p == NULL) ||
270                 (p->n_ports_in == 0) ||
271                 (p->n_ports_out == 0) ||
272                 (p->n_tables == 0))
273                 return -1;
274
275         t = &thread[thread_id];
276         if ((t->enabled == 0) ||
277                 p->enabled)
278                 return -1;
279
280         if (!thread_is_running(thread_id)) {
281                 struct thread_data *td = &thread_data[thread_id];
282                 struct pipeline_data *tdp = &td->pipeline_data[td->n_pipelines];
283
284                 if (td->n_pipelines >= THREAD_PIPELINES_MAX)
285                         return -1;
286
287                 /* Data plane thread */
288                 td->p[td->n_pipelines] = p->p;
289
290                 tdp->p = p->p;
291                 for (i = 0; i < p->n_tables; i++)
292                         tdp->table_data[i].a = p->table[i].a;
293
294                 tdp->n_tables = p->n_tables;
295
296                 tdp->msgq_req = p->msgq_req;
297                 tdp->msgq_rsp = p->msgq_rsp;
298                 tdp->timer_period = (rte_get_tsc_hz() * p->timer_period_ms) / 1000;
299                 tdp->time_next = rte_get_tsc_cycles() + tdp->timer_period;
300
301                 td->n_pipelines++;
302
303                 /* Pipeline */
304                 p->thread_id = thread_id;
305                 p->enabled = 1;
306
307                 return 0;
308         }
309
310         /* Allocate request */
311         req = thread_msg_alloc();
312         if (req == NULL)
313                 return -1;
314
315         /* Write request */
316         req->type = THREAD_REQ_PIPELINE_ENABLE;
317         req->pipeline_enable.p = p->p;
318         for (i = 0; i < p->n_tables; i++)
319                 req->pipeline_enable.table[i].a =
320                         p->table[i].a;
321         req->pipeline_enable.msgq_req = p->msgq_req;
322         req->pipeline_enable.msgq_rsp = p->msgq_rsp;
323         req->pipeline_enable.timer_period_ms = p->timer_period_ms;
324         req->pipeline_enable.n_tables = p->n_tables;
325
326         /* Send request and wait for response */
327         rsp = thread_msg_send_recv(thread_id, req);
328         if (rsp == NULL)
329                 return -1;
330
331         /* Read response */
332         status = rsp->status;
333
334         /* Free response */
335         thread_msg_free(rsp);
336
337         /* Request completion */
338         if (status)
339                 return status;
340
341         p->thread_id = thread_id;
342         p->enabled = 1;
343
344         return 0;
345 }
346
347 int
348 thread_pipeline_disable(uint32_t thread_id,
349         const char *pipeline_name)
350 {
351         struct pipeline *p = pipeline_find(pipeline_name);
352         struct thread *t;
353         struct thread_msg_req *req;
354         struct thread_msg_rsp *rsp;
355         int status;
356
357         /* Check input params */
358         if ((thread_id >= RTE_MAX_LCORE) ||
359                 (p == NULL))
360                 return -1;
361
362         t = &thread[thread_id];
363         if (t->enabled == 0)
364                 return -1;
365
366         if (p->enabled == 0)
367                 return 0;
368
369         if (p->thread_id != thread_id)
370                 return -1;
371
372         if (!thread_is_running(thread_id)) {
373                 struct thread_data *td = &thread_data[thread_id];
374                 uint32_t i;
375
376                 for (i = 0; i < td->n_pipelines; i++) {
377                         struct pipeline_data *tdp = &td->pipeline_data[i];
378
379                         if (tdp->p != p->p)
380                                 continue;
381
382                         /* Data plane thread */
383                         if (i < td->n_pipelines - 1) {
384                                 struct rte_pipeline *pipeline_last =
385                                         td->p[td->n_pipelines - 1];
386                                 struct pipeline_data *tdp_last =
387                                         &td->pipeline_data[td->n_pipelines - 1];
388
389                                 td->p[i] = pipeline_last;
390                                 memcpy(tdp, tdp_last, sizeof(*tdp));
391                         }
392
393                         td->n_pipelines--;
394
395                         /* Pipeline */
396                         p->enabled = 0;
397
398                         break;
399                 }
400
401                 return 0;
402         }
403
404         /* Allocate request */
405         req = thread_msg_alloc();
406         if (req == NULL)
407                 return -1;
408
409         /* Write request */
410         req->type = THREAD_REQ_PIPELINE_DISABLE;
411         req->pipeline_disable.p = p->p;
412
413         /* Send request and wait for response */
414         rsp = thread_msg_send_recv(thread_id, req);
415         if (rsp == NULL)
416                 return -1;
417
418         /* Read response */
419         status = rsp->status;
420
421         /* Free response */
422         thread_msg_free(rsp);
423
424         /* Request completion */
425         if (status)
426                 return status;
427
428         p->enabled = 0;
429
430         return 0;
431 }
432
433 /**
434  * Data plane threads: message handling
435  */
/* Dequeue one pending request; returns NULL when the dequeue does not succeed. */
static inline struct thread_msg_req *
thread_msg_recv(struct rte_ring *msgq_req)
{
        struct thread_msg_req *req;

        if (rte_ring_sc_dequeue(msgq_req, (void **) &req) == 0)
                return req;

        return NULL;
}
448
/* Enqueue a response, retrying for as long as the ring reports -ENOBUFS. */
static inline void
thread_msg_send(struct rte_ring *msgq_rsp,
        struct thread_msg_rsp *rsp)
{
        while (rte_ring_sp_enqueue(msgq_rsp, rsp) == -ENOBUFS)
                ;
}
459
460 static struct thread_msg_rsp *
461 thread_msg_handle_pipeline_enable(struct thread_data *t,
462         struct thread_msg_req *req)
463 {
464         struct thread_msg_rsp *rsp = (struct thread_msg_rsp *) req;
465         struct pipeline_data *p = &t->pipeline_data[t->n_pipelines];
466         uint32_t i;
467
468         /* Request */
469         if (t->n_pipelines >= THREAD_PIPELINES_MAX) {
470                 rsp->status = -1;
471                 return rsp;
472         }
473
474         t->p[t->n_pipelines] = req->pipeline_enable.p;
475
476         p->p = req->pipeline_enable.p;
477         for (i = 0; i < req->pipeline_enable.n_tables; i++)
478                 p->table_data[i].a =
479                         req->pipeline_enable.table[i].a;
480
481         p->n_tables = req->pipeline_enable.n_tables;
482
483         p->msgq_req = req->pipeline_enable.msgq_req;
484         p->msgq_rsp = req->pipeline_enable.msgq_rsp;
485         p->timer_period =
486                 (rte_get_tsc_hz() * req->pipeline_enable.timer_period_ms) / 1000;
487         p->time_next = rte_get_tsc_cycles() + p->timer_period;
488
489         t->n_pipelines++;
490
491         /* Response */
492         rsp->status = 0;
493         return rsp;
494 }
495
496 static struct thread_msg_rsp *
497 thread_msg_handle_pipeline_disable(struct thread_data *t,
498         struct thread_msg_req *req)
499 {
500         struct thread_msg_rsp *rsp = (struct thread_msg_rsp *) req;
501         uint32_t n_pipelines = t->n_pipelines;
502         struct rte_pipeline *pipeline = req->pipeline_disable.p;
503         uint32_t i;
504
505         /* find pipeline */
506         for (i = 0; i < n_pipelines; i++) {
507                 struct pipeline_data *p = &t->pipeline_data[i];
508
509                 if (p->p != pipeline)
510                         continue;
511
512                 if (i < n_pipelines - 1) {
513                         struct rte_pipeline *pipeline_last =
514                                 t->p[n_pipelines - 1];
515                         struct pipeline_data *p_last =
516                                 &t->pipeline_data[n_pipelines - 1];
517
518                         t->p[i] = pipeline_last;
519                         memcpy(p, p_last, sizeof(*p));
520                 }
521
522                 t->n_pipelines--;
523
524                 rsp->status = 0;
525                 return rsp;
526         }
527
528         /* should not get here */
529         rsp->status = 0;
530         return rsp;
531 }
532
533 static void
534 thread_msg_handle(struct thread_data *t)
535 {
536         for ( ; ; ) {
537                 struct thread_msg_req *req;
538                 struct thread_msg_rsp *rsp;
539
540                 req = thread_msg_recv(t->msgq_req);
541                 if (req == NULL)
542                         break;
543
544                 switch (req->type) {
545                 case THREAD_REQ_PIPELINE_ENABLE:
546                         rsp = thread_msg_handle_pipeline_enable(t, req);
547                         break;
548
549                 case THREAD_REQ_PIPELINE_DISABLE:
550                         rsp = thread_msg_handle_pipeline_disable(t, req);
551                         break;
552
553                 default:
554                         rsp = (struct thread_msg_rsp *) req;
555                         rsp->status = -1;
556                 }
557
558                 thread_msg_send(t->msgq_rsp, rsp);
559         }
560 }
561
562 /**
563  * Master thread & data plane threads: message passing
564  */
/*
 * Pipeline-level request types. Values are consecutive starting at 0, with
 * PIPELINE_REQ_MAX last so that it equals the number of request types.
 */
enum pipeline_req_type {
        /* Port IN */
        PIPELINE_REQ_PORT_IN_STATS_READ = 0,
        PIPELINE_REQ_PORT_IN_ENABLE,
        PIPELINE_REQ_PORT_IN_DISABLE,

        /* Port OUT */
        PIPELINE_REQ_PORT_OUT_STATS_READ,

        /* Table: statistics */
        PIPELINE_REQ_TABLE_STATS_READ,

        /* Table: rules */
        PIPELINE_REQ_TABLE_RULE_ADD,
        PIPELINE_REQ_TABLE_RULE_ADD_DEFAULT,
        PIPELINE_REQ_TABLE_RULE_ADD_BULK,
        PIPELINE_REQ_TABLE_RULE_DELETE,
        PIPELINE_REQ_TABLE_RULE_DELETE_DEFAULT,
        PIPELINE_REQ_TABLE_RULE_STATS_READ,

        /* Table: meter profiles, per-rule meter, DSCP table, per-rule TTL */
        PIPELINE_REQ_TABLE_MTR_PROFILE_ADD,
        PIPELINE_REQ_TABLE_MTR_PROFILE_DELETE,
        PIPELINE_REQ_TABLE_RULE_MTR_READ,
        PIPELINE_REQ_TABLE_DSCP_TABLE_UPDATE,
        PIPELINE_REQ_TABLE_RULE_TTL_READ,

        PIPELINE_REQ_MAX
};
589
590 struct pipeline_msg_req_port_in_stats_read {
591         int clear;
592 };
593
594 struct pipeline_msg_req_port_out_stats_read {
595         int clear;
596 };
597
598 struct pipeline_msg_req_table_stats_read {
599         int clear;
600 };
601
602 struct pipeline_msg_req_table_rule_add {
603         struct table_rule_match match;
604         struct table_rule_action action;
605 };
606
607 struct pipeline_msg_req_table_rule_add_default {
608         struct table_rule_action action;
609 };
610
611 struct pipeline_msg_req_table_rule_add_bulk {
612         struct table_rule_list *list;
613         int bulk;
614 };
615
616 struct pipeline_msg_req_table_rule_delete {
617         struct table_rule_match match;
618 };
619
620 struct pipeline_msg_req_table_rule_stats_read {
621         void *data;
622         int clear;
623 };
624
625 struct pipeline_msg_req_table_mtr_profile_add {
626         uint32_t meter_profile_id;
627         struct rte_table_action_meter_profile profile;
628 };
629
630 struct pipeline_msg_req_table_mtr_profile_delete {
631         uint32_t meter_profile_id;
632 };
633
634 struct pipeline_msg_req_table_rule_mtr_read {
635         void *data;
636         uint32_t tc_mask;
637         int clear;
638 };
639
640 struct pipeline_msg_req_table_dscp_table_update {
641         uint64_t dscp_mask;
642         struct rte_table_action_dscp_table dscp_table;
643 };
644
645 struct pipeline_msg_req_table_rule_ttl_read {
646         void *data;
647         int clear;
648 };
649
650 struct pipeline_msg_req {
651         enum pipeline_req_type type;
652         uint32_t id; /* Port IN, port OUT or table ID */
653
654         RTE_STD_C11
655         union {
656                 struct pipeline_msg_req_port_in_stats_read port_in_stats_read;
657                 struct pipeline_msg_req_port_out_stats_read port_out_stats_read;
658                 struct pipeline_msg_req_table_stats_read table_stats_read;
659                 struct pipeline_msg_req_table_rule_add table_rule_add;
660                 struct pipeline_msg_req_table_rule_add_default table_rule_add_default;
661                 struct pipeline_msg_req_table_rule_add_bulk table_rule_add_bulk;
662                 struct pipeline_msg_req_table_rule_delete table_rule_delete;
663                 struct pipeline_msg_req_table_rule_stats_read table_rule_stats_read;
664                 struct pipeline_msg_req_table_mtr_profile_add table_mtr_profile_add;
665                 struct pipeline_msg_req_table_mtr_profile_delete table_mtr_profile_delete;
666                 struct pipeline_msg_req_table_rule_mtr_read table_rule_mtr_read;
667                 struct pipeline_msg_req_table_dscp_table_update table_dscp_table_update;
668                 struct pipeline_msg_req_table_rule_ttl_read table_rule_ttl_read;
669         };
670 };
671
672 struct pipeline_msg_rsp_port_in_stats_read {
673         struct rte_pipeline_port_in_stats stats;
674 };
675
676 struct pipeline_msg_rsp_port_out_stats_read {
677         struct rte_pipeline_port_out_stats stats;
678 };
679
680 struct pipeline_msg_rsp_table_stats_read {
681         struct rte_pipeline_table_stats stats;
682 };
683
684 struct pipeline_msg_rsp_table_rule_add {
685         void *data;
686 };
687
688 struct pipeline_msg_rsp_table_rule_add_default {
689         void *data;
690 };
691
692 struct pipeline_msg_rsp_table_rule_add_bulk {
693         uint32_t n_rules;
694 };
695
696 struct pipeline_msg_rsp_table_rule_stats_read {
697         struct rte_table_action_stats_counters stats;
698 };
699
700 struct pipeline_msg_rsp_table_rule_mtr_read {
701         struct rte_table_action_mtr_counters stats;
702 };
703
704 struct pipeline_msg_rsp_table_rule_ttl_read {
705         struct rte_table_action_ttl_counters stats;
706 };
707
708 struct pipeline_msg_rsp {
709         int status;
710
711         RTE_STD_C11
712         union {
713                 struct pipeline_msg_rsp_port_in_stats_read port_in_stats_read;
714                 struct pipeline_msg_rsp_port_out_stats_read port_out_stats_read;
715                 struct pipeline_msg_rsp_table_stats_read table_stats_read;
716                 struct pipeline_msg_rsp_table_rule_add table_rule_add;
717                 struct pipeline_msg_rsp_table_rule_add_default table_rule_add_default;
718                 struct pipeline_msg_rsp_table_rule_add_bulk table_rule_add_bulk;
719                 struct pipeline_msg_rsp_table_rule_stats_read table_rule_stats_read;
720                 struct pipeline_msg_rsp_table_rule_mtr_read table_rule_mtr_read;
721                 struct pipeline_msg_rsp_table_rule_ttl_read table_rule_ttl_read;
722         };
723 };
724
725 /**
726  * Master thread
727  */
728 static struct pipeline_msg_req *
729 pipeline_msg_alloc(void)
730 {
731         size_t size = RTE_MAX(sizeof(struct pipeline_msg_req),
732                 sizeof(struct pipeline_msg_rsp));
733
734         return calloc(1, size);
735 }
736
737 static void
738 pipeline_msg_free(struct pipeline_msg_rsp *rsp)
739 {
740         free(rsp);
741 }
742
743 static struct pipeline_msg_rsp *
744 pipeline_msg_send_recv(struct pipeline *p,
745         struct pipeline_msg_req *req)
746 {
747         struct rte_ring *msgq_req = p->msgq_req;
748         struct rte_ring *msgq_rsp = p->msgq_rsp;
749         struct pipeline_msg_rsp *rsp;
750         int status;
751
752         /* send */
753         do {
754                 status = rte_ring_sp_enqueue(msgq_req, req);
755         } while (status == -ENOBUFS);
756
757         /* recv */
758         do {
759                 status = rte_ring_sc_dequeue(msgq_rsp, (void **) &rsp);
760         } while (status != 0);
761
762         return rsp;
763 }
764
765 int
766 pipeline_port_in_stats_read(const char *pipeline_name,
767         uint32_t port_id,
768         struct rte_pipeline_port_in_stats *stats,
769         int clear)
770 {
771         struct pipeline *p;
772         struct pipeline_msg_req *req;
773         struct pipeline_msg_rsp *rsp;
774         int status;
775
776         /* Check input params */
777         if ((pipeline_name == NULL) ||
778                 (stats == NULL))
779                 return -1;
780
781         p = pipeline_find(pipeline_name);
782         if ((p == NULL) ||
783                 (port_id >= p->n_ports_in))
784                 return -1;
785
786         if (!pipeline_is_running(p)) {
787                 status = rte_pipeline_port_in_stats_read(p->p,
788                         port_id,
789                         stats,
790                         clear);
791
792                 return status;
793         }
794
795         /* Allocate request */
796         req = pipeline_msg_alloc();
797         if (req == NULL)
798                 return -1;
799
800         /* Write request */
801         req->type = PIPELINE_REQ_PORT_IN_STATS_READ;
802         req->id = port_id;
803         req->port_in_stats_read.clear = clear;
804
805         /* Send request and wait for response */
806         rsp = pipeline_msg_send_recv(p, req);
807         if (rsp == NULL)
808                 return -1;
809
810         /* Read response */
811         status = rsp->status;
812         if (status)
813                 memcpy(stats, &rsp->port_in_stats_read.stats, sizeof(*stats));
814
815         /* Free response */
816         pipeline_msg_free(rsp);
817
818         return status;
819 }
820
821 int
822 pipeline_port_in_enable(const char *pipeline_name,
823         uint32_t port_id)
824 {
825         struct pipeline *p;
826         struct pipeline_msg_req *req;
827         struct pipeline_msg_rsp *rsp;
828         int status;
829
830         /* Check input params */
831         if (pipeline_name == NULL)
832                 return -1;
833
834         p = pipeline_find(pipeline_name);
835         if ((p == NULL) ||
836                 (port_id >= p->n_ports_in))
837                 return -1;
838
839         if (!pipeline_is_running(p)) {
840                 status = rte_pipeline_port_in_enable(p->p, port_id);
841                 return status;
842         }
843
844         /* Allocate request */
845         req = pipeline_msg_alloc();
846         if (req == NULL)
847                 return -1;
848
849         /* Write request */
850         req->type = PIPELINE_REQ_PORT_IN_ENABLE;
851         req->id = port_id;
852
853         /* Send request and wait for response */
854         rsp = pipeline_msg_send_recv(p, req);
855         if (rsp == NULL)
856                 return -1;
857
858         /* Read response */
859         status = rsp->status;
860
861         /* Free response */
862         pipeline_msg_free(rsp);
863
864         return status;
865 }
866
867 int
868 pipeline_port_in_disable(const char *pipeline_name,
869         uint32_t port_id)
870 {
871         struct pipeline *p;
872         struct pipeline_msg_req *req;
873         struct pipeline_msg_rsp *rsp;
874         int status;
875
876         /* Check input params */
877         if (pipeline_name == NULL)
878                 return -1;
879
880         p = pipeline_find(pipeline_name);
881         if ((p == NULL) ||
882                 (port_id >= p->n_ports_in))
883                 return -1;
884
885         if (!pipeline_is_running(p)) {
886                 status = rte_pipeline_port_in_disable(p->p, port_id);
887                 return status;
888         }
889
890         /* Allocate request */
891         req = pipeline_msg_alloc();
892         if (req == NULL)
893                 return -1;
894
895         /* Write request */
896         req->type = PIPELINE_REQ_PORT_IN_DISABLE;
897         req->id = port_id;
898
899         /* Send request and wait for response */
900         rsp = pipeline_msg_send_recv(p, req);
901         if (rsp == NULL)
902                 return -1;
903
904         /* Read response */
905         status = rsp->status;
906
907         /* Free response */
908         pipeline_msg_free(rsp);
909
910         return status;
911 }
912
913 int
914 pipeline_port_out_stats_read(const char *pipeline_name,
915         uint32_t port_id,
916         struct rte_pipeline_port_out_stats *stats,
917         int clear)
918 {
919         struct pipeline *p;
920         struct pipeline_msg_req *req;
921         struct pipeline_msg_rsp *rsp;
922         int status;
923
924         /* Check input params */
925         if ((pipeline_name == NULL) ||
926                 (stats == NULL))
927                 return -1;
928
929         p = pipeline_find(pipeline_name);
930         if ((p == NULL) ||
931                 (port_id >= p->n_ports_out))
932                 return -1;
933
934         if (!pipeline_is_running(p)) {
935                 status = rte_pipeline_port_out_stats_read(p->p,
936                         port_id,
937                         stats,
938                         clear);
939
940                 return status;
941         }
942
943         /* Allocate request */
944         req = pipeline_msg_alloc();
945         if (req == NULL)
946                 return -1;
947
948         /* Write request */
949         req->type = PIPELINE_REQ_PORT_OUT_STATS_READ;
950         req->id = port_id;
951         req->port_out_stats_read.clear = clear;
952
953         /* Send request and wait for response */
954         rsp = pipeline_msg_send_recv(p, req);
955         if (rsp == NULL)
956                 return -1;
957
958         /* Read response */
959         status = rsp->status;
960         if (status)
961                 memcpy(stats, &rsp->port_out_stats_read.stats, sizeof(*stats));
962
963         /* Free response */
964         pipeline_msg_free(rsp);
965
966         return status;
967 }
968
969 int
970 pipeline_table_stats_read(const char *pipeline_name,
971         uint32_t table_id,
972         struct rte_pipeline_table_stats *stats,
973         int clear)
974 {
975         struct pipeline *p;
976         struct pipeline_msg_req *req;
977         struct pipeline_msg_rsp *rsp;
978         int status;
979
980         /* Check input params */
981         if ((pipeline_name == NULL) ||
982                 (stats == NULL))
983                 return -1;
984
985         p = pipeline_find(pipeline_name);
986         if ((p == NULL) ||
987                 (table_id >= p->n_tables))
988                 return -1;
989
990         if (!pipeline_is_running(p)) {
991                 status = rte_pipeline_table_stats_read(p->p,
992                         table_id,
993                         stats,
994                         clear);
995
996                 return status;
997         }
998
999         /* Allocate request */
1000         req = pipeline_msg_alloc();
1001         if (req == NULL)
1002                 return -1;
1003
1004         /* Write request */
1005         req->type = PIPELINE_REQ_TABLE_STATS_READ;
1006         req->id = table_id;
1007         req->table_stats_read.clear = clear;
1008
1009         /* Send request and wait for response */
1010         rsp = pipeline_msg_send_recv(p, req);
1011         if (rsp == NULL)
1012                 return -1;
1013
1014         /* Read response */
1015         status = rsp->status;
1016         if (status)
1017                 memcpy(stats, &rsp->table_stats_read.stats, sizeof(*stats));
1018
1019         /* Free response */
1020         pipeline_msg_free(rsp);
1021
1022         return status;
1023 }
1024
1025 static int
1026 match_check(struct table_rule_match *match,
1027         struct pipeline *p,
1028         uint32_t table_id)
1029 {
1030         struct table *table;
1031
1032         if ((match == NULL) ||
1033                 (p == NULL) ||
1034                 (table_id >= p->n_tables))
1035                 return -1;
1036
1037         table = &p->table[table_id];
1038         if (match->match_type != table->params.match_type)
1039                 return -1;
1040
1041         switch (match->match_type) {
1042         case TABLE_ACL:
1043         {
1044                 struct table_acl_params *t = &table->params.match.acl;
1045                 struct table_rule_match_acl *r = &match->match.acl;
1046
1047                 if ((r->ip_version && (t->ip_version == 0)) ||
1048                         ((r->ip_version == 0) && t->ip_version))
1049                         return -1;
1050
1051                 if (r->ip_version) {
1052                         if ((r->sa_depth > 32) ||
1053                                 (r->da_depth > 32))
1054                                 return -1;
1055                 } else {
1056                         if ((r->sa_depth > 128) ||
1057                                 (r->da_depth > 128))
1058                                 return -1;
1059                 }
1060                 return 0;
1061         }
1062
1063         case TABLE_ARRAY:
1064                 return 0;
1065
1066         case TABLE_HASH:
1067                 return 0;
1068
1069         case TABLE_LPM:
1070         {
1071                 struct table_lpm_params *t = &table->params.match.lpm;
1072                 struct table_rule_match_lpm *r = &match->match.lpm;
1073
1074                 if ((r->ip_version && (t->key_size != 4)) ||
1075                         ((r->ip_version == 0) && (t->key_size != 16)))
1076                         return -1;
1077
1078                 if (r->ip_version) {
1079                         if (r->depth > 32)
1080                                 return -1;
1081                 } else {
1082                         if (r->depth > 128)
1083                                 return -1;
1084                 }
1085                 return 0;
1086         }
1087
1088         case TABLE_STUB:
1089                 return -1;
1090
1091         default:
1092                 return -1;
1093         }
1094 }
1095
1096 static int
1097 action_check(struct table_rule_action *action,
1098         struct pipeline *p,
1099         uint32_t table_id)
1100 {
1101         struct table_action_profile *ap;
1102
1103         if ((action == NULL) ||
1104                 (p == NULL) ||
1105                 (table_id >= p->n_tables))
1106                 return -1;
1107
1108         ap = p->table[table_id].ap;
1109         if (action->action_mask != ap->params.action_mask)
1110                 return -1;
1111
1112         if (action->action_mask & (1LLU << RTE_TABLE_ACTION_FWD)) {
1113                 if ((action->fwd.action == RTE_PIPELINE_ACTION_PORT) &&
1114                         (action->fwd.id >= p->n_ports_out))
1115                         return -1;
1116
1117                 if ((action->fwd.action == RTE_PIPELINE_ACTION_TABLE) &&
1118                         (action->fwd.id >= p->n_tables))
1119                         return -1;
1120         }
1121
1122         if (action->action_mask & (1LLU << RTE_TABLE_ACTION_MTR)) {
1123                 uint32_t tc_mask0 = (1 << ap->params.mtr.n_tc) - 1;
1124                 uint32_t tc_mask1 = action->mtr.tc_mask;
1125
1126                 if (tc_mask1 != tc_mask0)
1127                         return -1;
1128         }
1129
1130         if (action->action_mask & (1LLU << RTE_TABLE_ACTION_TM)) {
1131                 uint32_t n_subports_per_port =
1132                         ap->params.tm.n_subports_per_port;
1133                 uint32_t n_pipes_per_subport =
1134                         ap->params.tm.n_pipes_per_subport;
1135                 uint32_t subport_id = action->tm.subport_id;
1136                 uint32_t pipe_id = action->tm.pipe_id;
1137
1138                 if ((subport_id >= n_subports_per_port) ||
1139                         (pipe_id >= n_pipes_per_subport))
1140                         return -1;
1141         }
1142
1143         if (action->action_mask & (1LLU << RTE_TABLE_ACTION_ENCAP)) {
1144                 uint64_t encap_mask = ap->params.encap.encap_mask;
1145                 enum rte_table_action_encap_type type = action->encap.type;
1146
1147                 if ((encap_mask & (1LLU << type)) == 0)
1148                         return -1;
1149         }
1150
1151         if (action->action_mask & (1LLU << RTE_TABLE_ACTION_NAT)) {
1152                 int ip_version0 = ap->params.common.ip_version;
1153                 int ip_version1 = action->nat.ip_version;
1154
1155                 if ((ip_version1 && (ip_version0 == 0)) ||
1156                         ((ip_version1 == 0) && ip_version0))
1157                         return -1;
1158         }
1159
1160         return 0;
1161 }
1162
1163 static int
1164 action_default_check(struct table_rule_action *action,
1165         struct pipeline *p,
1166         uint32_t table_id)
1167 {
1168         if ((action == NULL) ||
1169                 (action->action_mask != (1LLU << RTE_TABLE_ACTION_FWD)) ||
1170                 (p == NULL) ||
1171                 (table_id >= p->n_tables))
1172                 return -1;
1173
1174         if (action->action_mask & (1LLU << RTE_TABLE_ACTION_FWD)) {
1175                 if ((action->fwd.action == RTE_PIPELINE_ACTION_PORT) &&
1176                         (action->fwd.id >= p->n_ports_out))
1177                         return -1;
1178
1179                 if ((action->fwd.action == RTE_PIPELINE_ACTION_TABLE) &&
1180                         (action->fwd.id >= p->n_tables))
1181                         return -1;
1182         }
1183
1184         return 0;
1185 }
1186
/**
 * Storage for a rule match in low-level form.
 *
 * Callers in this file hand a pointer to this union to match_convert() as
 * the output argument and then pass the same pointer as the key argument of
 * the rte_pipeline table entry add / delete calls. The union has one member
 * per key layout, so a single object is large enough for any of them.
 */
union table_rule_match_low_level {
        struct rte_table_acl_rule_add_params acl_add;
        struct rte_table_acl_rule_delete_params acl_delete;
        struct rte_table_array_key array;
        uint8_t hash[TABLE_RULE_MATCH_SIZE_MAX];
        struct rte_table_lpm_key lpm_ipv4;
        struct rte_table_lpm_ipv6_key lpm_ipv6;
};
1195
/*
 * Forward declaration; the definition is elsewhere in this file.
 *
 * Converts the high-level match mh into the low-level form ml. Callers in
 * this file pass add = 1 when the result is used to add a rule and add = 0
 * when it is used to delete one, and treat a non-zero return as failure.
 */
static int
match_convert(struct table_rule_match *mh,
        union table_rule_match_low_level *ml,
        int add);

/*
 * Forward declaration; the definition is elsewhere in this file.
 *
 * Callers in this file pass the table's action handle, the high-level rule
 * action and a zeroed buffer of TABLE_RULE_ACTION_SIZE_MAX bytes viewed as
 * a struct rte_pipeline_table_entry, and treat a non-zero return as failure.
 */
static int
action_convert(struct rte_table_action *a,
        struct table_rule_action *action,
        struct rte_pipeline_table_entry *data);
1205
/**
 * Low-level description of a table, as consumed by table_rule_add_bulk_ll().
 */
struct table_ll {
        /* Pipeline passed to the rte_pipeline_table_entry_add*() calls. */
        struct rte_pipeline *p;
        /* ID of the table within that pipeline. */
        int table_id;
        /* Action handle passed to action_convert(). */
        struct rte_table_action *a;
        /*
         * Non-zero: rules are added with one
         * rte_pipeline_table_entry_add_bulk() call; zero: rules are added
         * one at a time.
         */
        int bulk_supported;
};
1212
1213 static int
1214 table_rule_add_bulk_ll(struct table_ll *table,
1215         struct table_rule_list *list,
1216         uint32_t *n_rules)
1217 {
1218         union table_rule_match_low_level *match_ll = NULL;
1219         uint8_t *action_ll = NULL;
1220         void **match_ll_ptr = NULL;
1221         struct rte_pipeline_table_entry **action_ll_ptr = NULL;
1222         struct rte_pipeline_table_entry **entries_ptr = NULL;
1223         int *found = NULL;
1224         struct table_rule *rule;
1225         uint32_t n, i;
1226         int status = 0;
1227
1228         n = 0;
1229         TAILQ_FOREACH(rule, list, node)
1230                 n++;
1231
1232         /* Memory allocation */
1233         match_ll = calloc(n, sizeof(union table_rule_match_low_level));
1234         action_ll = calloc(n, TABLE_RULE_ACTION_SIZE_MAX);
1235
1236         match_ll_ptr = calloc(n, sizeof(void *));
1237         action_ll_ptr = calloc(n, sizeof(struct rte_pipeline_table_entry *));
1238
1239         entries_ptr = calloc(n, sizeof(struct rte_pipeline_table_entry *));
1240         found = calloc(n, sizeof(int));
1241
1242         if (match_ll == NULL ||
1243                 action_ll == NULL ||
1244                 match_ll_ptr == NULL ||
1245                 action_ll_ptr == NULL ||
1246                 entries_ptr == NULL ||
1247                 found == NULL) {
1248                         status = -ENOMEM;
1249                         goto table_rule_add_bulk_ll_free;
1250         }
1251
1252         /* Init */
1253         for (i = 0; i < n; i++) {
1254                 match_ll_ptr[i] = (void *)&match_ll[i];
1255                 action_ll_ptr[i] = (struct rte_pipeline_table_entry *)
1256                         &action_ll[i * TABLE_RULE_ACTION_SIZE_MAX];
1257         }
1258
1259         /* Rule (match, action) conversion */
1260         i = 0;
1261         TAILQ_FOREACH(rule, list, node) {
1262                 status = match_convert(&rule->match, match_ll_ptr[i], 1);
1263                 if (status)
1264                         goto table_rule_add_bulk_ll_free;
1265
1266                 status = action_convert(table->a, &rule->action, action_ll_ptr[i]);
1267                 if (status)
1268                         goto table_rule_add_bulk_ll_free;
1269
1270                 i++;
1271         }
1272
1273         /* Add rule (match, action) to table */
1274         if (table->bulk_supported) {
1275                 status = rte_pipeline_table_entry_add_bulk(table->p,
1276                         table->table_id,
1277                         match_ll_ptr,
1278                         action_ll_ptr,
1279                         n,
1280                         found,
1281                         entries_ptr);
1282                 if (status)
1283                         goto table_rule_add_bulk_ll_free;
1284         } else
1285                 for (i = 0; i < n; i++) {
1286                         status = rte_pipeline_table_entry_add(table->p,
1287                                 table->table_id,
1288                                 match_ll_ptr[i],
1289                                 action_ll_ptr[i],
1290                                 &found[i],
1291                                 &entries_ptr[i]);
1292                         if (status) {
1293                                 if (i == 0)
1294                                         goto table_rule_add_bulk_ll_free;
1295
1296                                 /* No roll-back. */
1297                                 status = 0;
1298                                 n = i;
1299                                 break;
1300                         }
1301                 }
1302
1303         /* Write back to the rule list. */
1304         i = 0;
1305         TAILQ_FOREACH(rule, list, node) {
1306                 if (i >= n)
1307                         break;
1308
1309                 rule->data = entries_ptr[i];
1310
1311                 i++;
1312         }
1313
1314         *n_rules = n;
1315
1316         /* Free */
1317 table_rule_add_bulk_ll_free:
1318         free(found);
1319         free(entries_ptr);
1320         free(action_ll_ptr);
1321         free(match_ll_ptr);
1322         free(action_ll);
1323         free(match_ll);
1324
1325         return status;
1326 }
1327
1328 int
1329 pipeline_table_rule_add(const char *pipeline_name,
1330         uint32_t table_id,
1331         struct table_rule_match *match,
1332         struct table_rule_action *action)
1333 {
1334         struct pipeline *p;
1335         struct table *table;
1336         struct pipeline_msg_req *req;
1337         struct pipeline_msg_rsp *rsp;
1338         struct table_rule *rule;
1339         int status;
1340
1341         /* Check input params */
1342         if ((pipeline_name == NULL) ||
1343                 (match == NULL) ||
1344                 (action == NULL))
1345                 return -1;
1346
1347         p = pipeline_find(pipeline_name);
1348         if ((p == NULL) ||
1349                 (table_id >= p->n_tables) ||
1350                 match_check(match, p, table_id) ||
1351                 action_check(action, p, table_id))
1352                 return -1;
1353
1354         table = &p->table[table_id];
1355
1356         rule = calloc(1, sizeof(struct table_rule));
1357         if (rule == NULL)
1358                 return -1;
1359
1360         memcpy(&rule->match, match, sizeof(*match));
1361         memcpy(&rule->action, action, sizeof(*action));
1362
1363         if (!pipeline_is_running(p)) {
1364                 union table_rule_match_low_level match_ll;
1365                 struct rte_pipeline_table_entry *data_in, *data_out;
1366                 int key_found;
1367                 uint8_t *buffer;
1368
1369                 buffer = calloc(TABLE_RULE_ACTION_SIZE_MAX, sizeof(uint8_t));
1370                 if (buffer == NULL) {
1371                         free(rule);
1372                         return -1;
1373                 }
1374
1375                 /* Table match-action rule conversion */
1376                 data_in = (struct rte_pipeline_table_entry *)buffer;
1377
1378                 status = match_convert(match, &match_ll, 1);
1379                 if (status) {
1380                         free(buffer);
1381                         free(rule);
1382                         return -1;
1383                 }
1384
1385                 status = action_convert(table->a, action, data_in);
1386                 if (status) {
1387                         free(buffer);
1388                         free(rule);
1389                         return -1;
1390                 }
1391
1392                 /* Add rule (match, action) to table */
1393                 status = rte_pipeline_table_entry_add(p->p,
1394                                 table_id,
1395                                 &match_ll,
1396                                 data_in,
1397                                 &key_found,
1398                                 &data_out);
1399                 if (status) {
1400                         free(buffer);
1401                         free(rule);
1402                         return -1;
1403                 }
1404
1405                 /* Write Response */
1406                 rule->data = data_out;
1407                 table_rule_add(table, rule);
1408
1409                 free(buffer);
1410                 return 0;
1411         }
1412
1413         /* Allocate request */
1414         req = pipeline_msg_alloc();
1415         if (req == NULL) {
1416                 free(rule);
1417                 return -1;
1418         }
1419
1420         /* Write request */
1421         req->type = PIPELINE_REQ_TABLE_RULE_ADD;
1422         req->id = table_id;
1423         memcpy(&req->table_rule_add.match, match, sizeof(*match));
1424         memcpy(&req->table_rule_add.action, action, sizeof(*action));
1425
1426         /* Send request and wait for response */
1427         rsp = pipeline_msg_send_recv(p, req);
1428         if (rsp == NULL) {
1429                 free(rule);
1430                 return -1;
1431         }
1432
1433         /* Read response */
1434         status = rsp->status;
1435         if (status == 0) {
1436                 rule->data = rsp->table_rule_add.data;
1437                 table_rule_add(table, rule);
1438         } else
1439                 free(rule);
1440
1441         /* Free response */
1442         pipeline_msg_free(rsp);
1443
1444         return status;
1445 }
1446
1447 int
1448 pipeline_table_rule_add_default(const char *pipeline_name,
1449         uint32_t table_id,
1450         struct table_rule_action *action)
1451 {
1452         struct pipeline *p;
1453         struct table *table;
1454         struct pipeline_msg_req *req;
1455         struct pipeline_msg_rsp *rsp;
1456         struct table_rule *rule;
1457         int status;
1458
1459         /* Check input params */
1460         if ((pipeline_name == NULL) ||
1461                 (action == NULL))
1462                 return -1;
1463
1464         p = pipeline_find(pipeline_name);
1465         if ((p == NULL) ||
1466                 (table_id >= p->n_tables) ||
1467                 action_default_check(action, p, table_id))
1468                 return -1;
1469
1470         table = &p->table[table_id];
1471
1472         rule = calloc(1, sizeof(struct table_rule));
1473         if (rule == NULL)
1474                 return -1;
1475
1476         memcpy(&rule->action, action, sizeof(*action));
1477
1478         if (!pipeline_is_running(p)) {
1479                 struct rte_pipeline_table_entry *data_in, *data_out;
1480                 uint8_t *buffer;
1481
1482                 buffer = calloc(TABLE_RULE_ACTION_SIZE_MAX, sizeof(uint8_t));
1483                 if (buffer == NULL) {
1484                         free(rule);
1485                         return -1;
1486                 }
1487
1488                 /* Apply actions */
1489                 data_in = (struct rte_pipeline_table_entry *)buffer;
1490
1491                 data_in->action = action->fwd.action;
1492                 if (action->fwd.action == RTE_PIPELINE_ACTION_PORT)
1493                         data_in->port_id = action->fwd.id;
1494                 if (action->fwd.action == RTE_PIPELINE_ACTION_TABLE)
1495                         data_in->table_id = action->fwd.id;
1496
1497                 /* Add default rule to table */
1498                 status = rte_pipeline_table_default_entry_add(p->p,
1499                                 table_id,
1500                                 data_in,
1501                                 &data_out);
1502                 if (status) {
1503                         free(buffer);
1504                         free(rule);
1505                         return -1;
1506                 }
1507
1508                 /* Write Response */
1509                 rule->data = data_out;
1510                 table_rule_default_add(table, rule);
1511
1512                 free(buffer);
1513                 return 0;
1514         }
1515
1516         /* Allocate request */
1517         req = pipeline_msg_alloc();
1518         if (req == NULL) {
1519                 free(rule);
1520                 return -1;
1521         }
1522
1523         /* Write request */
1524         req->type = PIPELINE_REQ_TABLE_RULE_ADD_DEFAULT;
1525         req->id = table_id;
1526         memcpy(&req->table_rule_add_default.action, action, sizeof(*action));
1527
1528         /* Send request and wait for response */
1529         rsp = pipeline_msg_send_recv(p, req);
1530         if (rsp == NULL) {
1531                 free(rule);
1532                 return -1;
1533         }
1534
1535         /* Read response */
1536         status = rsp->status;
1537         if (status == 0) {
1538                 rule->data = rsp->table_rule_add_default.data;
1539                 table_rule_default_add(table, rule);
1540         } else
1541                 free(rule);
1542
1543         /* Free response */
1544         pipeline_msg_free(rsp);
1545
1546         return status;
1547 }
1548
1549 static uint32_t
1550 table_rule_list_free(struct table_rule_list *list)
1551 {
1552         uint32_t n = 0;
1553
1554         if (!list)
1555                 return 0;
1556
1557         for ( ; ; ) {
1558                 struct table_rule *rule;
1559
1560                 rule = TAILQ_FIRST(list);
1561                 if (rule == NULL)
1562                         break;
1563
1564                 TAILQ_REMOVE(list, rule, node);
1565                 free(rule);
1566                 n++;
1567         }
1568
1569         free(list);
1570         return n;
1571 }
1572
1573 int
1574 pipeline_table_rule_add_bulk(const char *pipeline_name,
1575         uint32_t table_id,
1576         struct table_rule_list *list,
1577         uint32_t *n_rules_added,
1578         uint32_t *n_rules_not_added)
1579 {
1580         struct pipeline *p;
1581         struct table *table;
1582         struct pipeline_msg_req *req;
1583         struct pipeline_msg_rsp *rsp;
1584         struct table_rule *rule;
1585         int status = 0;
1586
1587         /* Check input params */
1588         if ((pipeline_name == NULL) ||
1589                 (list == NULL) ||
1590                 TAILQ_EMPTY(list) ||
1591                 (n_rules_added == NULL) ||
1592                 (n_rules_not_added == NULL)) {
1593                 table_rule_list_free(list);
1594                 return -EINVAL;
1595         }
1596
1597         p = pipeline_find(pipeline_name);
1598         if ((p == NULL) ||
1599                 (table_id >= p->n_tables)) {
1600                 table_rule_list_free(list);
1601                 return -EINVAL;
1602         }
1603
1604         table = &p->table[table_id];
1605
1606         TAILQ_FOREACH(rule, list, node)
1607                 if (match_check(&rule->match, p, table_id) ||
1608                         action_check(&rule->action, p, table_id)) {
1609                         table_rule_list_free(list);
1610                         return -EINVAL;
1611                 }
1612
1613         if (!pipeline_is_running(p)) {
1614                 struct table_ll table_ll = {
1615                         .p = p->p,
1616                         .table_id = table_id,
1617                         .a = table->a,
1618                         .bulk_supported = table->params.match_type == TABLE_ACL,
1619                 };
1620
1621                 status = table_rule_add_bulk_ll(&table_ll, list, n_rules_added);
1622                 if (status) {
1623                         table_rule_list_free(list);
1624                         return status;
1625                 }
1626
1627                 table_rule_add_bulk(table, list, *n_rules_added);
1628                 *n_rules_not_added = table_rule_list_free(list);
1629                 return 0;
1630         }
1631
1632         /* Allocate request */
1633         req = pipeline_msg_alloc();
1634         if (req == NULL) {
1635                 table_rule_list_free(list);
1636                 return -ENOMEM;
1637         }
1638
1639         /* Write request */
1640         req->type = PIPELINE_REQ_TABLE_RULE_ADD_BULK;
1641         req->id = table_id;
1642         req->table_rule_add_bulk.list = list;
1643         req->table_rule_add_bulk.bulk = table->params.match_type == TABLE_ACL;
1644
1645         /* Send request and wait for response */
1646         rsp = pipeline_msg_send_recv(p, req);
1647         if (rsp == NULL) {
1648                 table_rule_list_free(list);
1649                 return -ENOMEM;
1650         }
1651
1652         /* Read response */
1653         status = rsp->status;
1654         if (status == 0) {
1655                 *n_rules_added = rsp->table_rule_add_bulk.n_rules;
1656
1657                 table_rule_add_bulk(table, list, *n_rules_added);
1658                 *n_rules_not_added = table_rule_list_free(list);
1659         } else
1660                 table_rule_list_free(list);
1661
1662
1663         /* Free response */
1664         pipeline_msg_free(rsp);
1665
1666         return status;
1667 }
1668
1669 int
1670 pipeline_table_rule_delete(const char *pipeline_name,
1671         uint32_t table_id,
1672         struct table_rule_match *match)
1673 {
1674         struct pipeline *p;
1675         struct table *table;
1676         struct pipeline_msg_req *req;
1677         struct pipeline_msg_rsp *rsp;
1678         int status;
1679
1680         /* Check input params */
1681         if ((pipeline_name == NULL) ||
1682                 (match == NULL))
1683                 return -1;
1684
1685         p = pipeline_find(pipeline_name);
1686         if ((p == NULL) ||
1687                 (table_id >= p->n_tables) ||
1688                 match_check(match, p, table_id))
1689                 return -1;
1690
1691         table = &p->table[table_id];
1692
1693         if (!pipeline_is_running(p)) {
1694                 union table_rule_match_low_level match_ll;
1695                 int key_found;
1696
1697                 status = match_convert(match, &match_ll, 0);
1698                 if (status)
1699                         return -1;
1700
1701                 status = rte_pipeline_table_entry_delete(p->p,
1702                                 table_id,
1703                                 &match_ll,
1704                                 &key_found,
1705                                 NULL);
1706
1707                 if (status == 0)
1708                         table_rule_delete(table, match);
1709
1710                 return status;
1711         }
1712
1713         /* Allocate request */
1714         req = pipeline_msg_alloc();
1715         if (req == NULL)
1716                 return -1;
1717
1718         /* Write request */
1719         req->type = PIPELINE_REQ_TABLE_RULE_DELETE;
1720         req->id = table_id;
1721         memcpy(&req->table_rule_delete.match, match, sizeof(*match));
1722
1723         /* Send request and wait for response */
1724         rsp = pipeline_msg_send_recv(p, req);
1725         if (rsp == NULL)
1726                 return -1;
1727
1728         /* Read response */
1729         status = rsp->status;
1730         if (status == 0)
1731                 table_rule_delete(table, match);
1732
1733         /* Free response */
1734         pipeline_msg_free(rsp);
1735
1736         return status;
1737 }
1738
1739 int
1740 pipeline_table_rule_delete_default(const char *pipeline_name,
1741         uint32_t table_id)
1742 {
1743         struct pipeline *p;
1744         struct pipeline_msg_req *req;
1745         struct pipeline_msg_rsp *rsp;
1746         int status;
1747
1748         /* Check input params */
1749         if (pipeline_name == NULL)
1750                 return -1;
1751
1752         p = pipeline_find(pipeline_name);
1753         if ((p == NULL) ||
1754                 (table_id >= p->n_tables))
1755                 return -1;
1756
1757         if (!pipeline_is_running(p)) {
1758                 status = rte_pipeline_table_default_entry_delete(p->p,
1759                         table_id,
1760                         NULL);
1761
1762                 return status;
1763         }
1764
1765         /* Allocate request */
1766         req = pipeline_msg_alloc();
1767         if (req == NULL)
1768                 return -1;
1769
1770         /* Write request */
1771         req->type = PIPELINE_REQ_TABLE_RULE_DELETE_DEFAULT;
1772         req->id = table_id;
1773
1774         /* Send request and wait for response */
1775         rsp = pipeline_msg_send_recv(p, req);
1776         if (rsp == NULL)
1777                 return -1;
1778
1779         /* Read response */
1780         status = rsp->status;
1781
1782         /* Free response */
1783         pipeline_msg_free(rsp);
1784
1785         return status;
1786 }
1787
1788 int
1789 pipeline_table_rule_stats_read(const char *pipeline_name,
1790         uint32_t table_id,
1791         void *data,
1792         struct rte_table_action_stats_counters *stats,
1793         int clear)
1794 {
1795         struct pipeline *p;
1796         struct pipeline_msg_req *req;
1797         struct pipeline_msg_rsp *rsp;
1798         int status;
1799
1800         /* Check input params */
1801         if ((pipeline_name == NULL) ||
1802                 (data == NULL) ||
1803                 (stats == NULL))
1804                 return -1;
1805
1806         p = pipeline_find(pipeline_name);
1807         if ((p == NULL) ||
1808                 (table_id >= p->n_tables))
1809                 return -1;
1810
1811         if (!pipeline_is_running(p)) {
1812                 struct rte_table_action *a = p->table[table_id].a;
1813
1814                 status = rte_table_action_stats_read(a,
1815                         data,
1816                         stats,
1817                         clear);
1818
1819                 return status;
1820         }
1821
1822         /* Allocate request */
1823         req = pipeline_msg_alloc();
1824         if (req == NULL)
1825                 return -1;
1826
1827         /* Write request */
1828         req->type = PIPELINE_REQ_TABLE_RULE_STATS_READ;
1829         req->id = table_id;
1830         req->table_rule_stats_read.data = data;
1831         req->table_rule_stats_read.clear = clear;
1832
1833         /* Send request and wait for response */
1834         rsp = pipeline_msg_send_recv(p, req);
1835         if (rsp == NULL)
1836                 return -1;
1837
1838         /* Read response */
1839         status = rsp->status;
1840         if (status)
1841                 memcpy(stats, &rsp->table_rule_stats_read.stats, sizeof(*stats));
1842
1843         /* Free response */
1844         pipeline_msg_free(rsp);
1845
1846         return status;
1847 }
1848
1849 int
1850 pipeline_table_mtr_profile_add(const char *pipeline_name,
1851         uint32_t table_id,
1852         uint32_t meter_profile_id,
1853         struct rte_table_action_meter_profile *profile)
1854 {
1855         struct pipeline *p;
1856         struct pipeline_msg_req *req;
1857         struct pipeline_msg_rsp *rsp;
1858         int status;
1859
1860         /* Check input params */
1861         if ((pipeline_name == NULL) ||
1862                 (profile == NULL))
1863                 return -1;
1864
1865         p = pipeline_find(pipeline_name);
1866         if ((p == NULL) ||
1867                 (table_id >= p->n_tables))
1868                 return -1;
1869
1870         if (!pipeline_is_running(p)) {
1871                 struct rte_table_action *a = p->table[table_id].a;
1872
1873                 status = rte_table_action_meter_profile_add(a,
1874                         meter_profile_id,
1875                         profile);
1876
1877                 return status;
1878         }
1879
1880         /* Allocate request */
1881         req = pipeline_msg_alloc();
1882         if (req == NULL)
1883                 return -1;
1884
1885         /* Write request */
1886         req->type = PIPELINE_REQ_TABLE_MTR_PROFILE_ADD;
1887         req->id = table_id;
1888         req->table_mtr_profile_add.meter_profile_id = meter_profile_id;
1889         memcpy(&req->table_mtr_profile_add.profile, profile, sizeof(*profile));
1890
1891         /* Send request and wait for response */
1892         rsp = pipeline_msg_send_recv(p, req);
1893         if (rsp == NULL)
1894                 return -1;
1895
1896         /* Read response */
1897         status = rsp->status;
1898
1899         /* Free response */
1900         pipeline_msg_free(rsp);
1901
1902         return status;
1903 }
1904
1905 int
1906 pipeline_table_mtr_profile_delete(const char *pipeline_name,
1907         uint32_t table_id,
1908         uint32_t meter_profile_id)
1909 {
1910         struct pipeline *p;
1911         struct pipeline_msg_req *req;
1912         struct pipeline_msg_rsp *rsp;
1913         int status;
1914
1915         /* Check input params */
1916         if (pipeline_name == NULL)
1917                 return -1;
1918
1919         p = pipeline_find(pipeline_name);
1920         if ((p == NULL) ||
1921                 (table_id >= p->n_tables))
1922                 return -1;
1923
1924         if (!pipeline_is_running(p)) {
1925                 struct rte_table_action *a = p->table[table_id].a;
1926
1927                 status = rte_table_action_meter_profile_delete(a,
1928                                 meter_profile_id);
1929
1930                 return status;
1931         }
1932
1933         /* Allocate request */
1934         req = pipeline_msg_alloc();
1935         if (req == NULL)
1936                 return -1;
1937
1938         /* Write request */
1939         req->type = PIPELINE_REQ_TABLE_MTR_PROFILE_DELETE;
1940         req->id = table_id;
1941         req->table_mtr_profile_delete.meter_profile_id = meter_profile_id;
1942
1943         /* Send request and wait for response */
1944         rsp = pipeline_msg_send_recv(p, req);
1945         if (rsp == NULL)
1946                 return -1;
1947
1948         /* Read response */
1949         status = rsp->status;
1950
1951         /* Free response */
1952         pipeline_msg_free(rsp);
1953
1954         return status;
1955 }
1956
1957 int
1958 pipeline_table_rule_mtr_read(const char *pipeline_name,
1959         uint32_t table_id,
1960         void *data,
1961         uint32_t tc_mask,
1962         struct rte_table_action_mtr_counters *stats,
1963         int clear)
1964 {
1965         struct pipeline *p;
1966         struct pipeline_msg_req *req;
1967         struct pipeline_msg_rsp *rsp;
1968         int status;
1969
1970         /* Check input params */
1971         if ((pipeline_name == NULL) ||
1972                 (data == NULL) ||
1973                 (stats == NULL))
1974                 return -1;
1975
1976         p = pipeline_find(pipeline_name);
1977         if ((p == NULL) ||
1978                 (table_id >= p->n_tables))
1979                 return -1;
1980
1981         if (!pipeline_is_running(p)) {
1982                 struct rte_table_action *a = p->table[table_id].a;
1983
1984                 status = rte_table_action_meter_read(a,
1985                                 data,
1986                                 tc_mask,
1987                                 stats,
1988                                 clear);
1989
1990                 return status;
1991         }
1992
1993         /* Allocate request */
1994         req = pipeline_msg_alloc();
1995         if (req == NULL)
1996                 return -1;
1997
1998         /* Write request */
1999         req->type = PIPELINE_REQ_TABLE_RULE_MTR_READ;
2000         req->id = table_id;
2001         req->table_rule_mtr_read.data = data;
2002         req->table_rule_mtr_read.tc_mask = tc_mask;
2003         req->table_rule_mtr_read.clear = clear;
2004
2005         /* Send request and wait for response */
2006         rsp = pipeline_msg_send_recv(p, req);
2007         if (rsp == NULL)
2008                 return -1;
2009
2010         /* Read response */
2011         status = rsp->status;
2012         if (status)
2013                 memcpy(stats, &rsp->table_rule_mtr_read.stats, sizeof(*stats));
2014
2015         /* Free response */
2016         pipeline_msg_free(rsp);
2017
2018         return status;
2019 }
2020
2021 int
2022 pipeline_table_dscp_table_update(const char *pipeline_name,
2023         uint32_t table_id,
2024         uint64_t dscp_mask,
2025         struct rte_table_action_dscp_table *dscp_table)
2026 {
2027         struct pipeline *p;
2028         struct pipeline_msg_req *req;
2029         struct pipeline_msg_rsp *rsp;
2030         int status;
2031
2032         /* Check input params */
2033         if ((pipeline_name == NULL) ||
2034                 (dscp_table == NULL))
2035                 return -1;
2036
2037         p = pipeline_find(pipeline_name);
2038         if ((p == NULL) ||
2039                 (table_id >= p->n_tables))
2040                 return -1;
2041
2042         if (!pipeline_is_running(p)) {
2043                 struct rte_table_action *a = p->table[table_id].a;
2044
2045                 status = rte_table_action_dscp_table_update(a,
2046                                 dscp_mask,
2047                                 dscp_table);
2048
2049                 return status;
2050         }
2051
2052         /* Allocate request */
2053         req = pipeline_msg_alloc();
2054         if (req == NULL)
2055                 return -1;
2056
2057         /* Write request */
2058         req->type = PIPELINE_REQ_TABLE_DSCP_TABLE_UPDATE;
2059         req->id = table_id;
2060         req->table_dscp_table_update.dscp_mask = dscp_mask;
2061         memcpy(&req->table_dscp_table_update.dscp_table,
2062                 dscp_table, sizeof(*dscp_table));
2063
2064         /* Send request and wait for response */
2065         rsp = pipeline_msg_send_recv(p, req);
2066         if (rsp == NULL)
2067                 return -1;
2068
2069         /* Read response */
2070         status = rsp->status;
2071
2072         /* Free response */
2073         pipeline_msg_free(rsp);
2074
2075         return status;
2076 }
2077
2078 int
2079 pipeline_table_rule_ttl_read(const char *pipeline_name,
2080         uint32_t table_id,
2081         void *data,
2082         struct rte_table_action_ttl_counters *stats,
2083         int clear)
2084 {
2085         struct pipeline *p;
2086         struct pipeline_msg_req *req;
2087         struct pipeline_msg_rsp *rsp;
2088         int status;
2089
2090         /* Check input params */
2091         if ((pipeline_name == NULL) ||
2092                 (data == NULL) ||
2093                 (stats == NULL))
2094                 return -1;
2095
2096         p = pipeline_find(pipeline_name);
2097         if ((p == NULL) ||
2098                 (table_id >= p->n_tables))
2099                 return -1;
2100
2101         if (!pipeline_is_running(p)) {
2102                 struct rte_table_action *a = p->table[table_id].a;
2103
2104                 status = rte_table_action_ttl_read(a,
2105                                 data,
2106                                 stats,
2107                                 clear);
2108
2109                 return status;
2110         }
2111
2112         /* Allocate request */
2113         req = pipeline_msg_alloc();
2114         if (req == NULL)
2115                 return -1;
2116
2117         /* Write request */
2118         req->type = PIPELINE_REQ_TABLE_RULE_TTL_READ;
2119         req->id = table_id;
2120         req->table_rule_ttl_read.data = data;
2121         req->table_rule_ttl_read.clear = clear;
2122
2123         /* Send request and wait for response */
2124         rsp = pipeline_msg_send_recv(p, req);
2125         if (rsp == NULL)
2126                 return -1;
2127
2128         /* Read response */
2129         status = rsp->status;
2130         if (status)
2131                 memcpy(stats, &rsp->table_rule_ttl_read.stats, sizeof(*stats));
2132
2133         /* Free response */
2134         pipeline_msg_free(rsp);
2135
2136         return status;
2137 }
2138
2139 /**
2140  * Data plane threads: message handling
2141  */
/*
 * Fetch one request from the request ring with rte_ring_sc_dequeue().
 * Returns the dequeued request, or NULL when the dequeue reports a non-zero
 * status.
 */
static inline struct pipeline_msg_req *
pipeline_msg_recv(struct rte_ring *msgq_req)
{
	struct pipeline_msg_req *msg;

	if (rte_ring_sc_dequeue(msgq_req, (void **) &msg) != 0)
		return NULL;

	return msg;
}
2154
/*
 * Post a response on the response ring with rte_ring_sp_enqueue(), retrying
 * for as long as the enqueue returns -ENOBUFS.
 */
static inline void
pipeline_msg_send(struct rte_ring *msgq_rsp,
	struct pipeline_msg_rsp *rsp)
{
	for ( ; ; ) {
		int rc = rte_ring_sp_enqueue(msgq_rsp, rsp);

		if (rc != -ENOBUFS)
			break;
	}
}
2165
2166 static struct pipeline_msg_rsp *
2167 pipeline_msg_handle_port_in_stats_read(struct pipeline_data *p,
2168         struct pipeline_msg_req *req)
2169 {
2170         struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2171         uint32_t port_id = req->id;
2172         int clear = req->port_in_stats_read.clear;
2173
2174         rsp->status = rte_pipeline_port_in_stats_read(p->p,
2175                 port_id,
2176                 &rsp->port_in_stats_read.stats,
2177                 clear);
2178
2179         return rsp;
2180 }
2181
2182 static struct pipeline_msg_rsp *
2183 pipeline_msg_handle_port_in_enable(struct pipeline_data *p,
2184         struct pipeline_msg_req *req)
2185 {
2186         struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2187         uint32_t port_id = req->id;
2188
2189         rsp->status = rte_pipeline_port_in_enable(p->p,
2190                 port_id);
2191
2192         return rsp;
2193 }
2194
2195 static struct pipeline_msg_rsp *
2196 pipeline_msg_handle_port_in_disable(struct pipeline_data *p,
2197         struct pipeline_msg_req *req)
2198 {
2199         struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2200         uint32_t port_id = req->id;
2201
2202         rsp->status = rte_pipeline_port_in_disable(p->p,
2203                 port_id);
2204
2205         return rsp;
2206 }
2207
2208 static struct pipeline_msg_rsp *
2209 pipeline_msg_handle_port_out_stats_read(struct pipeline_data *p,
2210         struct pipeline_msg_req *req)
2211 {
2212         struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2213         uint32_t port_id = req->id;
2214         int clear = req->port_out_stats_read.clear;
2215
2216         rsp->status = rte_pipeline_port_out_stats_read(p->p,
2217                 port_id,
2218                 &rsp->port_out_stats_read.stats,
2219                 clear);
2220
2221         return rsp;
2222 }
2223
2224 static struct pipeline_msg_rsp *
2225 pipeline_msg_handle_table_stats_read(struct pipeline_data *p,
2226         struct pipeline_msg_req *req)
2227 {
2228         struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2229         uint32_t port_id = req->id;
2230         int clear = req->table_stats_read.clear;
2231
2232         rsp->status = rte_pipeline_table_stats_read(p->p,
2233                 port_id,
2234                 &rsp->table_stats_read.stats,
2235                 clear);
2236
2237         return rsp;
2238 }
2239
/*
 * Split a prefix depth of 0..128 bits into four per-word depths.
 *
 * Each of the four depth32[] entries receives up to 32 bits, filled from
 * index 0 upwards until the depth is used up; the remaining entries are set
 * to 0.
 *
 * Returns 0 on success, or -1 (leaving depth32[] untouched) when depth
 * exceeds 128.
 */
static int
match_convert_ipv6_depth(uint32_t depth, uint32_t *depth32)
{
	uint32_t remaining = depth;
	uint32_t word;

	if (depth > 128)
		return -1;

	for (word = 0; word < 4; word++) {
		uint32_t bits = (remaining < 32) ? remaining : 32;

		depth32[word] = bits;
		remaining -= bits;
	}

	return 0;
}
2286
2287 static int
2288 match_convert(struct table_rule_match *mh,
2289         union table_rule_match_low_level *ml,
2290         int add)
2291 {
2292         memset(ml, 0, sizeof(*ml));
2293
2294         switch (mh->match_type) {
2295         case TABLE_ACL:
2296                 if (mh->match.acl.ip_version)
2297                         if (add) {
2298                                 ml->acl_add.field_value[0].value.u8 =
2299                                         mh->match.acl.proto;
2300                                 ml->acl_add.field_value[0].mask_range.u8 =
2301                                         mh->match.acl.proto_mask;
2302
2303                                 ml->acl_add.field_value[1].value.u32 =
2304                                         mh->match.acl.ipv4.sa;
2305                                 ml->acl_add.field_value[1].mask_range.u32 =
2306                                         mh->match.acl.sa_depth;
2307
2308                                 ml->acl_add.field_value[2].value.u32 =
2309                                         mh->match.acl.ipv4.da;
2310                                 ml->acl_add.field_value[2].mask_range.u32 =
2311                                         mh->match.acl.da_depth;
2312
2313                                 ml->acl_add.field_value[3].value.u16 =
2314                                         mh->match.acl.sp0;
2315                                 ml->acl_add.field_value[3].mask_range.u16 =
2316                                         mh->match.acl.sp1;
2317
2318                                 ml->acl_add.field_value[4].value.u16 =
2319                                         mh->match.acl.dp0;
2320                                 ml->acl_add.field_value[4].mask_range.u16 =
2321                                         mh->match.acl.dp1;
2322
2323                                 ml->acl_add.priority =
2324                                         (int32_t) mh->match.acl.priority;
2325                         } else {
2326                                 ml->acl_delete.field_value[0].value.u8 =
2327                                         mh->match.acl.proto;
2328                                 ml->acl_delete.field_value[0].mask_range.u8 =
2329                                         mh->match.acl.proto_mask;
2330
2331                                 ml->acl_delete.field_value[1].value.u32 =
2332                                         mh->match.acl.ipv4.sa;
2333                                 ml->acl_delete.field_value[1].mask_range.u32 =
2334                                         mh->match.acl.sa_depth;
2335
2336                                 ml->acl_delete.field_value[2].value.u32 =
2337                                         mh->match.acl.ipv4.da;
2338                                 ml->acl_delete.field_value[2].mask_range.u32 =
2339                                         mh->match.acl.da_depth;
2340
2341                                 ml->acl_delete.field_value[3].value.u16 =
2342                                         mh->match.acl.sp0;
2343                                 ml->acl_delete.field_value[3].mask_range.u16 =
2344                                         mh->match.acl.sp1;
2345
2346                                 ml->acl_delete.field_value[4].value.u16 =
2347                                         mh->match.acl.dp0;
2348                                 ml->acl_delete.field_value[4].mask_range.u16 =
2349                                         mh->match.acl.dp1;
2350                         }
2351                 else
2352                         if (add) {
2353                                 uint32_t *sa32 =
2354                                         (uint32_t *) mh->match.acl.ipv6.sa;
2355                                 uint32_t *da32 =
2356                                         (uint32_t *) mh->match.acl.ipv6.da;
2357                                 uint32_t sa32_depth[4], da32_depth[4];
2358                                 int status;
2359
2360                                 status = match_convert_ipv6_depth(
2361                                         mh->match.acl.sa_depth,
2362                                         sa32_depth);
2363                                 if (status)
2364                                         return status;
2365
2366                                 status = match_convert_ipv6_depth(
2367                                         mh->match.acl.da_depth,
2368                                         da32_depth);
2369                                 if (status)
2370                                         return status;
2371
2372                                 ml->acl_add.field_value[0].value.u8 =
2373                                         mh->match.acl.proto;
2374                                 ml->acl_add.field_value[0].mask_range.u8 =
2375                                         mh->match.acl.proto_mask;
2376
2377                                 ml->acl_add.field_value[1].value.u32 =
2378                                         rte_be_to_cpu_32(sa32[0]);
2379                                 ml->acl_add.field_value[1].mask_range.u32 =
2380                                         sa32_depth[0];
2381                                 ml->acl_add.field_value[2].value.u32 =
2382                                         rte_be_to_cpu_32(sa32[1]);
2383                                 ml->acl_add.field_value[2].mask_range.u32 =
2384                                         sa32_depth[1];
2385                                 ml->acl_add.field_value[3].value.u32 =
2386                                         rte_be_to_cpu_32(sa32[2]);
2387                                 ml->acl_add.field_value[3].mask_range.u32 =
2388                                         sa32_depth[2];
2389                                 ml->acl_add.field_value[4].value.u32 =
2390                                         rte_be_to_cpu_32(sa32[3]);
2391                                 ml->acl_add.field_value[4].mask_range.u32 =
2392                                         sa32_depth[3];
2393
2394                                 ml->acl_add.field_value[5].value.u32 =
2395                                         rte_be_to_cpu_32(da32[0]);
2396                                 ml->acl_add.field_value[5].mask_range.u32 =
2397                                         da32_depth[0];
2398                                 ml->acl_add.field_value[6].value.u32 =
2399                                         rte_be_to_cpu_32(da32[1]);
2400                                 ml->acl_add.field_value[6].mask_range.u32 =
2401                                         da32_depth[1];
2402                                 ml->acl_add.field_value[7].value.u32 =
2403                                         rte_be_to_cpu_32(da32[2]);
2404                                 ml->acl_add.field_value[7].mask_range.u32 =
2405                                         da32_depth[2];
2406                                 ml->acl_add.field_value[8].value.u32 =
2407                                         rte_be_to_cpu_32(da32[3]);
2408                                 ml->acl_add.field_value[8].mask_range.u32 =
2409                                         da32_depth[3];
2410
2411                                 ml->acl_add.field_value[9].value.u16 =
2412                                         mh->match.acl.sp0;
2413                                 ml->acl_add.field_value[9].mask_range.u16 =
2414                                         mh->match.acl.sp1;
2415
2416                                 ml->acl_add.field_value[10].value.u16 =
2417                                         mh->match.acl.dp0;
2418                                 ml->acl_add.field_value[10].mask_range.u16 =
2419                                         mh->match.acl.dp1;
2420
2421                                 ml->acl_add.priority =
2422                                         (int32_t) mh->match.acl.priority;
2423                         } else {
2424                                 uint32_t *sa32 =
2425                                         (uint32_t *) mh->match.acl.ipv6.sa;
2426                                 uint32_t *da32 =
2427                                         (uint32_t *) mh->match.acl.ipv6.da;
2428                                 uint32_t sa32_depth[4], da32_depth[4];
2429                                 int status;
2430
2431                                 status = match_convert_ipv6_depth(
2432                                         mh->match.acl.sa_depth,
2433                                         sa32_depth);
2434                                 if (status)
2435                                         return status;
2436
2437                                 status = match_convert_ipv6_depth(
2438                                         mh->match.acl.da_depth,
2439                                         da32_depth);
2440                                 if (status)
2441                                         return status;
2442
2443                                 ml->acl_delete.field_value[0].value.u8 =
2444                                         mh->match.acl.proto;
2445                                 ml->acl_delete.field_value[0].mask_range.u8 =
2446                                         mh->match.acl.proto_mask;
2447
2448                                 ml->acl_delete.field_value[1].value.u32 =
2449                                         rte_be_to_cpu_32(sa32[0]);
2450                                 ml->acl_delete.field_value[1].mask_range.u32 =
2451                                         sa32_depth[0];
2452                                 ml->acl_delete.field_value[2].value.u32 =
2453                                         rte_be_to_cpu_32(sa32[1]);
2454                                 ml->acl_delete.field_value[2].mask_range.u32 =
2455                                         sa32_depth[1];
2456                                 ml->acl_delete.field_value[3].value.u32 =
2457                                         rte_be_to_cpu_32(sa32[2]);
2458                                 ml->acl_delete.field_value[3].mask_range.u32 =
2459                                         sa32_depth[2];
2460                                 ml->acl_delete.field_value[4].value.u32 =
2461                                         rte_be_to_cpu_32(sa32[3]);
2462                                 ml->acl_delete.field_value[4].mask_range.u32 =
2463                                         sa32_depth[3];
2464
2465                                 ml->acl_delete.field_value[5].value.u32 =
2466                                         rte_be_to_cpu_32(da32[0]);
2467                                 ml->acl_delete.field_value[5].mask_range.u32 =
2468                                         da32_depth[0];
2469                                 ml->acl_delete.field_value[6].value.u32 =
2470                                         rte_be_to_cpu_32(da32[1]);
2471                                 ml->acl_delete.field_value[6].mask_range.u32 =
2472                                         da32_depth[1];
2473                                 ml->acl_delete.field_value[7].value.u32 =
2474                                         rte_be_to_cpu_32(da32[2]);
2475                                 ml->acl_delete.field_value[7].mask_range.u32 =
2476                                         da32_depth[2];
2477                                 ml->acl_delete.field_value[8].value.u32 =
2478                                         rte_be_to_cpu_32(da32[3]);
2479                                 ml->acl_delete.field_value[8].mask_range.u32 =
2480                                         da32_depth[3];
2481
2482                                 ml->acl_delete.field_value[9].value.u16 =
2483                                         mh->match.acl.sp0;
2484                                 ml->acl_delete.field_value[9].mask_range.u16 =
2485                                         mh->match.acl.sp1;
2486
2487                                 ml->acl_delete.field_value[10].value.u16 =
2488                                         mh->match.acl.dp0;
2489                                 ml->acl_delete.field_value[10].mask_range.u16 =
2490                                         mh->match.acl.dp1;
2491                         }
2492                 return 0;
2493
2494         case TABLE_ARRAY:
2495                 ml->array.pos = mh->match.array.pos;
2496                 return 0;
2497
2498         case TABLE_HASH:
2499                 memcpy(ml->hash, mh->match.hash.key, sizeof(ml->hash));
2500                 return 0;
2501
2502         case TABLE_LPM:
2503                 if (mh->match.lpm.ip_version) {
2504                         ml->lpm_ipv4.ip = mh->match.lpm.ipv4;
2505                         ml->lpm_ipv4.depth = mh->match.lpm.depth;
2506                 } else {
2507                         memcpy(ml->lpm_ipv6.ip,
2508                                 mh->match.lpm.ipv6, sizeof(ml->lpm_ipv6.ip));
2509                         ml->lpm_ipv6.depth = mh->match.lpm.depth;
2510                 }
2511
2512                 return 0;
2513
2514         default:
2515                 return -1;
2516         }
2517 }
2518
2519 static int
2520 action_convert(struct rte_table_action *a,
2521         struct table_rule_action *action,
2522         struct rte_pipeline_table_entry *data)
2523 {
2524         int status;
2525
2526         /* Apply actions */
2527         if (action->action_mask & (1LLU << RTE_TABLE_ACTION_FWD)) {
2528                 status = rte_table_action_apply(a,
2529                         data,
2530                         RTE_TABLE_ACTION_FWD,
2531                         &action->fwd);
2532
2533                 if (status)
2534                         return status;
2535         }
2536
2537         if (action->action_mask & (1LLU << RTE_TABLE_ACTION_LB)) {
2538                 status = rte_table_action_apply(a,
2539                         data,
2540                         RTE_TABLE_ACTION_LB,
2541                         &action->lb);
2542
2543                 if (status)
2544                         return status;
2545         }
2546
2547         if (action->action_mask & (1LLU << RTE_TABLE_ACTION_MTR)) {
2548                 status = rte_table_action_apply(a,
2549                         data,
2550                         RTE_TABLE_ACTION_MTR,
2551                         &action->mtr);
2552
2553                 if (status)
2554                         return status;
2555         }
2556
2557         if (action->action_mask & (1LLU << RTE_TABLE_ACTION_TM)) {
2558                 status = rte_table_action_apply(a,
2559                         data,
2560                         RTE_TABLE_ACTION_TM,
2561                         &action->tm);
2562
2563                 if (status)
2564                         return status;
2565         }
2566
2567         if (action->action_mask & (1LLU << RTE_TABLE_ACTION_ENCAP)) {
2568                 status = rte_table_action_apply(a,
2569                         data,
2570                         RTE_TABLE_ACTION_ENCAP,
2571                         &action->encap);
2572
2573                 if (status)
2574                         return status;
2575         }
2576
2577         if (action->action_mask & (1LLU << RTE_TABLE_ACTION_NAT)) {
2578                 status = rte_table_action_apply(a,
2579                         data,
2580                         RTE_TABLE_ACTION_NAT,
2581                         &action->nat);
2582
2583                 if (status)
2584                         return status;
2585         }
2586
2587         if (action->action_mask & (1LLU << RTE_TABLE_ACTION_TTL)) {
2588                 status = rte_table_action_apply(a,
2589                         data,
2590                         RTE_TABLE_ACTION_TTL,
2591                         &action->ttl);
2592
2593                 if (status)
2594                         return status;
2595         }
2596
2597         if (action->action_mask & (1LLU << RTE_TABLE_ACTION_STATS)) {
2598                 status = rte_table_action_apply(a,
2599                         data,
2600                         RTE_TABLE_ACTION_STATS,
2601                         &action->stats);
2602
2603                 if (status)
2604                         return status;
2605         }
2606
2607         if (action->action_mask & (1LLU << RTE_TABLE_ACTION_TIME)) {
2608                 status = rte_table_action_apply(a,
2609                         data,
2610                         RTE_TABLE_ACTION_TIME,
2611                         &action->time);
2612
2613                 if (status)
2614                         return status;
2615         }
2616
2617         if (action->action_mask & (1LLU << RTE_TABLE_ACTION_SYM_CRYPTO)) {
2618                 status = rte_table_action_apply(a,
2619                         data,
2620                         RTE_TABLE_ACTION_SYM_CRYPTO,
2621                         &action->sym_crypto);
2622
2623                 if (status)
2624                         return status;
2625         }
2626
2627         if (action->action_mask & (1LLU << RTE_TABLE_ACTION_TAG)) {
2628                 status = rte_table_action_apply(a,
2629                         data,
2630                         RTE_TABLE_ACTION_TAG,
2631                         &action->tag);
2632
2633                 if (status)
2634                         return status;
2635         }
2636
2637         if (action->action_mask & (1LLU << RTE_TABLE_ACTION_DECAP)) {
2638                 status = rte_table_action_apply(a,
2639                         data,
2640                         RTE_TABLE_ACTION_DECAP,
2641                         &action->decap);
2642
2643                 if (status)
2644                         return status;
2645         }
2646
2647         return 0;
2648 }
2649
2650 static struct pipeline_msg_rsp *
2651 pipeline_msg_handle_table_rule_add(struct pipeline_data *p,
2652         struct pipeline_msg_req *req)
2653 {
2654         union table_rule_match_low_level match_ll;
2655         struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2656         struct table_rule_match *match = &req->table_rule_add.match;
2657         struct table_rule_action *action = &req->table_rule_add.action;
2658         struct rte_pipeline_table_entry *data_in, *data_out;
2659         uint32_t table_id = req->id;
2660         int key_found, status;
2661         struct rte_table_action *a = p->table_data[table_id].a;
2662
2663         /* Apply actions */
2664         memset(p->buffer, 0, sizeof(p->buffer));
2665         data_in = (struct rte_pipeline_table_entry *) p->buffer;
2666
2667         status = match_convert(match, &match_ll, 1);
2668         if (status) {
2669                 rsp->status = -1;
2670                 return rsp;
2671         }
2672
2673         status = action_convert(a, action, data_in);
2674         if (status) {
2675                 rsp->status = -1;
2676                 return rsp;
2677         }
2678
2679         status = rte_pipeline_table_entry_add(p->p,
2680                 table_id,
2681                 &match_ll,
2682                 data_in,
2683                 &key_found,
2684                 &data_out);
2685         if (status) {
2686                 rsp->status = -1;
2687                 return rsp;
2688         }
2689
2690         /* Write response */
2691         rsp->status = 0;
2692         rsp->table_rule_add.data = data_out;
2693
2694         return rsp;
2695 }
2696
2697 static struct pipeline_msg_rsp *
2698 pipeline_msg_handle_table_rule_add_default(struct pipeline_data *p,
2699         struct pipeline_msg_req *req)
2700 {
2701         struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2702         struct table_rule_action *action = &req->table_rule_add_default.action;
2703         struct rte_pipeline_table_entry *data_in, *data_out;
2704         uint32_t table_id = req->id;
2705         int status;
2706
2707         /* Apply actions */
2708         memset(p->buffer, 0, sizeof(p->buffer));
2709         data_in = (struct rte_pipeline_table_entry *) p->buffer;
2710
2711         data_in->action = action->fwd.action;
2712         if (action->fwd.action == RTE_PIPELINE_ACTION_PORT)
2713                 data_in->port_id = action->fwd.id;
2714         if (action->fwd.action == RTE_PIPELINE_ACTION_TABLE)
2715                 data_in->table_id = action->fwd.id;
2716
2717         /* Add default rule to table */
2718         status = rte_pipeline_table_default_entry_add(p->p,
2719                 table_id,
2720                 data_in,
2721                 &data_out);
2722         if (status) {
2723                 rsp->status = -1;
2724                 return rsp;
2725         }
2726
2727         /* Write response */
2728         rsp->status = 0;
2729         rsp->table_rule_add_default.data = data_out;
2730
2731         return rsp;
2732 }
2733
2734 static struct pipeline_msg_rsp *
2735 pipeline_msg_handle_table_rule_add_bulk(struct pipeline_data *p,
2736         struct pipeline_msg_req *req)
2737 {
2738         struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2739
2740         uint32_t table_id = req->id;
2741         struct table_rule_list *list = req->table_rule_add_bulk.list;
2742         uint32_t bulk = req->table_rule_add_bulk.bulk;
2743
2744         uint32_t n_rules_added;
2745         int status;
2746
2747         struct table_ll table_ll = {
2748                 .p = p->p,
2749                 .table_id = table_id,
2750                 .a = p->table_data[table_id].a,
2751                 .bulk_supported = bulk,
2752         };
2753
2754         status = table_rule_add_bulk_ll(&table_ll, list, &n_rules_added);
2755         if (status) {
2756                 rsp->status = -1;
2757                 rsp->table_rule_add_bulk.n_rules = 0;
2758                 return rsp;
2759         }
2760
2761         /* Write response */
2762         rsp->status = 0;
2763         rsp->table_rule_add_bulk.n_rules = n_rules_added;
2764         return rsp;
2765 }
2766
2767 static struct pipeline_msg_rsp *
2768 pipeline_msg_handle_table_rule_delete(struct pipeline_data *p,
2769         struct pipeline_msg_req *req)
2770 {
2771         union table_rule_match_low_level match_ll;
2772         struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2773         struct table_rule_match *match = &req->table_rule_delete.match;
2774         uint32_t table_id = req->id;
2775         int key_found, status;
2776
2777         status = match_convert(match, &match_ll, 0);
2778         if (status) {
2779                 rsp->status = -1;
2780                 return rsp;
2781         }
2782
2783         rsp->status = rte_pipeline_table_entry_delete(p->p,
2784                 table_id,
2785                 &match_ll,
2786                 &key_found,
2787                 NULL);
2788
2789         return rsp;
2790 }
2791
2792 static struct pipeline_msg_rsp *
2793 pipeline_msg_handle_table_rule_delete_default(struct pipeline_data *p,
2794         struct pipeline_msg_req *req)
2795 {
2796         struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2797         uint32_t table_id = req->id;
2798
2799         rsp->status = rte_pipeline_table_default_entry_delete(p->p,
2800                 table_id,
2801                 NULL);
2802
2803         return rsp;
2804 }
2805
2806 static struct pipeline_msg_rsp *
2807 pipeline_msg_handle_table_rule_stats_read(struct pipeline_data *p,
2808         struct pipeline_msg_req *req)
2809 {
2810         struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2811         uint32_t table_id = req->id;
2812         void *data = req->table_rule_stats_read.data;
2813         int clear = req->table_rule_stats_read.clear;
2814         struct rte_table_action *a = p->table_data[table_id].a;
2815
2816         rsp->status = rte_table_action_stats_read(a,
2817                 data,
2818                 &rsp->table_rule_stats_read.stats,
2819                 clear);
2820
2821         return rsp;
2822 }
2823
2824 static struct pipeline_msg_rsp *
2825 pipeline_msg_handle_table_mtr_profile_add(struct pipeline_data *p,
2826         struct pipeline_msg_req *req)
2827 {
2828         struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2829         uint32_t table_id = req->id;
2830         uint32_t meter_profile_id = req->table_mtr_profile_add.meter_profile_id;
2831         struct rte_table_action_meter_profile *profile =
2832                 &req->table_mtr_profile_add.profile;
2833         struct rte_table_action *a = p->table_data[table_id].a;
2834
2835         rsp->status = rte_table_action_meter_profile_add(a,
2836                 meter_profile_id,
2837                 profile);
2838
2839         return rsp;
2840 }
2841
2842 static struct pipeline_msg_rsp *
2843 pipeline_msg_handle_table_mtr_profile_delete(struct pipeline_data *p,
2844         struct pipeline_msg_req *req)
2845 {
2846         struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2847         uint32_t table_id = req->id;
2848         uint32_t meter_profile_id =
2849                 req->table_mtr_profile_delete.meter_profile_id;
2850         struct rte_table_action *a = p->table_data[table_id].a;
2851
2852         rsp->status = rte_table_action_meter_profile_delete(a,
2853                 meter_profile_id);
2854
2855         return rsp;
2856 }
2857
2858 static struct pipeline_msg_rsp *
2859 pipeline_msg_handle_table_rule_mtr_read(struct pipeline_data *p,
2860         struct pipeline_msg_req *req)
2861 {
2862         struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2863         uint32_t table_id = req->id;
2864         void *data = req->table_rule_mtr_read.data;
2865         uint32_t tc_mask = req->table_rule_mtr_read.tc_mask;
2866         int clear = req->table_rule_mtr_read.clear;
2867         struct rte_table_action *a = p->table_data[table_id].a;
2868
2869         rsp->status = rte_table_action_meter_read(a,
2870                 data,
2871                 tc_mask,
2872                 &rsp->table_rule_mtr_read.stats,
2873                 clear);
2874
2875         return rsp;
2876 }
2877
2878 static struct pipeline_msg_rsp *
2879 pipeline_msg_handle_table_dscp_table_update(struct pipeline_data *p,
2880         struct pipeline_msg_req *req)
2881 {
2882         struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2883         uint32_t table_id = req->id;
2884         uint64_t dscp_mask = req->table_dscp_table_update.dscp_mask;
2885         struct rte_table_action_dscp_table *dscp_table =
2886                 &req->table_dscp_table_update.dscp_table;
2887         struct rte_table_action *a = p->table_data[table_id].a;
2888
2889         rsp->status = rte_table_action_dscp_table_update(a,
2890                 dscp_mask,
2891                 dscp_table);
2892
2893         return rsp;
2894 }
2895
2896 static struct pipeline_msg_rsp *
2897 pipeline_msg_handle_table_rule_ttl_read(struct pipeline_data *p,
2898         struct pipeline_msg_req *req)
2899 {
2900         struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2901         uint32_t table_id = req->id;
2902         void *data = req->table_rule_ttl_read.data;
2903         int clear = req->table_rule_ttl_read.clear;
2904         struct rte_table_action *a = p->table_data[table_id].a;
2905
2906         rsp->status = rte_table_action_ttl_read(a,
2907                 data,
2908                 &rsp->table_rule_ttl_read.stats,
2909                 clear);
2910
2911         return rsp;
2912 }
2913
2914 static void
2915 pipeline_msg_handle(struct pipeline_data *p)
2916 {
2917         for ( ; ; ) {
2918                 struct pipeline_msg_req *req;
2919                 struct pipeline_msg_rsp *rsp;
2920
2921                 req = pipeline_msg_recv(p->msgq_req);
2922                 if (req == NULL)
2923                         break;
2924
2925                 switch (req->type) {
2926                 case PIPELINE_REQ_PORT_IN_STATS_READ:
2927                         rsp = pipeline_msg_handle_port_in_stats_read(p, req);
2928                         break;
2929
2930                 case PIPELINE_REQ_PORT_IN_ENABLE:
2931                         rsp = pipeline_msg_handle_port_in_enable(p, req);
2932                         break;
2933
2934                 case PIPELINE_REQ_PORT_IN_DISABLE:
2935                         rsp = pipeline_msg_handle_port_in_disable(p, req);
2936                         break;
2937
2938                 case PIPELINE_REQ_PORT_OUT_STATS_READ:
2939                         rsp = pipeline_msg_handle_port_out_stats_read(p, req);
2940                         break;
2941
2942                 case PIPELINE_REQ_TABLE_STATS_READ:
2943                         rsp = pipeline_msg_handle_table_stats_read(p, req);
2944                         break;
2945
2946                 case PIPELINE_REQ_TABLE_RULE_ADD:
2947                         rsp = pipeline_msg_handle_table_rule_add(p, req);
2948                         break;
2949
2950                 case PIPELINE_REQ_TABLE_RULE_ADD_DEFAULT:
2951                         rsp = pipeline_msg_handle_table_rule_add_default(p,     req);
2952                         break;
2953
2954                 case PIPELINE_REQ_TABLE_RULE_ADD_BULK:
2955                         rsp = pipeline_msg_handle_table_rule_add_bulk(p, req);
2956                         break;
2957
2958                 case PIPELINE_REQ_TABLE_RULE_DELETE:
2959                         rsp = pipeline_msg_handle_table_rule_delete(p, req);
2960                         break;
2961
2962                 case PIPELINE_REQ_TABLE_RULE_DELETE_DEFAULT:
2963                         rsp = pipeline_msg_handle_table_rule_delete_default(p, req);
2964                         break;
2965
2966                 case PIPELINE_REQ_TABLE_RULE_STATS_READ:
2967                         rsp = pipeline_msg_handle_table_rule_stats_read(p, req);
2968                         break;
2969
2970                 case PIPELINE_REQ_TABLE_MTR_PROFILE_ADD:
2971                         rsp = pipeline_msg_handle_table_mtr_profile_add(p, req);
2972                         break;
2973
2974                 case PIPELINE_REQ_TABLE_MTR_PROFILE_DELETE:
2975                         rsp = pipeline_msg_handle_table_mtr_profile_delete(p, req);
2976                         break;
2977
2978                 case PIPELINE_REQ_TABLE_RULE_MTR_READ:
2979                         rsp = pipeline_msg_handle_table_rule_mtr_read(p, req);
2980                         break;
2981
2982                 case PIPELINE_REQ_TABLE_DSCP_TABLE_UPDATE:
2983                         rsp = pipeline_msg_handle_table_dscp_table_update(p, req);
2984                         break;
2985
2986                 case PIPELINE_REQ_TABLE_RULE_TTL_READ:
2987                         rsp = pipeline_msg_handle_table_rule_ttl_read(p, req);
2988                         break;
2989
2990                 default:
2991                         rsp = (struct pipeline_msg_rsp *) req;
2992                         rsp->status = -1;
2993                 }
2994
2995                 pipeline_msg_send(p->msgq_rsp, rsp);
2996         }
2997 }
2998
2999 /**
3000  * Data plane threads: main
3001  */
3002 int
3003 thread_main(void *arg __rte_unused)
3004 {
3005         struct thread_data *t;
3006         uint32_t thread_id, i;
3007
3008         thread_id = rte_lcore_id();
3009         t = &thread_data[thread_id];
3010
3011         /* Dispatch loop */
3012         for (i = 0; ; i++) {
3013                 uint32_t j;
3014
3015                 /* Data Plane */
3016                 for (j = 0; j < t->n_pipelines; j++)
3017                         rte_pipeline_run(t->p[j]);
3018
3019                 /* Control Plane */
3020                 if ((i & 0xF) == 0) {
3021                         uint64_t time = rte_get_tsc_cycles();
3022                         uint64_t time_next_min = UINT64_MAX;
3023
3024                         if (time < t->time_next_min)
3025                                 continue;
3026
3027                         /* Pipeline message queues */
3028                         for (j = 0; j < t->n_pipelines; j++) {
3029                                 struct pipeline_data *p =
3030                                         &t->pipeline_data[j];
3031                                 uint64_t time_next = p->time_next;
3032
3033                                 if (time_next <= time) {
3034                                         pipeline_msg_handle(p);
3035                                         rte_pipeline_flush(p->p);
3036                                         time_next = time + p->timer_period;
3037                                         p->time_next = time_next;
3038                                 }
3039
3040                                 if (time_next < time_next_min)
3041                                         time_next_min = time_next;
3042                         }
3043
3044                         /* Thread message queues */
3045                         {
3046                                 uint64_t time_next = t->time_next;
3047
3048                                 if (time_next <= time) {
3049                                         thread_msg_handle(t);
3050                                         time_next = time + t->timer_period;
3051                                         t->time_next = time_next;
3052                                 }
3053
3054                                 if (time_next < time_next_min)
3055                                         time_next_min = time_next;
3056                         }
3057
3058                         t->time_next_min = time_next_min;
3059                 }
3060         }
3061
3062         return 0;
3063 }