examples/ip_pipeline: add stats read commands
[dpdk.git] / examples / ip_pipeline / thread.c
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2010-2018 Intel Corporation
3  */
4
5 #include <stdlib.h>
6
7 #include <rte_common.h>
8 #include <rte_cycles.h>
9 #include <rte_lcore.h>
10 #include <rte_ring.h>
11
12 #include <rte_table_acl.h>
13 #include <rte_table_array.h>
14 #include <rte_table_hash.h>
15 #include <rte_table_lpm.h>
16 #include <rte_table_lpm_ipv6.h>
17
18 #include "common.h"
19 #include "thread.h"
20 #include "pipeline.h"
21
/* Capacity of the per-thread pipeline arrays in struct thread_data. */
#ifndef THREAD_PIPELINES_MAX
#define THREAD_PIPELINES_MAX 256
#endif

/* Size passed to rte_ring_create() for each thread message queue. */
#ifndef THREAD_MSGQ_SIZE
#define THREAD_MSGQ_SIZE 64
#endif

/* Period, in milliseconds, between polls of a thread's message queue. */
#ifndef THREAD_TIMER_PERIOD_MS
#define THREAD_TIMER_PERIOD_MS 100
#endif
33
34 /**
35  * Master thread: data plane thread context
36  */
37 struct thread {
38         struct rte_ring *msgq_req;
39         struct rte_ring *msgq_rsp;
40
41         uint32_t enabled;
42 };
43
44 static struct thread thread[RTE_MAX_LCORE];
45
46 /**
47  * Data plane threads: context
48  */
/* Per-table state kept by a data plane thread. */
struct table_data {
	/* Table action handle copied from the pipeline enable request. */
	struct rte_table_action *a;
};
52
53 struct pipeline_data {
54         struct rte_pipeline *p;
55         struct table_data table_data[RTE_PIPELINE_TABLE_MAX];
56         uint32_t n_tables;
57
58         struct rte_ring *msgq_req;
59         struct rte_ring *msgq_rsp;
60         uint64_t timer_period; /* Measured in CPU cycles. */
61         uint64_t time_next;
62
63         uint8_t buffer[TABLE_RULE_ACTION_SIZE_MAX];
64 };
65
66 struct thread_data {
67         struct rte_pipeline *p[THREAD_PIPELINES_MAX];
68         uint32_t n_pipelines;
69
70         struct pipeline_data pipeline_data[THREAD_PIPELINES_MAX];
71         struct rte_ring *msgq_req;
72         struct rte_ring *msgq_rsp;
73         uint64_t timer_period; /* Measured in CPU cycles. */
74         uint64_t time_next;
75         uint64_t time_next_min;
76 } __rte_cache_aligned;
77
78 static struct thread_data thread_data[RTE_MAX_LCORE];
79
80 /**
81  * Master thread: data plane thread init
82  */
83 static void
84 thread_free(void)
85 {
86         uint32_t i;
87
88         for (i = 0; i < RTE_MAX_LCORE; i++) {
89                 struct thread *t = &thread[i];
90
91                 if (!rte_lcore_is_enabled(i))
92                         continue;
93
94                 /* MSGQs */
95                 if (t->msgq_req)
96                         rte_ring_free(t->msgq_req);
97
98                 if (t->msgq_rsp)
99                         rte_ring_free(t->msgq_rsp);
100         }
101 }
102
103 int
104 thread_init(void)
105 {
106         uint32_t i;
107
108         RTE_LCORE_FOREACH_SLAVE(i) {
109                 char name[NAME_MAX];
110                 struct rte_ring *msgq_req, *msgq_rsp;
111                 struct thread *t = &thread[i];
112                 struct thread_data *t_data = &thread_data[i];
113                 uint32_t cpu_id = rte_lcore_to_socket_id(i);
114
115                 /* MSGQs */
116                 snprintf(name, sizeof(name), "THREAD-%04x-MSGQ-REQ", i);
117
118                 msgq_req = rte_ring_create(name,
119                         THREAD_MSGQ_SIZE,
120                         cpu_id,
121                         RING_F_SP_ENQ | RING_F_SC_DEQ);
122
123                 if (msgq_req == NULL) {
124                         thread_free();
125                         return -1;
126                 }
127
128                 snprintf(name, sizeof(name), "THREAD-%04x-MSGQ-RSP", i);
129
130                 msgq_rsp = rte_ring_create(name,
131                         THREAD_MSGQ_SIZE,
132                         cpu_id,
133                         RING_F_SP_ENQ | RING_F_SC_DEQ);
134
135                 if (msgq_rsp == NULL) {
136                         thread_free();
137                         return -1;
138                 }
139
140                 /* Master thread records */
141                 t->msgq_req = msgq_req;
142                 t->msgq_rsp = msgq_rsp;
143                 t->enabled = 1;
144
145                 /* Data plane thread records */
146                 t_data->n_pipelines = 0;
147                 t_data->msgq_req = msgq_req;
148                 t_data->msgq_rsp = msgq_rsp;
149                 t_data->timer_period =
150                         (rte_get_tsc_hz() * THREAD_TIMER_PERIOD_MS) / 1000;
151                 t_data->time_next = rte_get_tsc_cycles() + t_data->timer_period;
152                 t_data->time_next_min = t_data->time_next;
153         }
154
155         return 0;
156 }
157
158 /**
159  * Master thread & data plane threads: message passing
160  */
/* Request codes understood by thread_msg_handle(). */
enum thread_req_type {
	THREAD_REQ_PIPELINE_ENABLE = 0,	/* start running a pipeline */
	THREAD_REQ_PIPELINE_DISABLE,	/* stop running a pipeline */
	THREAD_REQ_MAX			/* number of request codes */
};
166
167 struct thread_msg_req {
168         enum thread_req_type type;
169
170         union {
171                 struct {
172                         struct rte_pipeline *p;
173                         struct {
174                                 struct rte_table_action *a;
175                         } table[RTE_PIPELINE_TABLE_MAX];
176                         struct rte_ring *msgq_req;
177                         struct rte_ring *msgq_rsp;
178                         uint32_t timer_period_ms;
179                         uint32_t n_tables;
180                 } pipeline_enable;
181
182                 struct {
183                         struct rte_pipeline *p;
184                 } pipeline_disable;
185         };
186 };
187
/* Response returned by a data plane thread to the master thread. */
struct thread_msg_rsp {
	/* 0 on success; the handlers in this file report failure as -1. */
	int status;
};
191
192 /**
193  * Master thread
194  */
195 static struct thread_msg_req *
196 thread_msg_alloc(void)
197 {
198         size_t size = RTE_MAX(sizeof(struct thread_msg_req),
199                 sizeof(struct thread_msg_rsp));
200
201         return calloc(1, size);
202 }
203
/* Release a message buffer obtained from thread_msg_alloc(). */
static void
thread_msg_free(struct thread_msg_rsp *rsp)
{
	void *buffer = rsp;

	free(buffer);
}
209
210 static struct thread_msg_rsp *
211 thread_msg_send_recv(uint32_t thread_id,
212         struct thread_msg_req *req)
213 {
214         struct thread *t = &thread[thread_id];
215         struct rte_ring *msgq_req = t->msgq_req;
216         struct rte_ring *msgq_rsp = t->msgq_rsp;
217         struct thread_msg_rsp *rsp;
218         int status;
219
220         /* send */
221         do {
222                 status = rte_ring_sp_enqueue(msgq_req, req);
223         } while (status == -ENOBUFS);
224
225         /* recv */
226         do {
227                 status = rte_ring_sc_dequeue(msgq_rsp, (void **) &rsp);
228         } while (status != 0);
229
230         return rsp;
231 }
232
233 int
234 thread_pipeline_enable(uint32_t thread_id,
235         const char *pipeline_name)
236 {
237         struct pipeline *p = pipeline_find(pipeline_name);
238         struct thread *t;
239         struct thread_msg_req *req;
240         struct thread_msg_rsp *rsp;
241         uint32_t i;
242         int status;
243
244         /* Check input params */
245         if ((thread_id >= RTE_MAX_LCORE) ||
246                 (p == NULL) ||
247                 (p->n_ports_in == 0) ||
248                 (p->n_ports_out == 0) ||
249                 (p->n_tables == 0))
250                 return -1;
251
252         t = &thread[thread_id];
253         if ((t->enabled == 0) ||
254                 p->enabled)
255                 return -1;
256
257         /* Allocate request */
258         req = thread_msg_alloc();
259         if (req == NULL)
260                 return -1;
261
262         /* Write request */
263         req->type = THREAD_REQ_PIPELINE_ENABLE;
264         req->pipeline_enable.p = p->p;
265         for (i = 0; i < p->n_tables; i++)
266                 req->pipeline_enable.table[i].a =
267                         p->table[i].a;
268         req->pipeline_enable.msgq_req = p->msgq_req;
269         req->pipeline_enable.msgq_rsp = p->msgq_rsp;
270         req->pipeline_enable.timer_period_ms = p->timer_period_ms;
271         req->pipeline_enable.n_tables = p->n_tables;
272
273         /* Send request and wait for response */
274         rsp = thread_msg_send_recv(thread_id, req);
275         if (rsp == NULL)
276                 return -1;
277
278         /* Read response */
279         status = rsp->status;
280
281         /* Free response */
282         thread_msg_free(rsp);
283
284         /* Request completion */
285         if (status)
286                 return status;
287
288         p->thread_id = thread_id;
289         p->enabled = 1;
290
291         return 0;
292 }
293
294 int
295 thread_pipeline_disable(uint32_t thread_id,
296         const char *pipeline_name)
297 {
298         struct pipeline *p = pipeline_find(pipeline_name);
299         struct thread *t;
300         struct thread_msg_req *req;
301         struct thread_msg_rsp *rsp;
302         int status;
303
304         /* Check input params */
305         if ((thread_id >= RTE_MAX_LCORE) ||
306                 (p == NULL))
307                 return -1;
308
309         t = &thread[thread_id];
310         if (t->enabled == 0)
311                 return -1;
312
313         if (p->enabled == 0)
314                 return 0;
315
316         if (p->thread_id != thread_id)
317                 return -1;
318
319         /* Allocate request */
320         req = thread_msg_alloc();
321         if (req == NULL)
322                 return -1;
323
324         /* Write request */
325         req->type = THREAD_REQ_PIPELINE_DISABLE;
326         req->pipeline_disable.p = p->p;
327
328         /* Send request and wait for response */
329         rsp = thread_msg_send_recv(thread_id, req);
330         if (rsp == NULL)
331                 return -1;
332
333         /* Read response */
334         status = rsp->status;
335
336         /* Free response */
337         thread_msg_free(rsp);
338
339         /* Request completion */
340         if (status)
341                 return status;
342
343         p->enabled = 0;
344
345         return 0;
346 }
347
348 /**
349  * Data plane threads: message handling
350  */
/*
 * Try once to take a request off the ring. Returns the request, or NULL
 * when the dequeue does not succeed.
 */
static inline struct thread_msg_req *
thread_msg_recv(struct rte_ring *msgq_req)
{
	struct thread_msg_req *msg;

	if (rte_ring_sc_dequeue(msgq_req, (void **) &msg) != 0)
		return NULL;

	return msg;
}
363
/* Post a response on the ring, retrying while the ring reports -ENOBUFS. */
static inline void
thread_msg_send(struct rte_ring *msgq_rsp,
	struct thread_msg_rsp *rsp)
{
	for ( ; ; ) {
		if (rte_ring_sp_enqueue(msgq_rsp, rsp) != -ENOBUFS)
			return;
	}
}
374
375 static struct thread_msg_rsp *
376 thread_msg_handle_pipeline_enable(struct thread_data *t,
377         struct thread_msg_req *req)
378 {
379         struct thread_msg_rsp *rsp = (struct thread_msg_rsp *) req;
380         struct pipeline_data *p = &t->pipeline_data[t->n_pipelines];
381         uint32_t i;
382
383         /* Request */
384         if (t->n_pipelines >= THREAD_PIPELINES_MAX) {
385                 rsp->status = -1;
386                 return rsp;
387         }
388
389         t->p[t->n_pipelines] = req->pipeline_enable.p;
390
391         p->p = req->pipeline_enable.p;
392         for (i = 0; i < req->pipeline_enable.n_tables; i++)
393                 p->table_data[i].a =
394                         req->pipeline_enable.table[i].a;
395
396         p->n_tables = req->pipeline_enable.n_tables;
397
398         p->msgq_req = req->pipeline_enable.msgq_req;
399         p->msgq_rsp = req->pipeline_enable.msgq_rsp;
400         p->timer_period =
401                 (rte_get_tsc_hz() * req->pipeline_enable.timer_period_ms) / 1000;
402         p->time_next = rte_get_tsc_cycles() + p->timer_period;
403
404         t->n_pipelines++;
405
406         /* Response */
407         rsp->status = 0;
408         return rsp;
409 }
410
411 static struct thread_msg_rsp *
412 thread_msg_handle_pipeline_disable(struct thread_data *t,
413         struct thread_msg_req *req)
414 {
415         struct thread_msg_rsp *rsp = (struct thread_msg_rsp *) req;
416         uint32_t n_pipelines = t->n_pipelines;
417         struct rte_pipeline *pipeline = req->pipeline_disable.p;
418         uint32_t i;
419
420         /* find pipeline */
421         for (i = 0; i < n_pipelines; i++) {
422                 struct pipeline_data *p = &t->pipeline_data[i];
423
424                 if (p->p != pipeline)
425                         continue;
426
427                 if (i < n_pipelines - 1) {
428                         struct rte_pipeline *pipeline_last =
429                                 t->p[n_pipelines - 1];
430                         struct pipeline_data *p_last =
431                                 &t->pipeline_data[n_pipelines - 1];
432
433                         t->p[i] = pipeline_last;
434                         memcpy(p, p_last, sizeof(*p));
435                 }
436
437                 t->n_pipelines--;
438
439                 rsp->status = 0;
440                 return rsp;
441         }
442
443         /* should not get here */
444         rsp->status = 0;
445         return rsp;
446 }
447
448 static void
449 thread_msg_handle(struct thread_data *t)
450 {
451         for ( ; ; ) {
452                 struct thread_msg_req *req;
453                 struct thread_msg_rsp *rsp;
454
455                 req = thread_msg_recv(t->msgq_req);
456                 if (req == NULL)
457                         break;
458
459                 switch (req->type) {
460                 case THREAD_REQ_PIPELINE_ENABLE:
461                         rsp = thread_msg_handle_pipeline_enable(t, req);
462                         break;
463
464                 case THREAD_REQ_PIPELINE_DISABLE:
465                         rsp = thread_msg_handle_pipeline_disable(t, req);
466                         break;
467
468                 default:
469                         rsp = (struct thread_msg_rsp *) req;
470                         rsp->status = -1;
471                 }
472
473                 thread_msg_send(t->msgq_rsp, rsp);
474         }
475 }
476
477 /**
478  * Master thread & data plane threads: message passing
479  */
/* Request codes understood by pipeline_msg_handle(). */
enum pipeline_req_type {
	/* Input port requests */
	PIPELINE_REQ_PORT_IN_STATS_READ,
	PIPELINE_REQ_PORT_IN_ENABLE,
	PIPELINE_REQ_PORT_IN_DISABLE,

	/* Output port requests */
	PIPELINE_REQ_PORT_OUT_STATS_READ,

	/* Table requests */
	PIPELINE_REQ_TABLE_STATS_READ,

	/* Number of request codes */
	PIPELINE_REQ_MAX
};
494
/* Payload of PIPELINE_REQ_PORT_IN_STATS_READ. */
struct pipeline_msg_req_port_in_stats_read {
	/* Forwarded as the "clear" argument of the stats read call. */
	int clear;
};
498
/* Payload of PIPELINE_REQ_PORT_OUT_STATS_READ. */
struct pipeline_msg_req_port_out_stats_read {
	/* Forwarded as the "clear" argument of the stats read call. */
	int clear;
};
502
/* Payload of PIPELINE_REQ_TABLE_STATS_READ. */
struct pipeline_msg_req_table_stats_read {
	/* Forwarded as the "clear" argument of the stats read call. */
	int clear;
};
506
507 struct pipeline_msg_req {
508         enum pipeline_req_type type;
509         uint32_t id; /* Port IN, port OUT or table ID */
510
511         RTE_STD_C11
512         union {
513                 struct pipeline_msg_req_port_in_stats_read port_in_stats_read;
514                 struct pipeline_msg_req_port_out_stats_read port_out_stats_read;
515                 struct pipeline_msg_req_table_stats_read table_stats_read;
516         };
517 };
518
519 struct pipeline_msg_rsp_port_in_stats_read {
520         struct rte_pipeline_port_in_stats stats;
521 };
522
523 struct pipeline_msg_rsp_port_out_stats_read {
524         struct rte_pipeline_port_out_stats stats;
525 };
526
527 struct pipeline_msg_rsp_table_stats_read {
528         struct rte_pipeline_table_stats stats;
529 };
530
531 struct pipeline_msg_rsp {
532         int status;
533
534         RTE_STD_C11
535         union {
536                 struct pipeline_msg_rsp_port_in_stats_read port_in_stats_read;
537                 struct pipeline_msg_rsp_port_out_stats_read port_out_stats_read;
538                 struct pipeline_msg_rsp_table_stats_read table_stats_read;
539         };
540 };
541
542 /**
543  * Master thread
544  */
545 static struct pipeline_msg_req *
546 pipeline_msg_alloc(void)
547 {
548         size_t size = RTE_MAX(sizeof(struct pipeline_msg_req),
549                 sizeof(struct pipeline_msg_rsp));
550
551         return calloc(1, size);
552 }
553
/* Release a message buffer obtained from pipeline_msg_alloc(). */
static void
pipeline_msg_free(struct pipeline_msg_rsp *rsp)
{
	void *buffer = rsp;

	free(buffer);
}
559
560 static struct pipeline_msg_rsp *
561 pipeline_msg_send_recv(struct pipeline *p,
562         struct pipeline_msg_req *req)
563 {
564         struct rte_ring *msgq_req = p->msgq_req;
565         struct rte_ring *msgq_rsp = p->msgq_rsp;
566         struct pipeline_msg_rsp *rsp;
567         int status;
568
569         /* send */
570         do {
571                 status = rte_ring_sp_enqueue(msgq_req, req);
572         } while (status == -ENOBUFS);
573
574         /* recv */
575         do {
576                 status = rte_ring_sc_dequeue(msgq_rsp, (void **) &rsp);
577         } while (status != 0);
578
579         return rsp;
580 }
581
582 int
583 pipeline_port_in_stats_read(const char *pipeline_name,
584         uint32_t port_id,
585         struct rte_pipeline_port_in_stats *stats,
586         int clear)
587 {
588         struct pipeline *p;
589         struct pipeline_msg_req *req;
590         struct pipeline_msg_rsp *rsp;
591         int status;
592
593         /* Check input params */
594         if ((pipeline_name == NULL) ||
595                 (stats == NULL))
596                 return -1;
597
598         p = pipeline_find(pipeline_name);
599         if ((p == NULL) ||
600                 (p->enabled == 0) ||
601                 (port_id >= p->n_ports_in))
602                 return -1;
603
604         /* Allocate request */
605         req = pipeline_msg_alloc();
606         if (req == NULL)
607                 return -1;
608
609         /* Write request */
610         req->type = PIPELINE_REQ_PORT_IN_STATS_READ;
611         req->id = port_id;
612         req->port_in_stats_read.clear = clear;
613
614         /* Send request and wait for response */
615         rsp = pipeline_msg_send_recv(p, req);
616         if (rsp == NULL)
617                 return -1;
618
619         /* Read response */
620         status = rsp->status;
621         if (status)
622                 memcpy(stats, &rsp->port_in_stats_read.stats, sizeof(*stats));
623
624         /* Free response */
625         pipeline_msg_free(rsp);
626
627         return status;
628 }
629
630 int
631 pipeline_port_in_enable(const char *pipeline_name,
632         uint32_t port_id)
633 {
634         struct pipeline *p;
635         struct pipeline_msg_req *req;
636         struct pipeline_msg_rsp *rsp;
637         int status;
638
639         /* Check input params */
640         if (pipeline_name == NULL)
641                 return -1;
642
643         p = pipeline_find(pipeline_name);
644         if ((p == NULL) ||
645                 (p->enabled == 0) ||
646                 (port_id >= p->n_ports_in))
647                 return -1;
648
649         /* Allocate request */
650         req = pipeline_msg_alloc();
651         if (req == NULL)
652                 return -1;
653
654         /* Write request */
655         req->type = PIPELINE_REQ_PORT_IN_ENABLE;
656         req->id = port_id;
657
658         /* Send request and wait for response */
659         rsp = pipeline_msg_send_recv(p, req);
660         if (rsp == NULL)
661                 return -1;
662
663         /* Read response */
664         status = rsp->status;
665
666         /* Free response */
667         pipeline_msg_free(rsp);
668
669         return status;
670 }
671
672 int
673 pipeline_port_in_disable(const char *pipeline_name,
674         uint32_t port_id)
675 {
676         struct pipeline *p;
677         struct pipeline_msg_req *req;
678         struct pipeline_msg_rsp *rsp;
679         int status;
680
681         /* Check input params */
682         if (pipeline_name == NULL)
683                 return -1;
684
685         p = pipeline_find(pipeline_name);
686         if ((p == NULL) ||
687                 (p->enabled == 0) ||
688                 (port_id >= p->n_ports_in))
689                 return -1;
690
691         /* Allocate request */
692         req = pipeline_msg_alloc();
693         if (req == NULL)
694                 return -1;
695
696         /* Write request */
697         req->type = PIPELINE_REQ_PORT_IN_DISABLE;
698         req->id = port_id;
699
700         /* Send request and wait for response */
701         rsp = pipeline_msg_send_recv(p, req);
702         if (rsp == NULL)
703                 return -1;
704
705         /* Read response */
706         status = rsp->status;
707
708         /* Free response */
709         pipeline_msg_free(rsp);
710
711         return status;
712 }
713
714 int
715 pipeline_port_out_stats_read(const char *pipeline_name,
716         uint32_t port_id,
717         struct rte_pipeline_port_out_stats *stats,
718         int clear)
719 {
720         struct pipeline *p;
721         struct pipeline_msg_req *req;
722         struct pipeline_msg_rsp *rsp;
723         int status;
724
725         /* Check input params */
726         if ((pipeline_name == NULL) ||
727                 (stats == NULL))
728                 return -1;
729
730         p = pipeline_find(pipeline_name);
731         if ((p == NULL) ||
732                 (p->enabled == 0) ||
733                 (port_id >= p->n_ports_out))
734                 return -1;
735
736         /* Allocate request */
737         req = pipeline_msg_alloc();
738         if (req == NULL)
739                 return -1;
740
741         /* Write request */
742         req->type = PIPELINE_REQ_PORT_OUT_STATS_READ;
743         req->id = port_id;
744         req->port_out_stats_read.clear = clear;
745
746         /* Send request and wait for response */
747         rsp = pipeline_msg_send_recv(p, req);
748         if (rsp == NULL)
749                 return -1;
750
751         /* Read response */
752         status = rsp->status;
753         if (status)
754                 memcpy(stats, &rsp->port_out_stats_read.stats, sizeof(*stats));
755
756         /* Free response */
757         pipeline_msg_free(rsp);
758
759         return status;
760 }
761
762 int
763 pipeline_table_stats_read(const char *pipeline_name,
764         uint32_t table_id,
765         struct rte_pipeline_table_stats *stats,
766         int clear)
767 {
768         struct pipeline *p;
769         struct pipeline_msg_req *req;
770         struct pipeline_msg_rsp *rsp;
771         int status;
772
773         /* Check input params */
774         if ((pipeline_name == NULL) ||
775                 (stats == NULL))
776                 return -1;
777
778         p = pipeline_find(pipeline_name);
779         if ((p == NULL) ||
780                 (p->enabled == 0) ||
781                 (table_id >= p->n_tables))
782                 return -1;
783
784         /* Allocate request */
785         req = pipeline_msg_alloc();
786         if (req == NULL)
787                 return -1;
788
789         /* Write request */
790         req->type = PIPELINE_REQ_TABLE_STATS_READ;
791         req->id = table_id;
792         req->table_stats_read.clear = clear;
793
794         /* Send request and wait for response */
795         rsp = pipeline_msg_send_recv(p, req);
796         if (rsp == NULL)
797                 return -1;
798
799         /* Read response */
800         status = rsp->status;
801         if (status)
802                 memcpy(stats, &rsp->table_stats_read.stats, sizeof(*stats));
803
804         /* Free response */
805         pipeline_msg_free(rsp);
806
807         return status;
808 }
809
810 /**
811  * Data plane threads: message handling
812  */
/*
 * Try once to take a request off the ring. Returns the request, or NULL
 * when the dequeue does not succeed.
 */
static inline struct pipeline_msg_req *
pipeline_msg_recv(struct rte_ring *msgq_req)
{
	struct pipeline_msg_req *msg;

	if (rte_ring_sc_dequeue(msgq_req, (void **) &msg) != 0)
		return NULL;

	return msg;
}
825
/* Post a response on the ring, retrying while the ring reports -ENOBUFS. */
static inline void
pipeline_msg_send(struct rte_ring *msgq_rsp,
	struct pipeline_msg_rsp *rsp)
{
	for ( ; ; ) {
		if (rte_ring_sp_enqueue(msgq_rsp, rsp) != -ENOBUFS)
			return;
	}
}
836
837 static struct pipeline_msg_rsp *
838 pipeline_msg_handle_port_in_stats_read(struct pipeline_data *p,
839         struct pipeline_msg_req *req)
840 {
841         struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
842         uint32_t port_id = req->id;
843         int clear = req->port_in_stats_read.clear;
844
845         rsp->status = rte_pipeline_port_in_stats_read(p->p,
846                 port_id,
847                 &rsp->port_in_stats_read.stats,
848                 clear);
849
850         return rsp;
851 }
852
853 static struct pipeline_msg_rsp *
854 pipeline_msg_handle_port_in_enable(struct pipeline_data *p,
855         struct pipeline_msg_req *req)
856 {
857         struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
858         uint32_t port_id = req->id;
859
860         rsp->status = rte_pipeline_port_in_enable(p->p,
861                 port_id);
862
863         return rsp;
864 }
865
866 static struct pipeline_msg_rsp *
867 pipeline_msg_handle_port_in_disable(struct pipeline_data *p,
868         struct pipeline_msg_req *req)
869 {
870         struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
871         uint32_t port_id = req->id;
872
873         rsp->status = rte_pipeline_port_in_disable(p->p,
874                 port_id);
875
876         return rsp;
877 }
878
879 static struct pipeline_msg_rsp *
880 pipeline_msg_handle_port_out_stats_read(struct pipeline_data *p,
881         struct pipeline_msg_req *req)
882 {
883         struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
884         uint32_t port_id = req->id;
885         int clear = req->port_out_stats_read.clear;
886
887         rsp->status = rte_pipeline_port_out_stats_read(p->p,
888                 port_id,
889                 &rsp->port_out_stats_read.stats,
890                 clear);
891
892         return rsp;
893 }
894
895 static struct pipeline_msg_rsp *
896 pipeline_msg_handle_table_stats_read(struct pipeline_data *p,
897         struct pipeline_msg_req *req)
898 {
899         struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
900         uint32_t port_id = req->id;
901         int clear = req->table_stats_read.clear;
902
903         rsp->status = rte_pipeline_table_stats_read(p->p,
904                 port_id,
905                 &rsp->table_stats_read.stats,
906                 clear);
907
908         return rsp;
909 }
910
911 static void
912 pipeline_msg_handle(struct pipeline_data *p)
913 {
914         for ( ; ; ) {
915                 struct pipeline_msg_req *req;
916                 struct pipeline_msg_rsp *rsp;
917
918                 req = pipeline_msg_recv(p->msgq_req);
919                 if (req == NULL)
920                         break;
921
922                 switch (req->type) {
923                 case PIPELINE_REQ_PORT_IN_STATS_READ:
924                         rsp = pipeline_msg_handle_port_in_stats_read(p, req);
925                         break;
926
927                 case PIPELINE_REQ_PORT_IN_ENABLE:
928                         rsp = pipeline_msg_handle_port_in_enable(p, req);
929                         break;
930
931                 case PIPELINE_REQ_PORT_IN_DISABLE:
932                         rsp = pipeline_msg_handle_port_in_disable(p, req);
933                         break;
934
935                 case PIPELINE_REQ_PORT_OUT_STATS_READ:
936                         rsp = pipeline_msg_handle_port_out_stats_read(p, req);
937                         break;
938
939                 case PIPELINE_REQ_TABLE_STATS_READ:
940                         rsp = pipeline_msg_handle_table_stats_read(p, req);
941                         break;
942
943
944                 default:
945                         rsp = (struct pipeline_msg_rsp *) req;
946                         rsp->status = -1;
947                 }
948
949                 pipeline_msg_send(p->msgq_rsp, rsp);
950         }
951 }
952
953 /**
954  * Data plane threads: main
955  */
956 int
957 thread_main(void *arg __rte_unused)
958 {
959         struct thread_data *t;
960         uint32_t thread_id, i;
961
962         thread_id = rte_lcore_id();
963         t = &thread_data[thread_id];
964
965         /* Dispatch loop */
966         for (i = 0; ; i++) {
967                 uint32_t j;
968
969                 /* Data Plane */
970                 for (j = 0; j < t->n_pipelines; j++)
971                         rte_pipeline_run(t->p[j]);
972
973                 /* Control Plane */
974                 if ((i & 0xF) == 0) {
975                         uint64_t time = rte_get_tsc_cycles();
976                         uint64_t time_next_min = UINT64_MAX;
977
978                         if (time < t->time_next_min)
979                                 continue;
980
981                         /* Pipeline message queues */
982                         for (j = 0; j < t->n_pipelines; j++) {
983                                 struct pipeline_data *p =
984                                         &t->pipeline_data[j];
985                                 uint64_t time_next = p->time_next;
986
987                                 if (time_next <= time) {
988                                         pipeline_msg_handle(p);
989                                         rte_pipeline_flush(p->p);
990                                         time_next = time + p->timer_period;
991                                         p->time_next = time_next;
992                                 }
993
994                                 if (time_next < time_next_min)
995                                         time_next_min = time_next;
996                         }
997
998                         /* Thread message queues */
999                         {
1000                                 uint64_t time_next = t->time_next;
1001
1002                                 if (time_next <= time) {
1003                                         thread_msg_handle(t);
1004                                         time_next = time + t->timer_period;
1005                                         t->time_next = time_next;
1006                                 }
1007
1008                                 if (time_next < time_next_min)
1009                                         time_next_min = time_next;
1010                         }
1011
1012                         t->time_next_min = time_next_min;
1013                 }
1014         }
1015
1016         return 0;
1017 }