5019de97ea7517e445cee83e29fe8686aca093b3
[dpdk.git] / examples / ip_pipeline / thread.c
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2010-2018 Intel Corporation
3  */
4
5 #include <stdlib.h>
6
7 #include <rte_common.h>
8 #include <rte_cycles.h>
9 #include <rte_lcore.h>
10 #include <rte_ring.h>
11
12 #include <rte_table_acl.h>
13 #include <rte_table_array.h>
14 #include <rte_table_hash.h>
15 #include <rte_table_lpm.h>
16 #include <rte_table_lpm_ipv6.h>
17
18 #include "common.h"
19 #include "thread.h"
20 #include "pipeline.h"
21
/* Capacity of the per-thread pipeline arrays (thread_data.p[] and
 * thread_data.pipeline_data[]). May be overridden at build time.
 */
#if !defined(THREAD_PIPELINES_MAX)
#define THREAD_PIPELINES_MAX 256
#endif

/* Element count passed to rte_ring_create() for each per-thread message
 * queue. May be overridden at build time.
 */
#if !defined(THREAD_MSGQ_SIZE)
#define THREAD_MSGQ_SIZE 64
#endif

/* Per-thread timer period in milliseconds; thread_init() converts it to
 * TSC cycles. May be overridden at build time.
 */
#if !defined(THREAD_TIMER_PERIOD_MS)
#define THREAD_TIMER_PERIOD_MS 100
#endif
33
34 /**
35  * Master thead: data plane thread context
36  */
37 struct thread {
38         struct rte_ring *msgq_req;
39         struct rte_ring *msgq_rsp;
40
41         uint32_t enabled;
42 };
43
44 static struct thread thread[RTE_MAX_LCORE];
45
46 /**
47  * Data plane threads: context
48  */
49 struct table_data {
50         struct rte_table_action *a;
51 };
52
53 struct pipeline_data {
54         struct rte_pipeline *p;
55         struct table_data table_data[RTE_PIPELINE_TABLE_MAX];
56         uint32_t n_tables;
57
58         struct rte_ring *msgq_req;
59         struct rte_ring *msgq_rsp;
60         uint64_t timer_period; /* Measured in CPU cycles. */
61         uint64_t time_next;
62
63         uint8_t buffer[TABLE_RULE_ACTION_SIZE_MAX];
64 };
65
66 struct thread_data {
67         struct rte_pipeline *p[THREAD_PIPELINES_MAX];
68         uint32_t n_pipelines;
69
70         struct pipeline_data pipeline_data[THREAD_PIPELINES_MAX];
71         struct rte_ring *msgq_req;
72         struct rte_ring *msgq_rsp;
73         uint64_t timer_period; /* Measured in CPU cycles. */
74         uint64_t time_next;
75         uint64_t time_next_min;
76 } __rte_cache_aligned;
77
78 static struct thread_data thread_data[RTE_MAX_LCORE];
79
80 /**
81  * Master thread: data plane thread init
82  */
83 static void
84 thread_free(void)
85 {
86         uint32_t i;
87
88         for (i = 0; i < RTE_MAX_LCORE; i++) {
89                 struct thread *t = &thread[i];
90
91                 if (!rte_lcore_is_enabled(i))
92                         continue;
93
94                 /* MSGQs */
95                 if (t->msgq_req)
96                         rte_ring_free(t->msgq_req);
97
98                 if (t->msgq_rsp)
99                         rte_ring_free(t->msgq_rsp);
100         }
101 }
102
103 int
104 thread_init(void)
105 {
106         uint32_t i;
107
108         RTE_LCORE_FOREACH_SLAVE(i) {
109                 char name[NAME_MAX];
110                 struct rte_ring *msgq_req, *msgq_rsp;
111                 struct thread *t = &thread[i];
112                 struct thread_data *t_data = &thread_data[i];
113                 uint32_t cpu_id = rte_lcore_to_socket_id(i);
114
115                 /* MSGQs */
116                 snprintf(name, sizeof(name), "THREAD-%04x-MSGQ-REQ", i);
117
118                 msgq_req = rte_ring_create(name,
119                         THREAD_MSGQ_SIZE,
120                         cpu_id,
121                         RING_F_SP_ENQ | RING_F_SC_DEQ);
122
123                 if (msgq_req == NULL) {
124                         thread_free();
125                         return -1;
126                 }
127
128                 snprintf(name, sizeof(name), "THREAD-%04x-MSGQ-RSP", i);
129
130                 msgq_rsp = rte_ring_create(name,
131                         THREAD_MSGQ_SIZE,
132                         cpu_id,
133                         RING_F_SP_ENQ | RING_F_SC_DEQ);
134
135                 if (msgq_rsp == NULL) {
136                         thread_free();
137                         return -1;
138                 }
139
140                 /* Master thread records */
141                 t->msgq_req = msgq_req;
142                 t->msgq_rsp = msgq_rsp;
143                 t->enabled = 1;
144
145                 /* Data plane thread records */
146                 t_data->n_pipelines = 0;
147                 t_data->msgq_req = msgq_req;
148                 t_data->msgq_rsp = msgq_rsp;
149                 t_data->timer_period =
150                         (rte_get_tsc_hz() * THREAD_TIMER_PERIOD_MS) / 1000;
151                 t_data->time_next = rte_get_tsc_cycles() + t_data->timer_period;
152                 t_data->time_next_min = t_data->time_next;
153         }
154
155         return 0;
156 }
157
158 static inline int
159 thread_is_running(uint32_t thread_id)
160 {
161         enum rte_lcore_state_t thread_state;
162
163         thread_state = rte_eal_get_lcore_state(thread_id);
164         return (thread_state == RUNNING) ? 1 : 0;
165 }
166
167 /**
168  * Pipeline is running when:
169  *    (A) Pipeline is mapped to a data plane thread AND
170  *    (B) Its data plane thread is in RUNNING state.
171  */
172 static inline int
173 pipeline_is_running(struct pipeline *p)
174 {
175         if (p->enabled == 0)
176                 return 0;
177
178         return thread_is_running(p->thread_id);
179 }
180
/**
 * Master thread & data plane threads: message passing
 */

/* Request types understood by thread_msg_handle(). */
enum thread_req_type {
	THREAD_REQ_PIPELINE_ENABLE = 0,  /* Add a pipeline to the thread. */
	THREAD_REQ_PIPELINE_DISABLE = 1, /* Remove a pipeline from the thread. */
	THREAD_REQ_MAX = 2               /* Number of request types. */
};
189
190 struct thread_msg_req {
191         enum thread_req_type type;
192
193         union {
194                 struct {
195                         struct rte_pipeline *p;
196                         struct {
197                                 struct rte_table_action *a;
198                         } table[RTE_PIPELINE_TABLE_MAX];
199                         struct rte_ring *msgq_req;
200                         struct rte_ring *msgq_rsp;
201                         uint32_t timer_period_ms;
202                         uint32_t n_tables;
203                 } pipeline_enable;
204
205                 struct {
206                         struct rte_pipeline *p;
207                 } pipeline_disable;
208         };
209 };
210
/* Response returned by a data plane thread. The handlers write it into the
 * same memory as the request it answers.
 */
struct thread_msg_rsp {
	int status; /* 0 on success, -1 on failure. */
};
214
215 /**
216  * Master thread
217  */
218 static struct thread_msg_req *
219 thread_msg_alloc(void)
220 {
221         size_t size = RTE_MAX(sizeof(struct thread_msg_req),
222                 sizeof(struct thread_msg_rsp));
223
224         return calloc(1, size);
225 }
226
/* Release a message buffer obtained from thread_msg_alloc(). */
static void
thread_msg_free(struct thread_msg_rsp *rsp)
{
	free(rsp);
}
232
233 static struct thread_msg_rsp *
234 thread_msg_send_recv(uint32_t thread_id,
235         struct thread_msg_req *req)
236 {
237         struct thread *t = &thread[thread_id];
238         struct rte_ring *msgq_req = t->msgq_req;
239         struct rte_ring *msgq_rsp = t->msgq_rsp;
240         struct thread_msg_rsp *rsp;
241         int status;
242
243         /* send */
244         do {
245                 status = rte_ring_sp_enqueue(msgq_req, req);
246         } while (status == -ENOBUFS);
247
248         /* recv */
249         do {
250                 status = rte_ring_sc_dequeue(msgq_rsp, (void **) &rsp);
251         } while (status != 0);
252
253         return rsp;
254 }
255
256 int
257 thread_pipeline_enable(uint32_t thread_id,
258         const char *pipeline_name)
259 {
260         struct pipeline *p = pipeline_find(pipeline_name);
261         struct thread *t;
262         struct thread_msg_req *req;
263         struct thread_msg_rsp *rsp;
264         uint32_t i;
265         int status;
266
267         /* Check input params */
268         if ((thread_id >= RTE_MAX_LCORE) ||
269                 (p == NULL) ||
270                 (p->n_ports_in == 0) ||
271                 (p->n_ports_out == 0) ||
272                 (p->n_tables == 0))
273                 return -1;
274
275         t = &thread[thread_id];
276         if ((t->enabled == 0) ||
277                 p->enabled)
278                 return -1;
279
280         if (!thread_is_running(thread_id)) {
281                 struct thread_data *td = &thread_data[thread_id];
282                 struct pipeline_data *tdp = &td->pipeline_data[td->n_pipelines];
283
284                 if (td->n_pipelines >= THREAD_PIPELINES_MAX)
285                         return -1;
286
287                 /* Data plane thread */
288                 td->p[td->n_pipelines] = p->p;
289
290                 tdp->p = p->p;
291                 for (i = 0; i < p->n_tables; i++)
292                         tdp->table_data[i].a = p->table[i].a;
293
294                 tdp->n_tables = p->n_tables;
295
296                 tdp->msgq_req = p->msgq_req;
297                 tdp->msgq_rsp = p->msgq_rsp;
298                 tdp->timer_period = (rte_get_tsc_hz() * p->timer_period_ms) / 1000;
299                 tdp->time_next = rte_get_tsc_cycles() + tdp->timer_period;
300
301                 td->n_pipelines++;
302
303                 /* Pipeline */
304                 p->thread_id = thread_id;
305                 p->enabled = 1;
306
307                 return 0;
308         }
309
310         /* Allocate request */
311         req = thread_msg_alloc();
312         if (req == NULL)
313                 return -1;
314
315         /* Write request */
316         req->type = THREAD_REQ_PIPELINE_ENABLE;
317         req->pipeline_enable.p = p->p;
318         for (i = 0; i < p->n_tables; i++)
319                 req->pipeline_enable.table[i].a =
320                         p->table[i].a;
321         req->pipeline_enable.msgq_req = p->msgq_req;
322         req->pipeline_enable.msgq_rsp = p->msgq_rsp;
323         req->pipeline_enable.timer_period_ms = p->timer_period_ms;
324         req->pipeline_enable.n_tables = p->n_tables;
325
326         /* Send request and wait for response */
327         rsp = thread_msg_send_recv(thread_id, req);
328         if (rsp == NULL)
329                 return -1;
330
331         /* Read response */
332         status = rsp->status;
333
334         /* Free response */
335         thread_msg_free(rsp);
336
337         /* Request completion */
338         if (status)
339                 return status;
340
341         p->thread_id = thread_id;
342         p->enabled = 1;
343
344         return 0;
345 }
346
347 int
348 thread_pipeline_disable(uint32_t thread_id,
349         const char *pipeline_name)
350 {
351         struct pipeline *p = pipeline_find(pipeline_name);
352         struct thread *t;
353         struct thread_msg_req *req;
354         struct thread_msg_rsp *rsp;
355         int status;
356
357         /* Check input params */
358         if ((thread_id >= RTE_MAX_LCORE) ||
359                 (p == NULL))
360                 return -1;
361
362         t = &thread[thread_id];
363         if (t->enabled == 0)
364                 return -1;
365
366         if (p->enabled == 0)
367                 return 0;
368
369         if (p->thread_id != thread_id)
370                 return -1;
371
372         if (!thread_is_running(thread_id)) {
373                 struct thread_data *td = &thread_data[thread_id];
374                 uint32_t i;
375
376                 for (i = 0; i < td->n_pipelines; i++) {
377                         struct pipeline_data *tdp = &td->pipeline_data[i];
378
379                         if (tdp->p != p->p)
380                                 continue;
381
382                         /* Data plane thread */
383                         if (i < td->n_pipelines - 1) {
384                                 struct rte_pipeline *pipeline_last =
385                                         td->p[td->n_pipelines - 1];
386                                 struct pipeline_data *tdp_last =
387                                         &td->pipeline_data[td->n_pipelines - 1];
388
389                                 td->p[i] = pipeline_last;
390                                 memcpy(tdp, tdp_last, sizeof(*tdp));
391                         }
392
393                         td->n_pipelines--;
394
395                         /* Pipeline */
396                         p->enabled = 0;
397
398                         break;
399                 }
400
401                 return 0;
402         }
403
404         /* Allocate request */
405         req = thread_msg_alloc();
406         if (req == NULL)
407                 return -1;
408
409         /* Write request */
410         req->type = THREAD_REQ_PIPELINE_DISABLE;
411         req->pipeline_disable.p = p->p;
412
413         /* Send request and wait for response */
414         rsp = thread_msg_send_recv(thread_id, req);
415         if (rsp == NULL)
416                 return -1;
417
418         /* Read response */
419         status = rsp->status;
420
421         /* Free response */
422         thread_msg_free(rsp);
423
424         /* Request completion */
425         if (status)
426                 return status;
427
428         p->enabled = 0;
429
430         return 0;
431 }
432
/**
 * Data plane threads: message handling
 */

/* Non-blocking receive: return the next pending request, or NULL when the
 * dequeue from msgq_req does not succeed.
 */
static inline struct thread_msg_req *
thread_msg_recv(struct rte_ring *msgq_req)
{
	struct thread_msg_req *req;

	if (rte_ring_sc_dequeue(msgq_req, (void **) &req) != 0)
		return NULL;

	return req;
}
448
/* Post a response on msgq_rsp, retrying for as long as the enqueue reports
 * -ENOBUFS.
 */
static inline void
thread_msg_send(struct rte_ring *msgq_rsp,
	struct thread_msg_rsp *rsp)
{
	for ( ; ; )
		if (rte_ring_sp_enqueue(msgq_rsp, rsp) != -ENOBUFS)
			break;
}
459
460 static struct thread_msg_rsp *
461 thread_msg_handle_pipeline_enable(struct thread_data *t,
462         struct thread_msg_req *req)
463 {
464         struct thread_msg_rsp *rsp = (struct thread_msg_rsp *) req;
465         struct pipeline_data *p = &t->pipeline_data[t->n_pipelines];
466         uint32_t i;
467
468         /* Request */
469         if (t->n_pipelines >= THREAD_PIPELINES_MAX) {
470                 rsp->status = -1;
471                 return rsp;
472         }
473
474         t->p[t->n_pipelines] = req->pipeline_enable.p;
475
476         p->p = req->pipeline_enable.p;
477         for (i = 0; i < req->pipeline_enable.n_tables; i++)
478                 p->table_data[i].a =
479                         req->pipeline_enable.table[i].a;
480
481         p->n_tables = req->pipeline_enable.n_tables;
482
483         p->msgq_req = req->pipeline_enable.msgq_req;
484         p->msgq_rsp = req->pipeline_enable.msgq_rsp;
485         p->timer_period =
486                 (rte_get_tsc_hz() * req->pipeline_enable.timer_period_ms) / 1000;
487         p->time_next = rte_get_tsc_cycles() + p->timer_period;
488
489         t->n_pipelines++;
490
491         /* Response */
492         rsp->status = 0;
493         return rsp;
494 }
495
496 static struct thread_msg_rsp *
497 thread_msg_handle_pipeline_disable(struct thread_data *t,
498         struct thread_msg_req *req)
499 {
500         struct thread_msg_rsp *rsp = (struct thread_msg_rsp *) req;
501         uint32_t n_pipelines = t->n_pipelines;
502         struct rte_pipeline *pipeline = req->pipeline_disable.p;
503         uint32_t i;
504
505         /* find pipeline */
506         for (i = 0; i < n_pipelines; i++) {
507                 struct pipeline_data *p = &t->pipeline_data[i];
508
509                 if (p->p != pipeline)
510                         continue;
511
512                 if (i < n_pipelines - 1) {
513                         struct rte_pipeline *pipeline_last =
514                                 t->p[n_pipelines - 1];
515                         struct pipeline_data *p_last =
516                                 &t->pipeline_data[n_pipelines - 1];
517
518                         t->p[i] = pipeline_last;
519                         memcpy(p, p_last, sizeof(*p));
520                 }
521
522                 t->n_pipelines--;
523
524                 rsp->status = 0;
525                 return rsp;
526         }
527
528         /* should not get here */
529         rsp->status = 0;
530         return rsp;
531 }
532
533 static void
534 thread_msg_handle(struct thread_data *t)
535 {
536         for ( ; ; ) {
537                 struct thread_msg_req *req;
538                 struct thread_msg_rsp *rsp;
539
540                 req = thread_msg_recv(t->msgq_req);
541                 if (req == NULL)
542                         break;
543
544                 switch (req->type) {
545                 case THREAD_REQ_PIPELINE_ENABLE:
546                         rsp = thread_msg_handle_pipeline_enable(t, req);
547                         break;
548
549                 case THREAD_REQ_PIPELINE_DISABLE:
550                         rsp = thread_msg_handle_pipeline_disable(t, req);
551                         break;
552
553                 default:
554                         rsp = (struct thread_msg_rsp *) req;
555                         rsp->status = -1;
556                 }
557
558                 thread_msg_send(t->msgq_rsp, rsp);
559         }
560 }
561
/**
 * Master thread & data plane threads: message passing
 */

/* Request types carried in struct pipeline_msg_req. */
enum pipeline_req_type {
	/* Port IN */
	PIPELINE_REQ_PORT_IN_STATS_READ = 0,
	PIPELINE_REQ_PORT_IN_ENABLE,
	PIPELINE_REQ_PORT_IN_DISABLE,

	/* Port OUT */
	PIPELINE_REQ_PORT_OUT_STATS_READ,

	/* Table */
	PIPELINE_REQ_TABLE_STATS_READ,
	PIPELINE_REQ_TABLE_RULE_ADD,
	PIPELINE_REQ_TABLE_RULE_ADD_DEFAULT,
	PIPELINE_REQ_TABLE_RULE_ADD_BULK,
	PIPELINE_REQ_TABLE_RULE_DELETE,
	PIPELINE_REQ_TABLE_RULE_DELETE_DEFAULT,
	PIPELINE_REQ_TABLE_RULE_STATS_READ,
	PIPELINE_REQ_TABLE_MTR_PROFILE_ADD,
	PIPELINE_REQ_TABLE_MTR_PROFILE_DELETE,
	PIPELINE_REQ_TABLE_RULE_MTR_READ,
	PIPELINE_REQ_TABLE_DSCP_TABLE_UPDATE,
	PIPELINE_REQ_TABLE_RULE_TTL_READ,

	/* Number of request types; not a request itself. */
	PIPELINE_REQ_MAX
};
589
590 struct pipeline_msg_req_port_in_stats_read {
591         int clear;
592 };
593
594 struct pipeline_msg_req_port_out_stats_read {
595         int clear;
596 };
597
598 struct pipeline_msg_req_table_stats_read {
599         int clear;
600 };
601
602 struct pipeline_msg_req_table_rule_add {
603         struct table_rule_match match;
604         struct table_rule_action action;
605 };
606
607 struct pipeline_msg_req_table_rule_add_default {
608         struct table_rule_action action;
609 };
610
611 struct pipeline_msg_req_table_rule_add_bulk {
612         struct table_rule_match *match;
613         struct table_rule_action *action;
614         void **data;
615         uint32_t n_rules;
616         int bulk;
617 };
618
619 struct pipeline_msg_req_table_rule_delete {
620         struct table_rule_match match;
621 };
622
623 struct pipeline_msg_req_table_rule_stats_read {
624         void *data;
625         int clear;
626 };
627
628 struct pipeline_msg_req_table_mtr_profile_add {
629         uint32_t meter_profile_id;
630         struct rte_table_action_meter_profile profile;
631 };
632
633 struct pipeline_msg_req_table_mtr_profile_delete {
634         uint32_t meter_profile_id;
635 };
636
637 struct pipeline_msg_req_table_rule_mtr_read {
638         void *data;
639         uint32_t tc_mask;
640         int clear;
641 };
642
643 struct pipeline_msg_req_table_dscp_table_update {
644         uint64_t dscp_mask;
645         struct rte_table_action_dscp_table dscp_table;
646 };
647
648 struct pipeline_msg_req_table_rule_ttl_read {
649         void *data;
650         int clear;
651 };
652
653 struct pipeline_msg_req {
654         enum pipeline_req_type type;
655         uint32_t id; /* Port IN, port OUT or table ID */
656
657         RTE_STD_C11
658         union {
659                 struct pipeline_msg_req_port_in_stats_read port_in_stats_read;
660                 struct pipeline_msg_req_port_out_stats_read port_out_stats_read;
661                 struct pipeline_msg_req_table_stats_read table_stats_read;
662                 struct pipeline_msg_req_table_rule_add table_rule_add;
663                 struct pipeline_msg_req_table_rule_add_default table_rule_add_default;
664                 struct pipeline_msg_req_table_rule_add_bulk table_rule_add_bulk;
665                 struct pipeline_msg_req_table_rule_delete table_rule_delete;
666                 struct pipeline_msg_req_table_rule_stats_read table_rule_stats_read;
667                 struct pipeline_msg_req_table_mtr_profile_add table_mtr_profile_add;
668                 struct pipeline_msg_req_table_mtr_profile_delete table_mtr_profile_delete;
669                 struct pipeline_msg_req_table_rule_mtr_read table_rule_mtr_read;
670                 struct pipeline_msg_req_table_dscp_table_update table_dscp_table_update;
671                 struct pipeline_msg_req_table_rule_ttl_read table_rule_ttl_read;
672         };
673 };
674
675 struct pipeline_msg_rsp_port_in_stats_read {
676         struct rte_pipeline_port_in_stats stats;
677 };
678
679 struct pipeline_msg_rsp_port_out_stats_read {
680         struct rte_pipeline_port_out_stats stats;
681 };
682
683 struct pipeline_msg_rsp_table_stats_read {
684         struct rte_pipeline_table_stats stats;
685 };
686
687 struct pipeline_msg_rsp_table_rule_add {
688         void *data;
689 };
690
691 struct pipeline_msg_rsp_table_rule_add_default {
692         void *data;
693 };
694
695 struct pipeline_msg_rsp_table_rule_add_bulk {
696         uint32_t n_rules;
697 };
698
699 struct pipeline_msg_rsp_table_rule_stats_read {
700         struct rte_table_action_stats_counters stats;
701 };
702
703 struct pipeline_msg_rsp_table_rule_mtr_read {
704         struct rte_table_action_mtr_counters stats;
705 };
706
707 struct pipeline_msg_rsp_table_rule_ttl_read {
708         struct rte_table_action_ttl_counters stats;
709 };
710
711 struct pipeline_msg_rsp {
712         int status;
713
714         RTE_STD_C11
715         union {
716                 struct pipeline_msg_rsp_port_in_stats_read port_in_stats_read;
717                 struct pipeline_msg_rsp_port_out_stats_read port_out_stats_read;
718                 struct pipeline_msg_rsp_table_stats_read table_stats_read;
719                 struct pipeline_msg_rsp_table_rule_add table_rule_add;
720                 struct pipeline_msg_rsp_table_rule_add_default table_rule_add_default;
721                 struct pipeline_msg_rsp_table_rule_add_bulk table_rule_add_bulk;
722                 struct pipeline_msg_rsp_table_rule_stats_read table_rule_stats_read;
723                 struct pipeline_msg_rsp_table_rule_mtr_read table_rule_mtr_read;
724                 struct pipeline_msg_rsp_table_rule_ttl_read table_rule_ttl_read;
725         };
726 };
727
728 /**
729  * Master thread
730  */
731 static struct pipeline_msg_req *
732 pipeline_msg_alloc(void)
733 {
734         size_t size = RTE_MAX(sizeof(struct pipeline_msg_req),
735                 sizeof(struct pipeline_msg_rsp));
736
737         return calloc(1, size);
738 }
739
/* Release a message buffer obtained from pipeline_msg_alloc(). */
static void
pipeline_msg_free(struct pipeline_msg_rsp *rsp)
{
	free(rsp);
}
745
746 static struct pipeline_msg_rsp *
747 pipeline_msg_send_recv(struct pipeline *p,
748         struct pipeline_msg_req *req)
749 {
750         struct rte_ring *msgq_req = p->msgq_req;
751         struct rte_ring *msgq_rsp = p->msgq_rsp;
752         struct pipeline_msg_rsp *rsp;
753         int status;
754
755         /* send */
756         do {
757                 status = rte_ring_sp_enqueue(msgq_req, req);
758         } while (status == -ENOBUFS);
759
760         /* recv */
761         do {
762                 status = rte_ring_sc_dequeue(msgq_rsp, (void **) &rsp);
763         } while (status != 0);
764
765         return rsp;
766 }
767
768 int
769 pipeline_port_in_stats_read(const char *pipeline_name,
770         uint32_t port_id,
771         struct rte_pipeline_port_in_stats *stats,
772         int clear)
773 {
774         struct pipeline *p;
775         struct pipeline_msg_req *req;
776         struct pipeline_msg_rsp *rsp;
777         int status;
778
779         /* Check input params */
780         if ((pipeline_name == NULL) ||
781                 (stats == NULL))
782                 return -1;
783
784         p = pipeline_find(pipeline_name);
785         if ((p == NULL) ||
786                 (port_id >= p->n_ports_in))
787                 return -1;
788
789         if (!pipeline_is_running(p)) {
790                 status = rte_pipeline_port_in_stats_read(p->p,
791                         port_id,
792                         stats,
793                         clear);
794
795                 return status;
796         }
797
798         /* Allocate request */
799         req = pipeline_msg_alloc();
800         if (req == NULL)
801                 return -1;
802
803         /* Write request */
804         req->type = PIPELINE_REQ_PORT_IN_STATS_READ;
805         req->id = port_id;
806         req->port_in_stats_read.clear = clear;
807
808         /* Send request and wait for response */
809         rsp = pipeline_msg_send_recv(p, req);
810         if (rsp == NULL)
811                 return -1;
812
813         /* Read response */
814         status = rsp->status;
815         if (status)
816                 memcpy(stats, &rsp->port_in_stats_read.stats, sizeof(*stats));
817
818         /* Free response */
819         pipeline_msg_free(rsp);
820
821         return status;
822 }
823
824 int
825 pipeline_port_in_enable(const char *pipeline_name,
826         uint32_t port_id)
827 {
828         struct pipeline *p;
829         struct pipeline_msg_req *req;
830         struct pipeline_msg_rsp *rsp;
831         int status;
832
833         /* Check input params */
834         if (pipeline_name == NULL)
835                 return -1;
836
837         p = pipeline_find(pipeline_name);
838         if ((p == NULL) ||
839                 (port_id >= p->n_ports_in))
840                 return -1;
841
842         if (!pipeline_is_running(p)) {
843                 status = rte_pipeline_port_in_enable(p->p, port_id);
844                 return status;
845         }
846
847         /* Allocate request */
848         req = pipeline_msg_alloc();
849         if (req == NULL)
850                 return -1;
851
852         /* Write request */
853         req->type = PIPELINE_REQ_PORT_IN_ENABLE;
854         req->id = port_id;
855
856         /* Send request and wait for response */
857         rsp = pipeline_msg_send_recv(p, req);
858         if (rsp == NULL)
859                 return -1;
860
861         /* Read response */
862         status = rsp->status;
863
864         /* Free response */
865         pipeline_msg_free(rsp);
866
867         return status;
868 }
869
870 int
871 pipeline_port_in_disable(const char *pipeline_name,
872         uint32_t port_id)
873 {
874         struct pipeline *p;
875         struct pipeline_msg_req *req;
876         struct pipeline_msg_rsp *rsp;
877         int status;
878
879         /* Check input params */
880         if (pipeline_name == NULL)
881                 return -1;
882
883         p = pipeline_find(pipeline_name);
884         if ((p == NULL) ||
885                 (port_id >= p->n_ports_in))
886                 return -1;
887
888         if (!pipeline_is_running(p)) {
889                 status = rte_pipeline_port_in_disable(p->p, port_id);
890                 return status;
891         }
892
893         /* Allocate request */
894         req = pipeline_msg_alloc();
895         if (req == NULL)
896                 return -1;
897
898         /* Write request */
899         req->type = PIPELINE_REQ_PORT_IN_DISABLE;
900         req->id = port_id;
901
902         /* Send request and wait for response */
903         rsp = pipeline_msg_send_recv(p, req);
904         if (rsp == NULL)
905                 return -1;
906
907         /* Read response */
908         status = rsp->status;
909
910         /* Free response */
911         pipeline_msg_free(rsp);
912
913         return status;
914 }
915
916 int
917 pipeline_port_out_stats_read(const char *pipeline_name,
918         uint32_t port_id,
919         struct rte_pipeline_port_out_stats *stats,
920         int clear)
921 {
922         struct pipeline *p;
923         struct pipeline_msg_req *req;
924         struct pipeline_msg_rsp *rsp;
925         int status;
926
927         /* Check input params */
928         if ((pipeline_name == NULL) ||
929                 (stats == NULL))
930                 return -1;
931
932         p = pipeline_find(pipeline_name);
933         if ((p == NULL) ||
934                 (port_id >= p->n_ports_out))
935                 return -1;
936
937         if (!pipeline_is_running(p)) {
938                 status = rte_pipeline_port_out_stats_read(p->p,
939                         port_id,
940                         stats,
941                         clear);
942
943                 return status;
944         }
945
946         /* Allocate request */
947         req = pipeline_msg_alloc();
948         if (req == NULL)
949                 return -1;
950
951         /* Write request */
952         req->type = PIPELINE_REQ_PORT_OUT_STATS_READ;
953         req->id = port_id;
954         req->port_out_stats_read.clear = clear;
955
956         /* Send request and wait for response */
957         rsp = pipeline_msg_send_recv(p, req);
958         if (rsp == NULL)
959                 return -1;
960
961         /* Read response */
962         status = rsp->status;
963         if (status)
964                 memcpy(stats, &rsp->port_out_stats_read.stats, sizeof(*stats));
965
966         /* Free response */
967         pipeline_msg_free(rsp);
968
969         return status;
970 }
971
972 int
973 pipeline_table_stats_read(const char *pipeline_name,
974         uint32_t table_id,
975         struct rte_pipeline_table_stats *stats,
976         int clear)
977 {
978         struct pipeline *p;
979         struct pipeline_msg_req *req;
980         struct pipeline_msg_rsp *rsp;
981         int status;
982
983         /* Check input params */
984         if ((pipeline_name == NULL) ||
985                 (stats == NULL))
986                 return -1;
987
988         p = pipeline_find(pipeline_name);
989         if ((p == NULL) ||
990                 (table_id >= p->n_tables))
991                 return -1;
992
993         if (!pipeline_is_running(p)) {
994                 status = rte_pipeline_table_stats_read(p->p,
995                         table_id,
996                         stats,
997                         clear);
998
999                 return status;
1000         }
1001
1002         /* Allocate request */
1003         req = pipeline_msg_alloc();
1004         if (req == NULL)
1005                 return -1;
1006
1007         /* Write request */
1008         req->type = PIPELINE_REQ_TABLE_STATS_READ;
1009         req->id = table_id;
1010         req->table_stats_read.clear = clear;
1011
1012         /* Send request and wait for response */
1013         rsp = pipeline_msg_send_recv(p, req);
1014         if (rsp == NULL)
1015                 return -1;
1016
1017         /* Read response */
1018         status = rsp->status;
1019         if (status)
1020                 memcpy(stats, &rsp->table_stats_read.stats, sizeof(*stats));
1021
1022         /* Free response */
1023         pipeline_msg_free(rsp);
1024
1025         return status;
1026 }
1027
1028 static int
1029 match_check(struct table_rule_match *match,
1030         struct pipeline *p,
1031         uint32_t table_id)
1032 {
1033         struct table *table;
1034
1035         if ((match == NULL) ||
1036                 (p == NULL) ||
1037                 (table_id >= p->n_tables))
1038                 return -1;
1039
1040         table = &p->table[table_id];
1041         if (match->match_type != table->params.match_type)
1042                 return -1;
1043
1044         switch (match->match_type) {
1045         case TABLE_ACL:
1046         {
1047                 struct table_acl_params *t = &table->params.match.acl;
1048                 struct table_rule_match_acl *r = &match->match.acl;
1049
1050                 if ((r->ip_version && (t->ip_version == 0)) ||
1051                         ((r->ip_version == 0) && t->ip_version))
1052                         return -1;
1053
1054                 if (r->ip_version) {
1055                         if ((r->sa_depth > 32) ||
1056                                 (r->da_depth > 32))
1057                                 return -1;
1058                 } else {
1059                         if ((r->sa_depth > 128) ||
1060                                 (r->da_depth > 128))
1061                                 return -1;
1062                 }
1063                 return 0;
1064         }
1065
1066         case TABLE_ARRAY:
1067                 return 0;
1068
1069         case TABLE_HASH:
1070                 return 0;
1071
1072         case TABLE_LPM:
1073         {
1074                 struct table_lpm_params *t = &table->params.match.lpm;
1075                 struct table_rule_match_lpm *r = &match->match.lpm;
1076
1077                 if ((r->ip_version && (t->key_size != 4)) ||
1078                         ((r->ip_version == 0) && (t->key_size != 16)))
1079                         return -1;
1080
1081                 if (r->ip_version) {
1082                         if (r->depth > 32)
1083                                 return -1;
1084                 } else {
1085                         if (r->depth > 128)
1086                                 return -1;
1087                 }
1088                 return 0;
1089         }
1090
1091         case TABLE_STUB:
1092                 return -1;
1093
1094         default:
1095                 return -1;
1096         }
1097 }
1098
1099 static int
1100 action_check(struct table_rule_action *action,
1101         struct pipeline *p,
1102         uint32_t table_id)
1103 {
1104         struct table_action_profile *ap;
1105
1106         if ((action == NULL) ||
1107                 (p == NULL) ||
1108                 (table_id >= p->n_tables))
1109                 return -1;
1110
1111         ap = p->table[table_id].ap;
1112         if (action->action_mask != ap->params.action_mask)
1113                 return -1;
1114
1115         if (action->action_mask & (1LLU << RTE_TABLE_ACTION_FWD)) {
1116                 if ((action->fwd.action == RTE_PIPELINE_ACTION_PORT) &&
1117                         (action->fwd.id >= p->n_ports_out))
1118                         return -1;
1119
1120                 if ((action->fwd.action == RTE_PIPELINE_ACTION_TABLE) &&
1121                         (action->fwd.id >= p->n_tables))
1122                         return -1;
1123         }
1124
1125         if (action->action_mask & (1LLU << RTE_TABLE_ACTION_MTR)) {
1126                 uint32_t tc_mask0 = (1 << ap->params.mtr.n_tc) - 1;
1127                 uint32_t tc_mask1 = action->mtr.tc_mask;
1128
1129                 if (tc_mask1 != tc_mask0)
1130                         return -1;
1131         }
1132
1133         if (action->action_mask & (1LLU << RTE_TABLE_ACTION_TM)) {
1134                 uint32_t n_subports_per_port =
1135                         ap->params.tm.n_subports_per_port;
1136                 uint32_t n_pipes_per_subport =
1137                         ap->params.tm.n_pipes_per_subport;
1138                 uint32_t subport_id = action->tm.subport_id;
1139                 uint32_t pipe_id = action->tm.pipe_id;
1140
1141                 if ((subport_id >= n_subports_per_port) ||
1142                         (pipe_id >= n_pipes_per_subport))
1143                         return -1;
1144         }
1145
1146         if (action->action_mask & (1LLU << RTE_TABLE_ACTION_ENCAP)) {
1147                 uint64_t encap_mask = ap->params.encap.encap_mask;
1148                 enum rte_table_action_encap_type type = action->encap.type;
1149
1150                 if ((encap_mask & (1LLU << type)) == 0)
1151                         return -1;
1152         }
1153
1154         if (action->action_mask & (1LLU << RTE_TABLE_ACTION_NAT)) {
1155                 int ip_version0 = ap->params.common.ip_version;
1156                 int ip_version1 = action->nat.ip_version;
1157
1158                 if ((ip_version1 && (ip_version0 == 0)) ||
1159                         ((ip_version1 == 0) && ip_version0))
1160                         return -1;
1161         }
1162
1163         return 0;
1164 }
1165
1166 static int
1167 action_default_check(struct table_rule_action *action,
1168         struct pipeline *p,
1169         uint32_t table_id)
1170 {
1171         if ((action == NULL) ||
1172                 (action->action_mask != (1LLU << RTE_TABLE_ACTION_FWD)) ||
1173                 (p == NULL) ||
1174                 (table_id >= p->n_tables))
1175                 return -1;
1176
1177         if (action->action_mask & (1LLU << RTE_TABLE_ACTION_FWD)) {
1178                 if ((action->fwd.action == RTE_PIPELINE_ACTION_PORT) &&
1179                         (action->fwd.id >= p->n_ports_out))
1180                         return -1;
1181
1182                 if ((action->fwd.action == RTE_PIPELINE_ACTION_TABLE) &&
1183                         (action->fwd.id >= p->n_tables))
1184                         return -1;
1185         }
1186
1187         return 0;
1188 }
1189
/*
 * Storage for a rule match key in the low-level form that is handed to
 * the rte_pipeline table entry add / delete calls in this file. Being a
 * union, one object is large enough for the key of any member below.
 * It is filled in by match_convert().
 */
union table_rule_match_low_level {
        /* ACL table key; presumably used when a rule is added. */
        struct rte_table_acl_rule_add_params acl_add;
        /* ACL table key; presumably used when a rule is deleted. */
        struct rte_table_acl_rule_delete_params acl_delete;
        /* Array table key. */
        struct rte_table_array_key array;
        /* Hash table key as raw bytes. */
        uint8_t hash[TABLE_RULE_MATCH_SIZE_MAX];
        /* LPM table key, IPv4. */
        struct rte_table_lpm_key lpm_ipv4;
        /* LPM table key, IPv6. */
        struct rte_table_lpm_ipv6_key lpm_ipv6;
};
1198
/*
 * Forward declaration; the definition is further down in this file.
 * Converts the high-level match *mh* into the low-level key *ml*.
 * Callers in this file pass add = 1 when adding rules and add = 0 when
 * deleting a rule, and treat a non-zero return value as failure.
 */
static int
match_convert(struct table_rule_match *mh,
        union table_rule_match_low_level *ml,
        int add);
1203
/*
 * Forward declaration; the definition is further down in this file.
 * Callers in this file use it to fill the table entry *data* from the
 * high-level *action*, using the table action object *a*, before the
 * entry is added to the table. A non-zero return value is treated as
 * failure.
 */
static int
action_convert(struct rte_table_action *a,
        struct table_rule_action *action,
        struct rte_pipeline_table_entry *data);
1208
1209 int
1210 pipeline_table_rule_add(const char *pipeline_name,
1211         uint32_t table_id,
1212         struct table_rule_match *match,
1213         struct table_rule_action *action)
1214 {
1215         struct pipeline *p;
1216         struct table *table;
1217         struct pipeline_msg_req *req;
1218         struct pipeline_msg_rsp *rsp;
1219         struct table_rule *rule;
1220         int status;
1221
1222         /* Check input params */
1223         if ((pipeline_name == NULL) ||
1224                 (match == NULL) ||
1225                 (action == NULL))
1226                 return -1;
1227
1228         p = pipeline_find(pipeline_name);
1229         if ((p == NULL) ||
1230                 (table_id >= p->n_tables) ||
1231                 match_check(match, p, table_id) ||
1232                 action_check(action, p, table_id))
1233                 return -1;
1234
1235         table = &p->table[table_id];
1236
1237         rule = calloc(1, sizeof(struct table_rule));
1238         if (rule == NULL)
1239                 return -1;
1240
1241         memcpy(&rule->match, match, sizeof(*match));
1242         memcpy(&rule->action, action, sizeof(*action));
1243
1244         if (!pipeline_is_running(p)) {
1245                 union table_rule_match_low_level match_ll;
1246                 struct rte_pipeline_table_entry *data_in, *data_out;
1247                 int key_found;
1248                 uint8_t *buffer;
1249
1250                 buffer = calloc(TABLE_RULE_ACTION_SIZE_MAX, sizeof(uint8_t));
1251                 if (buffer == NULL) {
1252                         free(rule);
1253                         return -1;
1254                 }
1255
1256                 /* Table match-action rule conversion */
1257                 data_in = (struct rte_pipeline_table_entry *)buffer;
1258
1259                 status = match_convert(match, &match_ll, 1);
1260                 if (status) {
1261                         free(buffer);
1262                         free(rule);
1263                         return -1;
1264                 }
1265
1266                 status = action_convert(table->a, action, data_in);
1267                 if (status) {
1268                         free(buffer);
1269                         free(rule);
1270                         return -1;
1271                 }
1272
1273                 /* Add rule (match, action) to table */
1274                 status = rte_pipeline_table_entry_add(p->p,
1275                                 table_id,
1276                                 &match_ll,
1277                                 data_in,
1278                                 &key_found,
1279                                 &data_out);
1280                 if (status) {
1281                         free(buffer);
1282                         free(rule);
1283                         return -1;
1284                 }
1285
1286                 /* Write Response */
1287                 rule->data = data_out;
1288                 table_rule_add(table, rule);
1289
1290                 free(buffer);
1291                 return 0;
1292         }
1293
1294         /* Allocate request */
1295         req = pipeline_msg_alloc();
1296         if (req == NULL) {
1297                 free(rule);
1298                 return -1;
1299         }
1300
1301         /* Write request */
1302         req->type = PIPELINE_REQ_TABLE_RULE_ADD;
1303         req->id = table_id;
1304         memcpy(&req->table_rule_add.match, match, sizeof(*match));
1305         memcpy(&req->table_rule_add.action, action, sizeof(*action));
1306
1307         /* Send request and wait for response */
1308         rsp = pipeline_msg_send_recv(p, req);
1309         if (rsp == NULL) {
1310                 free(rule);
1311                 return -1;
1312         }
1313
1314         /* Read response */
1315         status = rsp->status;
1316         if (status == 0) {
1317                 rule->data = rsp->table_rule_add.data;
1318                 table_rule_add(table, rule);
1319         } else
1320                 free(rule);
1321
1322         /* Free response */
1323         pipeline_msg_free(rsp);
1324
1325         return status;
1326 }
1327
1328 int
1329 pipeline_table_rule_add_default(const char *pipeline_name,
1330         uint32_t table_id,
1331         struct table_rule_action *action,
1332         void **data)
1333 {
1334         struct pipeline *p;
1335         struct pipeline_msg_req *req;
1336         struct pipeline_msg_rsp *rsp;
1337         int status;
1338
1339         /* Check input params */
1340         if ((pipeline_name == NULL) ||
1341                 (action == NULL) ||
1342                 (data == NULL))
1343                 return -1;
1344
1345         p = pipeline_find(pipeline_name);
1346         if ((p == NULL) ||
1347                 (table_id >= p->n_tables) ||
1348                 action_default_check(action, p, table_id))
1349                 return -1;
1350
1351         if (!pipeline_is_running(p)) {
1352                 struct rte_pipeline_table_entry *data_in, *data_out;
1353                 uint8_t *buffer;
1354
1355                 buffer = calloc(TABLE_RULE_ACTION_SIZE_MAX, sizeof(uint8_t));
1356                 if (buffer == NULL)
1357                         return -1;
1358
1359                 /* Apply actions */
1360                 data_in = (struct rte_pipeline_table_entry *)buffer;
1361
1362                 data_in->action = action->fwd.action;
1363                 if (action->fwd.action == RTE_PIPELINE_ACTION_PORT)
1364                         data_in->port_id = action->fwd.id;
1365                 if (action->fwd.action == RTE_PIPELINE_ACTION_TABLE)
1366                         data_in->table_id = action->fwd.id;
1367
1368                 /* Add default rule to table */
1369                 status = rte_pipeline_table_default_entry_add(p->p,
1370                                 table_id,
1371                                 data_in,
1372                                 &data_out);
1373                 if (status) {
1374                         free(buffer);
1375                         return -1;
1376                 }
1377
1378                 /* Write Response */
1379                 *data = data_out;
1380
1381                 free(buffer);
1382                 return 0;
1383         }
1384
1385         /* Allocate request */
1386         req = pipeline_msg_alloc();
1387         if (req == NULL)
1388                 return -1;
1389
1390         /* Write request */
1391         req->type = PIPELINE_REQ_TABLE_RULE_ADD_DEFAULT;
1392         req->id = table_id;
1393         memcpy(&req->table_rule_add_default.action, action, sizeof(*action));
1394
1395         /* Send request and wait for response */
1396         rsp = pipeline_msg_send_recv(p, req);
1397         if (rsp == NULL)
1398                 return -1;
1399
1400         /* Read response */
1401         status = rsp->status;
1402         if (status == 0)
1403                 *data = rsp->table_rule_add_default.data;
1404
1405         /* Free response */
1406         pipeline_msg_free(rsp);
1407
1408         return status;
1409 }
1410
1411 int
1412 pipeline_table_rule_add_bulk(const char *pipeline_name,
1413         uint32_t table_id,
1414         struct table_rule_match *match,
1415         struct table_rule_action *action,
1416         void **data,
1417         uint32_t *n_rules)
1418 {
1419         struct pipeline *p;
1420         struct pipeline_msg_req *req;
1421         struct pipeline_msg_rsp *rsp;
1422         uint32_t i;
1423         int status;
1424
1425         /* Check input params */
1426         if ((pipeline_name == NULL) ||
1427                 (match == NULL) ||
1428                 (action == NULL) ||
1429                 (data == NULL) ||
1430                 (n_rules == NULL) ||
1431                 (*n_rules == 0))
1432                 return -1;
1433
1434         p = pipeline_find(pipeline_name);
1435         if ((p == NULL) ||
1436                 (table_id >= p->n_tables))
1437                 return -1;
1438
1439         for (i = 0; i < *n_rules; i++)
1440                 if (match_check(match, p, table_id) ||
1441                         action_check(action, p, table_id))
1442                         return -1;
1443
1444         if (!pipeline_is_running(p)) {
1445                 struct rte_table_action *a = p->table[table_id].a;
1446                 union table_rule_match_low_level *match_ll;
1447                 uint8_t *action_ll;
1448                 void **match_ll_ptr;
1449                 struct rte_pipeline_table_entry **action_ll_ptr;
1450                 struct rte_pipeline_table_entry **entries_ptr =
1451                         (struct rte_pipeline_table_entry **)data;
1452                 uint32_t bulk =
1453                         (p->table[table_id].params.match_type == TABLE_ACL) ? 1 : 0;
1454                 int *found;
1455
1456                 /* Memory allocation */
1457                 match_ll = calloc(*n_rules, sizeof(union table_rule_match_low_level));
1458                 action_ll = calloc(*n_rules, TABLE_RULE_ACTION_SIZE_MAX);
1459                 match_ll_ptr = calloc(*n_rules, sizeof(void *));
1460                 action_ll_ptr =
1461                         calloc(*n_rules, sizeof(struct rte_pipeline_table_entry *));
1462                 found = calloc(*n_rules, sizeof(int));
1463
1464                 if (match_ll == NULL ||
1465                         action_ll == NULL ||
1466                         match_ll_ptr == NULL ||
1467                         action_ll_ptr == NULL ||
1468                         found == NULL)
1469                         goto fail;
1470
1471                 for (i = 0; i < *n_rules; i++) {
1472                         match_ll_ptr[i] = (void *)&match_ll[i];
1473                         action_ll_ptr[i] =
1474                                 (struct rte_pipeline_table_entry *)&action_ll[i * TABLE_RULE_ACTION_SIZE_MAX];
1475                 }
1476
1477                 /* Rule match conversion */
1478                 for (i = 0; i < *n_rules; i++) {
1479                         status = match_convert(&match[i], match_ll_ptr[i], 1);
1480                         if (status)
1481                                 goto fail;
1482                 }
1483
1484                 /* Rule action conversion */
1485                 for (i = 0; i < *n_rules; i++) {
1486                         status = action_convert(a, &action[i], action_ll_ptr[i]);
1487                         if (status)
1488                                 goto fail;
1489                 }
1490
1491                 /* Add rule (match, action) to table */
1492                 if (bulk) {
1493                         status = rte_pipeline_table_entry_add_bulk(p->p,
1494                                 table_id,
1495                                 match_ll_ptr,
1496                                 action_ll_ptr,
1497                                 *n_rules,
1498                                 found,
1499                                 entries_ptr);
1500                         if (status)
1501                                 *n_rules = 0;
1502                 } else {
1503                         for (i = 0; i < *n_rules; i++) {
1504                                 status = rte_pipeline_table_entry_add(p->p,
1505                                         table_id,
1506                                         match_ll_ptr[i],
1507                                         action_ll_ptr[i],
1508                                         &found[i],
1509                                         &entries_ptr[i]);
1510                                 if (status) {
1511                                         *n_rules = i;
1512                                         break;
1513                                 }
1514                         }
1515                 }
1516
1517                 /* Free */
1518                 free(found);
1519                 free(action_ll_ptr);
1520                 free(match_ll_ptr);
1521                 free(action_ll);
1522                 free(match_ll);
1523
1524                 return status;
1525
1526 fail:
1527                 free(found);
1528                 free(action_ll_ptr);
1529                 free(match_ll_ptr);
1530                 free(action_ll);
1531                 free(match_ll);
1532
1533                 *n_rules = 0;
1534                 return -1;
1535         }
1536
1537         /* Allocate request */
1538         req = pipeline_msg_alloc();
1539         if (req == NULL)
1540                 return -1;
1541
1542         /* Write request */
1543         req->type = PIPELINE_REQ_TABLE_RULE_ADD_BULK;
1544         req->id = table_id;
1545         req->table_rule_add_bulk.match = match;
1546         req->table_rule_add_bulk.action = action;
1547         req->table_rule_add_bulk.data = data;
1548         req->table_rule_add_bulk.n_rules = *n_rules;
1549         req->table_rule_add_bulk.bulk =
1550                 (p->table[table_id].params.match_type == TABLE_ACL) ? 1 : 0;
1551
1552         /* Send request and wait for response */
1553         rsp = pipeline_msg_send_recv(p, req);
1554         if (rsp == NULL)
1555                 return -1;
1556
1557         /* Read response */
1558         status = rsp->status;
1559         if (status == 0)
1560                 *n_rules = rsp->table_rule_add_bulk.n_rules;
1561
1562         /* Free response */
1563         pipeline_msg_free(rsp);
1564
1565         return status;
1566 }
1567
1568 int
1569 pipeline_table_rule_delete(const char *pipeline_name,
1570         uint32_t table_id,
1571         struct table_rule_match *match)
1572 {
1573         struct pipeline *p;
1574         struct pipeline_msg_req *req;
1575         struct pipeline_msg_rsp *rsp;
1576         int status;
1577
1578         /* Check input params */
1579         if ((pipeline_name == NULL) ||
1580                 (match == NULL))
1581                 return -1;
1582
1583         p = pipeline_find(pipeline_name);
1584         if ((p == NULL) ||
1585                 (table_id >= p->n_tables) ||
1586                 match_check(match, p, table_id))
1587                 return -1;
1588
1589         if (!pipeline_is_running(p)) {
1590                 union table_rule_match_low_level match_ll;
1591                 int key_found;
1592
1593                 status = match_convert(match, &match_ll, 0);
1594                 if (status)
1595                         return -1;
1596
1597                 status = rte_pipeline_table_entry_delete(p->p,
1598                                 table_id,
1599                                 &match_ll,
1600                                 &key_found,
1601                                 NULL);
1602
1603                 return status;
1604         }
1605
1606         /* Allocate request */
1607         req = pipeline_msg_alloc();
1608         if (req == NULL)
1609                 return -1;
1610
1611         /* Write request */
1612         req->type = PIPELINE_REQ_TABLE_RULE_DELETE;
1613         req->id = table_id;
1614         memcpy(&req->table_rule_delete.match, match, sizeof(*match));
1615
1616         /* Send request and wait for response */
1617         rsp = pipeline_msg_send_recv(p, req);
1618         if (rsp == NULL)
1619                 return -1;
1620
1621         /* Read response */
1622         status = rsp->status;
1623
1624         /* Free response */
1625         pipeline_msg_free(rsp);
1626
1627         return status;
1628 }
1629
1630 int
1631 pipeline_table_rule_delete_default(const char *pipeline_name,
1632         uint32_t table_id)
1633 {
1634         struct pipeline *p;
1635         struct pipeline_msg_req *req;
1636         struct pipeline_msg_rsp *rsp;
1637         int status;
1638
1639         /* Check input params */
1640         if (pipeline_name == NULL)
1641                 return -1;
1642
1643         p = pipeline_find(pipeline_name);
1644         if ((p == NULL) ||
1645                 (table_id >= p->n_tables))
1646                 return -1;
1647
1648         if (!pipeline_is_running(p)) {
1649                 status = rte_pipeline_table_default_entry_delete(p->p,
1650                         table_id,
1651                         NULL);
1652
1653                 return status;
1654         }
1655
1656         /* Allocate request */
1657         req = pipeline_msg_alloc();
1658         if (req == NULL)
1659                 return -1;
1660
1661         /* Write request */
1662         req->type = PIPELINE_REQ_TABLE_RULE_DELETE_DEFAULT;
1663         req->id = table_id;
1664
1665         /* Send request and wait for response */
1666         rsp = pipeline_msg_send_recv(p, req);
1667         if (rsp == NULL)
1668                 return -1;
1669
1670         /* Read response */
1671         status = rsp->status;
1672
1673         /* Free response */
1674         pipeline_msg_free(rsp);
1675
1676         return status;
1677 }
1678
1679 int
1680 pipeline_table_rule_stats_read(const char *pipeline_name,
1681         uint32_t table_id,
1682         void *data,
1683         struct rte_table_action_stats_counters *stats,
1684         int clear)
1685 {
1686         struct pipeline *p;
1687         struct pipeline_msg_req *req;
1688         struct pipeline_msg_rsp *rsp;
1689         int status;
1690
1691         /* Check input params */
1692         if ((pipeline_name == NULL) ||
1693                 (data == NULL) ||
1694                 (stats == NULL))
1695                 return -1;
1696
1697         p = pipeline_find(pipeline_name);
1698         if ((p == NULL) ||
1699                 (table_id >= p->n_tables))
1700                 return -1;
1701
1702         if (!pipeline_is_running(p)) {
1703                 struct rte_table_action *a = p->table[table_id].a;
1704
1705                 status = rte_table_action_stats_read(a,
1706                         data,
1707                         stats,
1708                         clear);
1709
1710                 return status;
1711         }
1712
1713         /* Allocate request */
1714         req = pipeline_msg_alloc();
1715         if (req == NULL)
1716                 return -1;
1717
1718         /* Write request */
1719         req->type = PIPELINE_REQ_TABLE_RULE_STATS_READ;
1720         req->id = table_id;
1721         req->table_rule_stats_read.data = data;
1722         req->table_rule_stats_read.clear = clear;
1723
1724         /* Send request and wait for response */
1725         rsp = pipeline_msg_send_recv(p, req);
1726         if (rsp == NULL)
1727                 return -1;
1728
1729         /* Read response */
1730         status = rsp->status;
1731         if (status)
1732                 memcpy(stats, &rsp->table_rule_stats_read.stats, sizeof(*stats));
1733
1734         /* Free response */
1735         pipeline_msg_free(rsp);
1736
1737         return status;
1738 }
1739
1740 int
1741 pipeline_table_mtr_profile_add(const char *pipeline_name,
1742         uint32_t table_id,
1743         uint32_t meter_profile_id,
1744         struct rte_table_action_meter_profile *profile)
1745 {
1746         struct pipeline *p;
1747         struct pipeline_msg_req *req;
1748         struct pipeline_msg_rsp *rsp;
1749         int status;
1750
1751         /* Check input params */
1752         if ((pipeline_name == NULL) ||
1753                 (profile == NULL))
1754                 return -1;
1755
1756         p = pipeline_find(pipeline_name);
1757         if ((p == NULL) ||
1758                 (table_id >= p->n_tables))
1759                 return -1;
1760
1761         if (!pipeline_is_running(p)) {
1762                 struct rte_table_action *a = p->table[table_id].a;
1763
1764                 status = rte_table_action_meter_profile_add(a,
1765                         meter_profile_id,
1766                         profile);
1767
1768                 return status;
1769         }
1770
1771         /* Allocate request */
1772         req = pipeline_msg_alloc();
1773         if (req == NULL)
1774                 return -1;
1775
1776         /* Write request */
1777         req->type = PIPELINE_REQ_TABLE_MTR_PROFILE_ADD;
1778         req->id = table_id;
1779         req->table_mtr_profile_add.meter_profile_id = meter_profile_id;
1780         memcpy(&req->table_mtr_profile_add.profile, profile, sizeof(*profile));
1781
1782         /* Send request and wait for response */
1783         rsp = pipeline_msg_send_recv(p, req);
1784         if (rsp == NULL)
1785                 return -1;
1786
1787         /* Read response */
1788         status = rsp->status;
1789
1790         /* Free response */
1791         pipeline_msg_free(rsp);
1792
1793         return status;
1794 }
1795
1796 int
1797 pipeline_table_mtr_profile_delete(const char *pipeline_name,
1798         uint32_t table_id,
1799         uint32_t meter_profile_id)
1800 {
1801         struct pipeline *p;
1802         struct pipeline_msg_req *req;
1803         struct pipeline_msg_rsp *rsp;
1804         int status;
1805
1806         /* Check input params */
1807         if (pipeline_name == NULL)
1808                 return -1;
1809
1810         p = pipeline_find(pipeline_name);
1811         if ((p == NULL) ||
1812                 (table_id >= p->n_tables))
1813                 return -1;
1814
1815         if (!pipeline_is_running(p)) {
1816                 struct rte_table_action *a = p->table[table_id].a;
1817
1818                 status = rte_table_action_meter_profile_delete(a,
1819                                 meter_profile_id);
1820
1821                 return status;
1822         }
1823
1824         /* Allocate request */
1825         req = pipeline_msg_alloc();
1826         if (req == NULL)
1827                 return -1;
1828
1829         /* Write request */
1830         req->type = PIPELINE_REQ_TABLE_MTR_PROFILE_DELETE;
1831         req->id = table_id;
1832         req->table_mtr_profile_delete.meter_profile_id = meter_profile_id;
1833
1834         /* Send request and wait for response */
1835         rsp = pipeline_msg_send_recv(p, req);
1836         if (rsp == NULL)
1837                 return -1;
1838
1839         /* Read response */
1840         status = rsp->status;
1841
1842         /* Free response */
1843         pipeline_msg_free(rsp);
1844
1845         return status;
1846 }
1847
1848 int
1849 pipeline_table_rule_mtr_read(const char *pipeline_name,
1850         uint32_t table_id,
1851         void *data,
1852         uint32_t tc_mask,
1853         struct rte_table_action_mtr_counters *stats,
1854         int clear)
1855 {
1856         struct pipeline *p;
1857         struct pipeline_msg_req *req;
1858         struct pipeline_msg_rsp *rsp;
1859         int status;
1860
1861         /* Check input params */
1862         if ((pipeline_name == NULL) ||
1863                 (data == NULL) ||
1864                 (stats == NULL))
1865                 return -1;
1866
1867         p = pipeline_find(pipeline_name);
1868         if ((p == NULL) ||
1869                 (table_id >= p->n_tables))
1870                 return -1;
1871
1872         if (!pipeline_is_running(p)) {
1873                 struct rte_table_action *a = p->table[table_id].a;
1874
1875                 status = rte_table_action_meter_read(a,
1876                                 data,
1877                                 tc_mask,
1878                                 stats,
1879                                 clear);
1880
1881                 return status;
1882         }
1883
1884         /* Allocate request */
1885         req = pipeline_msg_alloc();
1886         if (req == NULL)
1887                 return -1;
1888
1889         /* Write request */
1890         req->type = PIPELINE_REQ_TABLE_RULE_MTR_READ;
1891         req->id = table_id;
1892         req->table_rule_mtr_read.data = data;
1893         req->table_rule_mtr_read.tc_mask = tc_mask;
1894         req->table_rule_mtr_read.clear = clear;
1895
1896         /* Send request and wait for response */
1897         rsp = pipeline_msg_send_recv(p, req);
1898         if (rsp == NULL)
1899                 return -1;
1900
1901         /* Read response */
1902         status = rsp->status;
1903         if (status)
1904                 memcpy(stats, &rsp->table_rule_mtr_read.stats, sizeof(*stats));
1905
1906         /* Free response */
1907         pipeline_msg_free(rsp);
1908
1909         return status;
1910 }
1911
1912 int
1913 pipeline_table_dscp_table_update(const char *pipeline_name,
1914         uint32_t table_id,
1915         uint64_t dscp_mask,
1916         struct rte_table_action_dscp_table *dscp_table)
1917 {
1918         struct pipeline *p;
1919         struct pipeline_msg_req *req;
1920         struct pipeline_msg_rsp *rsp;
1921         int status;
1922
1923         /* Check input params */
1924         if ((pipeline_name == NULL) ||
1925                 (dscp_table == NULL))
1926                 return -1;
1927
1928         p = pipeline_find(pipeline_name);
1929         if ((p == NULL) ||
1930                 (table_id >= p->n_tables))
1931                 return -1;
1932
1933         if (!pipeline_is_running(p)) {
1934                 struct rte_table_action *a = p->table[table_id].a;
1935
1936                 status = rte_table_action_dscp_table_update(a,
1937                                 dscp_mask,
1938                                 dscp_table);
1939
1940                 return status;
1941         }
1942
1943         /* Allocate request */
1944         req = pipeline_msg_alloc();
1945         if (req == NULL)
1946                 return -1;
1947
1948         /* Write request */
1949         req->type = PIPELINE_REQ_TABLE_DSCP_TABLE_UPDATE;
1950         req->id = table_id;
1951         req->table_dscp_table_update.dscp_mask = dscp_mask;
1952         memcpy(&req->table_dscp_table_update.dscp_table,
1953                 dscp_table, sizeof(*dscp_table));
1954
1955         /* Send request and wait for response */
1956         rsp = pipeline_msg_send_recv(p, req);
1957         if (rsp == NULL)
1958                 return -1;
1959
1960         /* Read response */
1961         status = rsp->status;
1962
1963         /* Free response */
1964         pipeline_msg_free(rsp);
1965
1966         return status;
1967 }
1968
1969 int
1970 pipeline_table_rule_ttl_read(const char *pipeline_name,
1971         uint32_t table_id,
1972         void *data,
1973         struct rte_table_action_ttl_counters *stats,
1974         int clear)
1975 {
1976         struct pipeline *p;
1977         struct pipeline_msg_req *req;
1978         struct pipeline_msg_rsp *rsp;
1979         int status;
1980
1981         /* Check input params */
1982         if ((pipeline_name == NULL) ||
1983                 (data == NULL) ||
1984                 (stats == NULL))
1985                 return -1;
1986
1987         p = pipeline_find(pipeline_name);
1988         if ((p == NULL) ||
1989                 (table_id >= p->n_tables))
1990                 return -1;
1991
1992         if (!pipeline_is_running(p)) {
1993                 struct rte_table_action *a = p->table[table_id].a;
1994
1995                 status = rte_table_action_ttl_read(a,
1996                                 data,
1997                                 stats,
1998                                 clear);
1999
2000                 return status;
2001         }
2002
2003         /* Allocate request */
2004         req = pipeline_msg_alloc();
2005         if (req == NULL)
2006                 return -1;
2007
2008         /* Write request */
2009         req->type = PIPELINE_REQ_TABLE_RULE_TTL_READ;
2010         req->id = table_id;
2011         req->table_rule_ttl_read.data = data;
2012         req->table_rule_ttl_read.clear = clear;
2013
2014         /* Send request and wait for response */
2015         rsp = pipeline_msg_send_recv(p, req);
2016         if (rsp == NULL)
2017                 return -1;
2018
2019         /* Read response */
2020         status = rsp->status;
2021         if (status)
2022                 memcpy(stats, &rsp->table_rule_ttl_read.stats, sizeof(*stats));
2023
2024         /* Free response */
2025         pipeline_msg_free(rsp);
2026
2027         return status;
2028 }
2029
2030 /**
2031  * Data plane threads: message handling
2032  */
/**
 * Try to take one request from the given ring without blocking.
 *
 * @return The dequeued request, or NULL when rte_ring_sc_dequeue() reports
 *         a non-zero status.
 */
static inline struct pipeline_msg_req *
pipeline_msg_recv(struct rte_ring *msgq_req)
{
        void *msg = NULL;

        if (rte_ring_sc_dequeue(msgq_req, &msg) != 0)
                return NULL;

        return msg;
}
2045
/**
 * Post a response on the given ring, retrying for as long as the enqueue
 * reports -ENOBUFS. Any other enqueue result ends the loop.
 */
static inline void
pipeline_msg_send(struct rte_ring *msgq_rsp,
        struct pipeline_msg_rsp *rsp)
{
        while (rte_ring_sp_enqueue(msgq_rsp, rsp) == -ENOBUFS)
                ; /* ring full: try again */
}
2056
2057 static struct pipeline_msg_rsp *
2058 pipeline_msg_handle_port_in_stats_read(struct pipeline_data *p,
2059         struct pipeline_msg_req *req)
2060 {
2061         struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2062         uint32_t port_id = req->id;
2063         int clear = req->port_in_stats_read.clear;
2064
2065         rsp->status = rte_pipeline_port_in_stats_read(p->p,
2066                 port_id,
2067                 &rsp->port_in_stats_read.stats,
2068                 clear);
2069
2070         return rsp;
2071 }
2072
2073 static struct pipeline_msg_rsp *
2074 pipeline_msg_handle_port_in_enable(struct pipeline_data *p,
2075         struct pipeline_msg_req *req)
2076 {
2077         struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2078         uint32_t port_id = req->id;
2079
2080         rsp->status = rte_pipeline_port_in_enable(p->p,
2081                 port_id);
2082
2083         return rsp;
2084 }
2085
2086 static struct pipeline_msg_rsp *
2087 pipeline_msg_handle_port_in_disable(struct pipeline_data *p,
2088         struct pipeline_msg_req *req)
2089 {
2090         struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2091         uint32_t port_id = req->id;
2092
2093         rsp->status = rte_pipeline_port_in_disable(p->p,
2094                 port_id);
2095
2096         return rsp;
2097 }
2098
2099 static struct pipeline_msg_rsp *
2100 pipeline_msg_handle_port_out_stats_read(struct pipeline_data *p,
2101         struct pipeline_msg_req *req)
2102 {
2103         struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2104         uint32_t port_id = req->id;
2105         int clear = req->port_out_stats_read.clear;
2106
2107         rsp->status = rte_pipeline_port_out_stats_read(p->p,
2108                 port_id,
2109                 &rsp->port_out_stats_read.stats,
2110                 clear);
2111
2112         return rsp;
2113 }
2114
2115 static struct pipeline_msg_rsp *
2116 pipeline_msg_handle_table_stats_read(struct pipeline_data *p,
2117         struct pipeline_msg_req *req)
2118 {
2119         struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2120         uint32_t port_id = req->id;
2121         int clear = req->table_stats_read.clear;
2122
2123         rsp->status = rte_pipeline_table_stats_read(p->p,
2124                 port_id,
2125                 &rsp->table_stats_read.stats,
2126                 clear);
2127
2128         return rsp;
2129 }
2130
/**
 * Split a prefix depth of 0..128 bits into four per-word depths.
 *
 * Words are filled front to back: each of the four entries receives up to
 * 32 bits until the requested depth is exhausted, and the rest are zero.
 *
 * @param depth    Total prefix depth in bits.
 * @param depth32  Output array of four entries; not written on failure.
 * @return 0 on success, -1 when depth exceeds 128.
 */
static int
match_convert_ipv6_depth(uint32_t depth, uint32_t *depth32)
{
        uint32_t remaining = depth;
        uint32_t word;

        if (depth > 128)
                return -1;

        for (word = 0; word < 4; word++) {
                uint32_t bits = (remaining < 32) ? remaining : 32;

                depth32[word] = bits;
                remaining -= bits;
        }

        return 0;
}
2177
2178 static int
2179 match_convert(struct table_rule_match *mh,
2180         union table_rule_match_low_level *ml,
2181         int add)
2182 {
2183         memset(ml, 0, sizeof(*ml));
2184
2185         switch (mh->match_type) {
2186         case TABLE_ACL:
2187                 if (mh->match.acl.ip_version)
2188                         if (add) {
2189                                 ml->acl_add.field_value[0].value.u8 =
2190                                         mh->match.acl.proto;
2191                                 ml->acl_add.field_value[0].mask_range.u8 =
2192                                         mh->match.acl.proto_mask;
2193
2194                                 ml->acl_add.field_value[1].value.u32 =
2195                                         mh->match.acl.ipv4.sa;
2196                                 ml->acl_add.field_value[1].mask_range.u32 =
2197                                         mh->match.acl.sa_depth;
2198
2199                                 ml->acl_add.field_value[2].value.u32 =
2200                                         mh->match.acl.ipv4.da;
2201                                 ml->acl_add.field_value[2].mask_range.u32 =
2202                                         mh->match.acl.da_depth;
2203
2204                                 ml->acl_add.field_value[3].value.u16 =
2205                                         mh->match.acl.sp0;
2206                                 ml->acl_add.field_value[3].mask_range.u16 =
2207                                         mh->match.acl.sp1;
2208
2209                                 ml->acl_add.field_value[4].value.u16 =
2210                                         mh->match.acl.dp0;
2211                                 ml->acl_add.field_value[4].mask_range.u16 =
2212                                         mh->match.acl.dp1;
2213
2214                                 ml->acl_add.priority =
2215                                         (int32_t) mh->match.acl.priority;
2216                         } else {
2217                                 ml->acl_delete.field_value[0].value.u8 =
2218                                         mh->match.acl.proto;
2219                                 ml->acl_delete.field_value[0].mask_range.u8 =
2220                                         mh->match.acl.proto_mask;
2221
2222                                 ml->acl_delete.field_value[1].value.u32 =
2223                                         mh->match.acl.ipv4.sa;
2224                                 ml->acl_delete.field_value[1].mask_range.u32 =
2225                                         mh->match.acl.sa_depth;
2226
2227                                 ml->acl_delete.field_value[2].value.u32 =
2228                                         mh->match.acl.ipv4.da;
2229                                 ml->acl_delete.field_value[2].mask_range.u32 =
2230                                         mh->match.acl.da_depth;
2231
2232                                 ml->acl_delete.field_value[3].value.u16 =
2233                                         mh->match.acl.sp0;
2234                                 ml->acl_delete.field_value[3].mask_range.u16 =
2235                                         mh->match.acl.sp1;
2236
2237                                 ml->acl_delete.field_value[4].value.u16 =
2238                                         mh->match.acl.dp0;
2239                                 ml->acl_delete.field_value[4].mask_range.u16 =
2240                                         mh->match.acl.dp1;
2241                         }
2242                 else
2243                         if (add) {
2244                                 uint32_t *sa32 =
2245                                         (uint32_t *) mh->match.acl.ipv6.sa;
2246                                 uint32_t *da32 =
2247                                         (uint32_t *) mh->match.acl.ipv6.da;
2248                                 uint32_t sa32_depth[4], da32_depth[4];
2249                                 int status;
2250
2251                                 status = match_convert_ipv6_depth(
2252                                         mh->match.acl.sa_depth,
2253                                         sa32_depth);
2254                                 if (status)
2255                                         return status;
2256
2257                                 status = match_convert_ipv6_depth(
2258                                         mh->match.acl.da_depth,
2259                                         da32_depth);
2260                                 if (status)
2261                                         return status;
2262
2263                                 ml->acl_add.field_value[0].value.u8 =
2264                                         mh->match.acl.proto;
2265                                 ml->acl_add.field_value[0].mask_range.u8 =
2266                                         mh->match.acl.proto_mask;
2267
2268                                 ml->acl_add.field_value[1].value.u32 =
2269                                         rte_be_to_cpu_32(sa32[0]);
2270                                 ml->acl_add.field_value[1].mask_range.u32 =
2271                                         sa32_depth[0];
2272                                 ml->acl_add.field_value[2].value.u32 =
2273                                         rte_be_to_cpu_32(sa32[1]);
2274                                 ml->acl_add.field_value[2].mask_range.u32 =
2275                                         sa32_depth[1];
2276                                 ml->acl_add.field_value[3].value.u32 =
2277                                         rte_be_to_cpu_32(sa32[2]);
2278                                 ml->acl_add.field_value[3].mask_range.u32 =
2279                                         sa32_depth[2];
2280                                 ml->acl_add.field_value[4].value.u32 =
2281                                         rte_be_to_cpu_32(sa32[3]);
2282                                 ml->acl_add.field_value[4].mask_range.u32 =
2283                                         sa32_depth[3];
2284
2285                                 ml->acl_add.field_value[5].value.u32 =
2286                                         rte_be_to_cpu_32(da32[0]);
2287                                 ml->acl_add.field_value[5].mask_range.u32 =
2288                                         da32_depth[0];
2289                                 ml->acl_add.field_value[6].value.u32 =
2290                                         rte_be_to_cpu_32(da32[1]);
2291                                 ml->acl_add.field_value[6].mask_range.u32 =
2292                                         da32_depth[1];
2293                                 ml->acl_add.field_value[7].value.u32 =
2294                                         rte_be_to_cpu_32(da32[2]);
2295                                 ml->acl_add.field_value[7].mask_range.u32 =
2296                                         da32_depth[2];
2297                                 ml->acl_add.field_value[8].value.u32 =
2298                                         rte_be_to_cpu_32(da32[3]);
2299                                 ml->acl_add.field_value[8].mask_range.u32 =
2300                                         da32_depth[3];
2301
2302                                 ml->acl_add.field_value[9].value.u16 =
2303                                         mh->match.acl.sp0;
2304                                 ml->acl_add.field_value[9].mask_range.u16 =
2305                                         mh->match.acl.sp1;
2306
2307                                 ml->acl_add.field_value[10].value.u16 =
2308                                         mh->match.acl.dp0;
2309                                 ml->acl_add.field_value[10].mask_range.u16 =
2310                                         mh->match.acl.dp1;
2311
2312                                 ml->acl_add.priority =
2313                                         (int32_t) mh->match.acl.priority;
2314                         } else {
2315                                 uint32_t *sa32 =
2316                                         (uint32_t *) mh->match.acl.ipv6.sa;
2317                                 uint32_t *da32 =
2318                                         (uint32_t *) mh->match.acl.ipv6.da;
2319                                 uint32_t sa32_depth[4], da32_depth[4];
2320                                 int status;
2321
2322                                 status = match_convert_ipv6_depth(
2323                                         mh->match.acl.sa_depth,
2324                                         sa32_depth);
2325                                 if (status)
2326                                         return status;
2327
2328                                 status = match_convert_ipv6_depth(
2329                                         mh->match.acl.da_depth,
2330                                         da32_depth);
2331                                 if (status)
2332                                         return status;
2333
2334                                 ml->acl_delete.field_value[0].value.u8 =
2335                                         mh->match.acl.proto;
2336                                 ml->acl_delete.field_value[0].mask_range.u8 =
2337                                         mh->match.acl.proto_mask;
2338
2339                                 ml->acl_delete.field_value[1].value.u32 =
2340                                         rte_be_to_cpu_32(sa32[0]);
2341                                 ml->acl_delete.field_value[1].mask_range.u32 =
2342                                         sa32_depth[0];
2343                                 ml->acl_delete.field_value[2].value.u32 =
2344                                         rte_be_to_cpu_32(sa32[1]);
2345                                 ml->acl_delete.field_value[2].mask_range.u32 =
2346                                         sa32_depth[1];
2347                                 ml->acl_delete.field_value[3].value.u32 =
2348                                         rte_be_to_cpu_32(sa32[2]);
2349                                 ml->acl_delete.field_value[3].mask_range.u32 =
2350                                         sa32_depth[2];
2351                                 ml->acl_delete.field_value[4].value.u32 =
2352                                         rte_be_to_cpu_32(sa32[3]);
2353                                 ml->acl_delete.field_value[4].mask_range.u32 =
2354                                         sa32_depth[3];
2355
2356                                 ml->acl_delete.field_value[5].value.u32 =
2357                                         rte_be_to_cpu_32(da32[0]);
2358                                 ml->acl_delete.field_value[5].mask_range.u32 =
2359                                         da32_depth[0];
2360                                 ml->acl_delete.field_value[6].value.u32 =
2361                                         rte_be_to_cpu_32(da32[1]);
2362                                 ml->acl_delete.field_value[6].mask_range.u32 =
2363                                         da32_depth[1];
2364                                 ml->acl_delete.field_value[7].value.u32 =
2365                                         rte_be_to_cpu_32(da32[2]);
2366                                 ml->acl_delete.field_value[7].mask_range.u32 =
2367                                         da32_depth[2];
2368                                 ml->acl_delete.field_value[8].value.u32 =
2369                                         rte_be_to_cpu_32(da32[3]);
2370                                 ml->acl_delete.field_value[8].mask_range.u32 =
2371                                         da32_depth[3];
2372
2373                                 ml->acl_delete.field_value[9].value.u16 =
2374                                         mh->match.acl.sp0;
2375                                 ml->acl_delete.field_value[9].mask_range.u16 =
2376                                         mh->match.acl.sp1;
2377
2378                                 ml->acl_delete.field_value[10].value.u16 =
2379                                         mh->match.acl.dp0;
2380                                 ml->acl_delete.field_value[10].mask_range.u16 =
2381                                         mh->match.acl.dp1;
2382                         }
2383                 return 0;
2384
2385         case TABLE_ARRAY:
2386                 ml->array.pos = mh->match.array.pos;
2387                 return 0;
2388
2389         case TABLE_HASH:
2390                 memcpy(ml->hash, mh->match.hash.key, sizeof(ml->hash));
2391                 return 0;
2392
2393         case TABLE_LPM:
2394                 if (mh->match.lpm.ip_version) {
2395                         ml->lpm_ipv4.ip = mh->match.lpm.ipv4;
2396                         ml->lpm_ipv4.depth = mh->match.lpm.depth;
2397                 } else {
2398                         memcpy(ml->lpm_ipv6.ip,
2399                                 mh->match.lpm.ipv6, sizeof(ml->lpm_ipv6.ip));
2400                         ml->lpm_ipv6.depth = mh->match.lpm.depth;
2401                 }
2402
2403                 return 0;
2404
2405         default:
2406                 return -1;
2407         }
2408 }
2409
2410 static int
2411 action_convert(struct rte_table_action *a,
2412         struct table_rule_action *action,
2413         struct rte_pipeline_table_entry *data)
2414 {
2415         int status;
2416
2417         /* Apply actions */
2418         if (action->action_mask & (1LLU << RTE_TABLE_ACTION_FWD)) {
2419                 status = rte_table_action_apply(a,
2420                         data,
2421                         RTE_TABLE_ACTION_FWD,
2422                         &action->fwd);
2423
2424                 if (status)
2425                         return status;
2426         }
2427
2428         if (action->action_mask & (1LLU << RTE_TABLE_ACTION_LB)) {
2429                 status = rte_table_action_apply(a,
2430                         data,
2431                         RTE_TABLE_ACTION_LB,
2432                         &action->lb);
2433
2434                 if (status)
2435                         return status;
2436         }
2437
2438         if (action->action_mask & (1LLU << RTE_TABLE_ACTION_MTR)) {
2439                 status = rte_table_action_apply(a,
2440                         data,
2441                         RTE_TABLE_ACTION_MTR,
2442                         &action->mtr);
2443
2444                 if (status)
2445                         return status;
2446         }
2447
2448         if (action->action_mask & (1LLU << RTE_TABLE_ACTION_TM)) {
2449                 status = rte_table_action_apply(a,
2450                         data,
2451                         RTE_TABLE_ACTION_TM,
2452                         &action->tm);
2453
2454                 if (status)
2455                         return status;
2456         }
2457
2458         if (action->action_mask & (1LLU << RTE_TABLE_ACTION_ENCAP)) {
2459                 status = rte_table_action_apply(a,
2460                         data,
2461                         RTE_TABLE_ACTION_ENCAP,
2462                         &action->encap);
2463
2464                 if (status)
2465                         return status;
2466         }
2467
2468         if (action->action_mask & (1LLU << RTE_TABLE_ACTION_NAT)) {
2469                 status = rte_table_action_apply(a,
2470                         data,
2471                         RTE_TABLE_ACTION_NAT,
2472                         &action->nat);
2473
2474                 if (status)
2475                         return status;
2476         }
2477
2478         if (action->action_mask & (1LLU << RTE_TABLE_ACTION_TTL)) {
2479                 status = rte_table_action_apply(a,
2480                         data,
2481                         RTE_TABLE_ACTION_TTL,
2482                         &action->ttl);
2483
2484                 if (status)
2485                         return status;
2486         }
2487
2488         if (action->action_mask & (1LLU << RTE_TABLE_ACTION_STATS)) {
2489                 status = rte_table_action_apply(a,
2490                         data,
2491                         RTE_TABLE_ACTION_STATS,
2492                         &action->stats);
2493
2494                 if (status)
2495                         return status;
2496         }
2497
2498         if (action->action_mask & (1LLU << RTE_TABLE_ACTION_TIME)) {
2499                 status = rte_table_action_apply(a,
2500                         data,
2501                         RTE_TABLE_ACTION_TIME,
2502                         &action->time);
2503
2504                 if (status)
2505                         return status;
2506         }
2507
2508         if (action->action_mask & (1LLU << RTE_TABLE_ACTION_SYM_CRYPTO)) {
2509                 status = rte_table_action_apply(a,
2510                         data,
2511                         RTE_TABLE_ACTION_SYM_CRYPTO,
2512                         &action->sym_crypto);
2513
2514                 if (status)
2515                         return status;
2516         }
2517
2518         if (action->action_mask & (1LLU << RTE_TABLE_ACTION_TAG)) {
2519                 status = rte_table_action_apply(a,
2520                         data,
2521                         RTE_TABLE_ACTION_TAG,
2522                         &action->tag);
2523
2524                 if (status)
2525                         return status;
2526         }
2527
2528         if (action->action_mask & (1LLU << RTE_TABLE_ACTION_DECAP)) {
2529                 status = rte_table_action_apply(a,
2530                         data,
2531                         RTE_TABLE_ACTION_DECAP,
2532                         &action->decap);
2533
2534                 if (status)
2535                         return status;
2536         }
2537
2538         return 0;
2539 }
2540
2541 static struct pipeline_msg_rsp *
2542 pipeline_msg_handle_table_rule_add(struct pipeline_data *p,
2543         struct pipeline_msg_req *req)
2544 {
2545         union table_rule_match_low_level match_ll;
2546         struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2547         struct table_rule_match *match = &req->table_rule_add.match;
2548         struct table_rule_action *action = &req->table_rule_add.action;
2549         struct rte_pipeline_table_entry *data_in, *data_out;
2550         uint32_t table_id = req->id;
2551         int key_found, status;
2552         struct rte_table_action *a = p->table_data[table_id].a;
2553
2554         /* Apply actions */
2555         memset(p->buffer, 0, sizeof(p->buffer));
2556         data_in = (struct rte_pipeline_table_entry *) p->buffer;
2557
2558         status = match_convert(match, &match_ll, 1);
2559         if (status) {
2560                 rsp->status = -1;
2561                 return rsp;
2562         }
2563
2564         status = action_convert(a, action, data_in);
2565         if (status) {
2566                 rsp->status = -1;
2567                 return rsp;
2568         }
2569
2570         status = rte_pipeline_table_entry_add(p->p,
2571                 table_id,
2572                 &match_ll,
2573                 data_in,
2574                 &key_found,
2575                 &data_out);
2576         if (status) {
2577                 rsp->status = -1;
2578                 return rsp;
2579         }
2580
2581         /* Write response */
2582         rsp->status = 0;
2583         rsp->table_rule_add.data = data_out;
2584
2585         return rsp;
2586 }
2587
2588 static struct pipeline_msg_rsp *
2589 pipeline_msg_handle_table_rule_add_default(struct pipeline_data *p,
2590         struct pipeline_msg_req *req)
2591 {
2592         struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2593         struct table_rule_action *action = &req->table_rule_add_default.action;
2594         struct rte_pipeline_table_entry *data_in, *data_out;
2595         uint32_t table_id = req->id;
2596         int status;
2597
2598         /* Apply actions */
2599         memset(p->buffer, 0, sizeof(p->buffer));
2600         data_in = (struct rte_pipeline_table_entry *) p->buffer;
2601
2602         data_in->action = action->fwd.action;
2603         if (action->fwd.action == RTE_PIPELINE_ACTION_PORT)
2604                 data_in->port_id = action->fwd.id;
2605         if (action->fwd.action == RTE_PIPELINE_ACTION_TABLE)
2606                 data_in->table_id = action->fwd.id;
2607
2608         /* Add default rule to table */
2609         status = rte_pipeline_table_default_entry_add(p->p,
2610                 table_id,
2611                 data_in,
2612                 &data_out);
2613         if (status) {
2614                 rsp->status = -1;
2615                 return rsp;
2616         }
2617
2618         /* Write response */
2619         rsp->status = 0;
2620         rsp->table_rule_add_default.data = data_out;
2621
2622         return rsp;
2623 }
2624
2625 static struct pipeline_msg_rsp *
2626 pipeline_msg_handle_table_rule_add_bulk(struct pipeline_data *p,
2627         struct pipeline_msg_req *req)
2628 {
2629
2630         struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2631
2632         uint32_t table_id = req->id;
2633         struct table_rule_match *match = req->table_rule_add_bulk.match;
2634         struct table_rule_action *action = req->table_rule_add_bulk.action;
2635         struct rte_pipeline_table_entry **data =
2636                 (struct rte_pipeline_table_entry **)req->table_rule_add_bulk.data;
2637         uint32_t n_rules = req->table_rule_add_bulk.n_rules;
2638         uint32_t bulk = req->table_rule_add_bulk.bulk;
2639
2640         struct rte_table_action *a = p->table_data[table_id].a;
2641         union table_rule_match_low_level *match_ll;
2642         uint8_t *action_ll;
2643         void **match_ll_ptr;
2644         struct rte_pipeline_table_entry **action_ll_ptr;
2645         int *found, status;
2646         uint32_t i;
2647
2648         /* Memory allocation */
2649         match_ll = calloc(n_rules, sizeof(union table_rule_match_low_level));
2650         action_ll = calloc(n_rules, TABLE_RULE_ACTION_SIZE_MAX);
2651         match_ll_ptr = calloc(n_rules, sizeof(void *));
2652         action_ll_ptr =
2653                 calloc(n_rules, sizeof(struct rte_pipeline_table_entry *));
2654         found = calloc(n_rules, sizeof(int));
2655
2656         if ((match_ll == NULL) ||
2657                 (action_ll == NULL) ||
2658                 (match_ll_ptr == NULL) ||
2659                 (action_ll_ptr == NULL) ||
2660                 (found == NULL))
2661                 goto fail;
2662
2663         for (i = 0; i < n_rules; i++) {
2664                 match_ll_ptr[i] = (void *)&match_ll[i];
2665                 action_ll_ptr[i] =
2666                         (struct rte_pipeline_table_entry *)&action_ll[i * TABLE_RULE_ACTION_SIZE_MAX];
2667         }
2668
2669         /* Rule match conversion */
2670         for (i = 0; i < n_rules; i++) {
2671                 status = match_convert(&match[i], match_ll_ptr[i], 1);
2672                 if (status)
2673                         goto fail;
2674         }
2675
2676         /* Rule action conversion */
2677         for (i = 0; i < n_rules; i++) {
2678                 status = action_convert(a, &action[i], action_ll_ptr[i]);
2679                 if (status)
2680                         goto fail;
2681         }
2682
2683         /* Add rule (match, action) to table */
2684         if (bulk) {
2685                 status = rte_pipeline_table_entry_add_bulk(p->p,
2686                         table_id,
2687                         match_ll_ptr,
2688                         action_ll_ptr,
2689                         n_rules,
2690                         found,
2691                         data);
2692                 if (status)
2693                         n_rules = 0;
2694         } else
2695                 for (i = 0; i < n_rules; i++) {
2696                         status = rte_pipeline_table_entry_add(p->p,
2697                                 table_id,
2698                                 match_ll_ptr[i],
2699                                 action_ll_ptr[i],
2700                                 &found[i],
2701                                 &data[i]);
2702                         if (status) {
2703                                 n_rules = i;
2704                                 break;
2705                         }
2706                 }
2707
2708         /* Write response */
2709         rsp->status = 0;
2710         rsp->table_rule_add_bulk.n_rules = n_rules;
2711
2712         /* Free */
2713         free(found);
2714         free(action_ll_ptr);
2715         free(match_ll_ptr);
2716         free(action_ll);
2717         free(match_ll);
2718
2719         return rsp;
2720
2721 fail:
2722         free(found);
2723         free(action_ll_ptr);
2724         free(match_ll_ptr);
2725         free(action_ll);
2726         free(match_ll);
2727
2728         rsp->status = -1;
2729         rsp->table_rule_add_bulk.n_rules = 0;
2730         return rsp;
2731 }
2732
2733 static struct pipeline_msg_rsp *
2734 pipeline_msg_handle_table_rule_delete(struct pipeline_data *p,
2735         struct pipeline_msg_req *req)
2736 {
2737         union table_rule_match_low_level match_ll;
2738         struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2739         struct table_rule_match *match = &req->table_rule_delete.match;
2740         uint32_t table_id = req->id;
2741         int key_found, status;
2742
2743         status = match_convert(match, &match_ll, 0);
2744         if (status) {
2745                 rsp->status = -1;
2746                 return rsp;
2747         }
2748
2749         rsp->status = rte_pipeline_table_entry_delete(p->p,
2750                 table_id,
2751                 &match_ll,
2752                 &key_found,
2753                 NULL);
2754
2755         return rsp;
2756 }
2757
2758 static struct pipeline_msg_rsp *
2759 pipeline_msg_handle_table_rule_delete_default(struct pipeline_data *p,
2760         struct pipeline_msg_req *req)
2761 {
2762         struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2763         uint32_t table_id = req->id;
2764
2765         rsp->status = rte_pipeline_table_default_entry_delete(p->p,
2766                 table_id,
2767                 NULL);
2768
2769         return rsp;
2770 }
2771
2772 static struct pipeline_msg_rsp *
2773 pipeline_msg_handle_table_rule_stats_read(struct pipeline_data *p,
2774         struct pipeline_msg_req *req)
2775 {
2776         struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2777         uint32_t table_id = req->id;
2778         void *data = req->table_rule_stats_read.data;
2779         int clear = req->table_rule_stats_read.clear;
2780         struct rte_table_action *a = p->table_data[table_id].a;
2781
2782         rsp->status = rte_table_action_stats_read(a,
2783                 data,
2784                 &rsp->table_rule_stats_read.stats,
2785                 clear);
2786
2787         return rsp;
2788 }
2789
2790 static struct pipeline_msg_rsp *
2791 pipeline_msg_handle_table_mtr_profile_add(struct pipeline_data *p,
2792         struct pipeline_msg_req *req)
2793 {
2794         struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2795         uint32_t table_id = req->id;
2796         uint32_t meter_profile_id = req->table_mtr_profile_add.meter_profile_id;
2797         struct rte_table_action_meter_profile *profile =
2798                 &req->table_mtr_profile_add.profile;
2799         struct rte_table_action *a = p->table_data[table_id].a;
2800
2801         rsp->status = rte_table_action_meter_profile_add(a,
2802                 meter_profile_id,
2803                 profile);
2804
2805         return rsp;
2806 }
2807
2808 static struct pipeline_msg_rsp *
2809 pipeline_msg_handle_table_mtr_profile_delete(struct pipeline_data *p,
2810         struct pipeline_msg_req *req)
2811 {
2812         struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2813         uint32_t table_id = req->id;
2814         uint32_t meter_profile_id =
2815                 req->table_mtr_profile_delete.meter_profile_id;
2816         struct rte_table_action *a = p->table_data[table_id].a;
2817
2818         rsp->status = rte_table_action_meter_profile_delete(a,
2819                 meter_profile_id);
2820
2821         return rsp;
2822 }
2823
2824 static struct pipeline_msg_rsp *
2825 pipeline_msg_handle_table_rule_mtr_read(struct pipeline_data *p,
2826         struct pipeline_msg_req *req)
2827 {
2828         struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2829         uint32_t table_id = req->id;
2830         void *data = req->table_rule_mtr_read.data;
2831         uint32_t tc_mask = req->table_rule_mtr_read.tc_mask;
2832         int clear = req->table_rule_mtr_read.clear;
2833         struct rte_table_action *a = p->table_data[table_id].a;
2834
2835         rsp->status = rte_table_action_meter_read(a,
2836                 data,
2837                 tc_mask,
2838                 &rsp->table_rule_mtr_read.stats,
2839                 clear);
2840
2841         return rsp;
2842 }
2843
2844 static struct pipeline_msg_rsp *
2845 pipeline_msg_handle_table_dscp_table_update(struct pipeline_data *p,
2846         struct pipeline_msg_req *req)
2847 {
2848         struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2849         uint32_t table_id = req->id;
2850         uint64_t dscp_mask = req->table_dscp_table_update.dscp_mask;
2851         struct rte_table_action_dscp_table *dscp_table =
2852                 &req->table_dscp_table_update.dscp_table;
2853         struct rte_table_action *a = p->table_data[table_id].a;
2854
2855         rsp->status = rte_table_action_dscp_table_update(a,
2856                 dscp_mask,
2857                 dscp_table);
2858
2859         return rsp;
2860 }
2861
2862 static struct pipeline_msg_rsp *
2863 pipeline_msg_handle_table_rule_ttl_read(struct pipeline_data *p,
2864         struct pipeline_msg_req *req)
2865 {
2866         struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2867         uint32_t table_id = req->id;
2868         void *data = req->table_rule_ttl_read.data;
2869         int clear = req->table_rule_ttl_read.clear;
2870         struct rte_table_action *a = p->table_data[table_id].a;
2871
2872         rsp->status = rte_table_action_ttl_read(a,
2873                 data,
2874                 &rsp->table_rule_ttl_read.stats,
2875                 clear);
2876
2877         return rsp;
2878 }
2879
2880 static void
2881 pipeline_msg_handle(struct pipeline_data *p)
2882 {
2883         for ( ; ; ) {
2884                 struct pipeline_msg_req *req;
2885                 struct pipeline_msg_rsp *rsp;
2886
2887                 req = pipeline_msg_recv(p->msgq_req);
2888                 if (req == NULL)
2889                         break;
2890
2891                 switch (req->type) {
2892                 case PIPELINE_REQ_PORT_IN_STATS_READ:
2893                         rsp = pipeline_msg_handle_port_in_stats_read(p, req);
2894                         break;
2895
2896                 case PIPELINE_REQ_PORT_IN_ENABLE:
2897                         rsp = pipeline_msg_handle_port_in_enable(p, req);
2898                         break;
2899
2900                 case PIPELINE_REQ_PORT_IN_DISABLE:
2901                         rsp = pipeline_msg_handle_port_in_disable(p, req);
2902                         break;
2903
2904                 case PIPELINE_REQ_PORT_OUT_STATS_READ:
2905                         rsp = pipeline_msg_handle_port_out_stats_read(p, req);
2906                         break;
2907
2908                 case PIPELINE_REQ_TABLE_STATS_READ:
2909                         rsp = pipeline_msg_handle_table_stats_read(p, req);
2910                         break;
2911
2912                 case PIPELINE_REQ_TABLE_RULE_ADD:
2913                         rsp = pipeline_msg_handle_table_rule_add(p, req);
2914                         break;
2915
2916                 case PIPELINE_REQ_TABLE_RULE_ADD_DEFAULT:
2917                         rsp = pipeline_msg_handle_table_rule_add_default(p,     req);
2918                         break;
2919
2920                 case PIPELINE_REQ_TABLE_RULE_ADD_BULK:
2921                         rsp = pipeline_msg_handle_table_rule_add_bulk(p, req);
2922                         break;
2923
2924                 case PIPELINE_REQ_TABLE_RULE_DELETE:
2925                         rsp = pipeline_msg_handle_table_rule_delete(p, req);
2926                         break;
2927
2928                 case PIPELINE_REQ_TABLE_RULE_DELETE_DEFAULT:
2929                         rsp = pipeline_msg_handle_table_rule_delete_default(p, req);
2930                         break;
2931
2932                 case PIPELINE_REQ_TABLE_RULE_STATS_READ:
2933                         rsp = pipeline_msg_handle_table_rule_stats_read(p, req);
2934                         break;
2935
2936                 case PIPELINE_REQ_TABLE_MTR_PROFILE_ADD:
2937                         rsp = pipeline_msg_handle_table_mtr_profile_add(p, req);
2938                         break;
2939
2940                 case PIPELINE_REQ_TABLE_MTR_PROFILE_DELETE:
2941                         rsp = pipeline_msg_handle_table_mtr_profile_delete(p, req);
2942                         break;
2943
2944                 case PIPELINE_REQ_TABLE_RULE_MTR_READ:
2945                         rsp = pipeline_msg_handle_table_rule_mtr_read(p, req);
2946                         break;
2947
2948                 case PIPELINE_REQ_TABLE_DSCP_TABLE_UPDATE:
2949                         rsp = pipeline_msg_handle_table_dscp_table_update(p, req);
2950                         break;
2951
2952                 case PIPELINE_REQ_TABLE_RULE_TTL_READ:
2953                         rsp = pipeline_msg_handle_table_rule_ttl_read(p, req);
2954                         break;
2955
2956                 default:
2957                         rsp = (struct pipeline_msg_rsp *) req;
2958                         rsp->status = -1;
2959                 }
2960
2961                 pipeline_msg_send(p->msgq_rsp, rsp);
2962         }
2963 }
2964
2965 /**
2966  * Data plane threads: main
2967  */
2968 int
2969 thread_main(void *arg __rte_unused)
2970 {
2971         struct thread_data *t;
2972         uint32_t thread_id, i;
2973
2974         thread_id = rte_lcore_id();
2975         t = &thread_data[thread_id];
2976
2977         /* Dispatch loop */
2978         for (i = 0; ; i++) {
2979                 uint32_t j;
2980
2981                 /* Data Plane */
2982                 for (j = 0; j < t->n_pipelines; j++)
2983                         rte_pipeline_run(t->p[j]);
2984
2985                 /* Control Plane */
2986                 if ((i & 0xF) == 0) {
2987                         uint64_t time = rte_get_tsc_cycles();
2988                         uint64_t time_next_min = UINT64_MAX;
2989
2990                         if (time < t->time_next_min)
2991                                 continue;
2992
2993                         /* Pipeline message queues */
2994                         for (j = 0; j < t->n_pipelines; j++) {
2995                                 struct pipeline_data *p =
2996                                         &t->pipeline_data[j];
2997                                 uint64_t time_next = p->time_next;
2998
2999                                 if (time_next <= time) {
3000                                         pipeline_msg_handle(p);
3001                                         rte_pipeline_flush(p->p);
3002                                         time_next = time + p->timer_period;
3003                                         p->time_next = time_next;
3004                                 }
3005
3006                                 if (time_next < time_next_min)
3007                                         time_next_min = time_next;
3008                         }
3009
3010                         /* Thread message queues */
3011                         {
3012                                 uint64_t time_next = t->time_next;
3013
3014                                 if (time_next <= time) {
3015                                         thread_msg_handle(t);
3016                                         time_next = time + t->timer_period;
3017                                         t->time_next = time_next;
3018                                 }
3019
3020                                 if (time_next < time_next_min)
3021                                         time_next_min = time_next;
3022                         }
3023
3024                         t->time_next_min = time_next_min;
3025                 }
3026         }
3027
3028         return 0;
3029 }