examples/pipeline: add new example application
[dpdk.git] / examples / pipeline / thread.c
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2020 Intel Corporation
3  */
4
5 #include <stdlib.h>
6
7 #include <rte_common.h>
8 #include <rte_cycles.h>
9 #include <rte_lcore.h>
10 #include <rte_ring.h>
11
12 #include <rte_table_acl.h>
13 #include <rte_table_array.h>
14 #include <rte_table_hash.h>
15 #include <rte_table_lpm.h>
16 #include <rte_table_lpm_ipv6.h>
17
18 #include "obj.h"
19 #include "thread.h"
20
/* Capacity of the per-thread pipeline tables (overridable at build time). */
#if !defined(THREAD_PIPELINES_MAX)
#define THREAD_PIPELINES_MAX 256
#endif

/* Element count requested for each request/response message ring. */
#if !defined(THREAD_MSGQ_SIZE)
#define THREAD_MSGQ_SIZE 64
#endif

/* Interval, in milliseconds, used to derive the thread timer period. */
#if !defined(THREAD_TIMER_PERIOD_MS)
#define THREAD_TIMER_PERIOD_MS 100
#endif
32
33 /**
34  * Control thread: data plane thread context
35  */
36 struct thread {
37         struct rte_ring *msgq_req;
38         struct rte_ring *msgq_rsp;
39
40         uint32_t enabled;
41 };
42
43 static struct thread thread[RTE_MAX_LCORE];
44
45 /**
46  * Data plane threads: context
47  */
48 struct pipeline_data {
49         struct rte_swx_pipeline *p;
50         uint64_t timer_period; /* Measured in CPU cycles. */
51         uint64_t time_next;
52 };
53
54 struct thread_data {
55         struct rte_swx_pipeline *p[THREAD_PIPELINES_MAX];
56         uint32_t n_pipelines;
57
58         struct pipeline_data pipeline_data[THREAD_PIPELINES_MAX];
59         struct rte_ring *msgq_req;
60         struct rte_ring *msgq_rsp;
61         uint64_t timer_period; /* Measured in CPU cycles. */
62         uint64_t time_next;
63         uint64_t time_next_min;
64 } __rte_cache_aligned;
65
66 static struct thread_data thread_data[RTE_MAX_LCORE];
67
68 /**
69  * Control thread: data plane thread init
70  */
71 static void
72 thread_free(void)
73 {
74         uint32_t i;
75
76         for (i = 0; i < RTE_MAX_LCORE; i++) {
77                 struct thread *t = &thread[i];
78
79                 if (!rte_lcore_is_enabled(i))
80                         continue;
81
82                 /* MSGQs */
83                 if (t->msgq_req)
84                         rte_ring_free(t->msgq_req);
85
86                 if (t->msgq_rsp)
87                         rte_ring_free(t->msgq_rsp);
88         }
89 }
90
91 int
92 thread_init(void)
93 {
94         uint32_t i;
95
96         RTE_LCORE_FOREACH_SLAVE(i) {
97                 char name[NAME_MAX];
98                 struct rte_ring *msgq_req, *msgq_rsp;
99                 struct thread *t = &thread[i];
100                 struct thread_data *t_data = &thread_data[i];
101                 uint32_t cpu_id = rte_lcore_to_socket_id(i);
102
103                 /* MSGQs */
104                 snprintf(name, sizeof(name), "THREAD-%04x-MSGQ-REQ", i);
105
106                 msgq_req = rte_ring_create(name,
107                         THREAD_MSGQ_SIZE,
108                         cpu_id,
109                         RING_F_SP_ENQ | RING_F_SC_DEQ);
110
111                 if (msgq_req == NULL) {
112                         thread_free();
113                         return -1;
114                 }
115
116                 snprintf(name, sizeof(name), "THREAD-%04x-MSGQ-RSP", i);
117
118                 msgq_rsp = rte_ring_create(name,
119                         THREAD_MSGQ_SIZE,
120                         cpu_id,
121                         RING_F_SP_ENQ | RING_F_SC_DEQ);
122
123                 if (msgq_rsp == NULL) {
124                         thread_free();
125                         return -1;
126                 }
127
128                 /* Control thread records */
129                 t->msgq_req = msgq_req;
130                 t->msgq_rsp = msgq_rsp;
131                 t->enabled = 1;
132
133                 /* Data plane thread records */
134                 t_data->n_pipelines = 0;
135                 t_data->msgq_req = msgq_req;
136                 t_data->msgq_rsp = msgq_rsp;
137                 t_data->timer_period =
138                         (rte_get_tsc_hz() * THREAD_TIMER_PERIOD_MS) / 1000;
139                 t_data->time_next = rte_get_tsc_cycles() + t_data->timer_period;
140                 t_data->time_next_min = t_data->time_next;
141         }
142
143         return 0;
144 }
145
146 static inline int
147 thread_is_running(uint32_t thread_id)
148 {
149         enum rte_lcore_state_t thread_state;
150
151         thread_state = rte_eal_get_lcore_state(thread_id);
152         return (thread_state == RUNNING) ? 1 : 0;
153 }
154
/**
 * Control thread & data plane threads: message passing
 */

/* Kind of operation the control thread asks a data plane thread to perform. */
enum thread_req_type {
        THREAD_REQ_PIPELINE_ENABLE = 0,
        THREAD_REQ_PIPELINE_DISABLE = 1,
        THREAD_REQ_MAX = 2
};

/* Request message; the union member in use is selected by "type". */
struct thread_msg_req {
        enum thread_req_type type;

        union {
                /* THREAD_REQ_PIPELINE_ENABLE */
                struct {
                        struct rte_swx_pipeline *p;
                        uint32_t timer_period_ms;
                } pipeline_enable;

                /* THREAD_REQ_PIPELINE_DISABLE */
                struct {
                        struct rte_swx_pipeline *p;
                } pipeline_disable;
        };
};

/*
 * Response message. The data plane handlers build it in place, on top of the
 * request buffer they received.
 */
struct thread_msg_rsp {
        int status;
};
182
183 /**
184  * Control thread
185  */
186 static struct thread_msg_req *
187 thread_msg_alloc(void)
188 {
189         size_t size = RTE_MAX(sizeof(struct thread_msg_req),
190                 sizeof(struct thread_msg_rsp));
191
192         return calloc(1, size);
193 }
194
/* Release a message buffer obtained from thread_msg_alloc(). */
static void
thread_msg_free(struct thread_msg_rsp *rsp)
{
        free(rsp);
}
200
201 static struct thread_msg_rsp *
202 thread_msg_send_recv(uint32_t thread_id,
203         struct thread_msg_req *req)
204 {
205         struct thread *t = &thread[thread_id];
206         struct rte_ring *msgq_req = t->msgq_req;
207         struct rte_ring *msgq_rsp = t->msgq_rsp;
208         struct thread_msg_rsp *rsp;
209         int status;
210
211         /* send */
212         do {
213                 status = rte_ring_sp_enqueue(msgq_req, req);
214         } while (status == -ENOBUFS);
215
216         /* recv */
217         do {
218                 status = rte_ring_sc_dequeue(msgq_rsp, (void **) &rsp);
219         } while (status != 0);
220
221         return rsp;
222 }
223
224 int
225 thread_pipeline_enable(uint32_t thread_id,
226         struct obj *obj,
227         const char *pipeline_name)
228 {
229         struct pipeline *p = pipeline_find(obj, pipeline_name);
230         struct thread *t;
231         struct thread_msg_req *req;
232         struct thread_msg_rsp *rsp;
233         int status;
234
235         /* Check input params */
236         if ((thread_id >= RTE_MAX_LCORE) ||
237                 (p == NULL))
238                 return -1;
239
240         t = &thread[thread_id];
241         if (t->enabled == 0)
242                 return -1;
243
244         if (!thread_is_running(thread_id)) {
245                 struct thread_data *td = &thread_data[thread_id];
246                 struct pipeline_data *tdp = &td->pipeline_data[td->n_pipelines];
247
248                 if (td->n_pipelines >= THREAD_PIPELINES_MAX)
249                         return -1;
250
251                 /* Data plane thread */
252                 td->p[td->n_pipelines] = p->p;
253
254                 tdp->p = p->p;
255                 tdp->timer_period =
256                         (rte_get_tsc_hz() * p->timer_period_ms) / 1000;
257                 tdp->time_next = rte_get_tsc_cycles() + tdp->timer_period;
258
259                 td->n_pipelines++;
260
261                 /* Pipeline */
262                 p->thread_id = thread_id;
263                 p->enabled = 1;
264
265                 return 0;
266         }
267
268         /* Allocate request */
269         req = thread_msg_alloc();
270         if (req == NULL)
271                 return -1;
272
273         /* Write request */
274         req->type = THREAD_REQ_PIPELINE_ENABLE;
275         req->pipeline_enable.p = p->p;
276         req->pipeline_enable.timer_period_ms = p->timer_period_ms;
277
278         /* Send request and wait for response */
279         rsp = thread_msg_send_recv(thread_id, req);
280
281         /* Read response */
282         status = rsp->status;
283
284         /* Free response */
285         thread_msg_free(rsp);
286
287         /* Request completion */
288         if (status)
289                 return status;
290
291         p->thread_id = thread_id;
292         p->enabled = 1;
293
294         return 0;
295 }
296
297 int
298 thread_pipeline_disable(uint32_t thread_id,
299         struct obj *obj,
300         const char *pipeline_name)
301 {
302         struct pipeline *p = pipeline_find(obj, pipeline_name);
303         struct thread *t;
304         struct thread_msg_req *req;
305         struct thread_msg_rsp *rsp;
306         int status;
307
308         /* Check input params */
309         if ((thread_id >= RTE_MAX_LCORE) ||
310                 (p == NULL))
311                 return -1;
312
313         t = &thread[thread_id];
314         if (t->enabled == 0)
315                 return -1;
316
317         if (p->enabled == 0)
318                 return 0;
319
320         if (p->thread_id != thread_id)
321                 return -1;
322
323         if (!thread_is_running(thread_id)) {
324                 struct thread_data *td = &thread_data[thread_id];
325                 uint32_t i;
326
327                 for (i = 0; i < td->n_pipelines; i++) {
328                         struct pipeline_data *tdp = &td->pipeline_data[i];
329
330                         if (tdp->p != p->p)
331                                 continue;
332
333                         /* Data plane thread */
334                         if (i < td->n_pipelines - 1) {
335                                 struct rte_swx_pipeline *pipeline_last =
336                                         td->p[td->n_pipelines - 1];
337                                 struct pipeline_data *tdp_last =
338                                         &td->pipeline_data[td->n_pipelines - 1];
339
340                                 td->p[i] = pipeline_last;
341                                 memcpy(tdp, tdp_last, sizeof(*tdp));
342                         }
343
344                         td->n_pipelines--;
345
346                         /* Pipeline */
347                         p->enabled = 0;
348
349                         break;
350                 }
351
352                 return 0;
353         }
354
355         /* Allocate request */
356         req = thread_msg_alloc();
357         if (req == NULL)
358                 return -1;
359
360         /* Write request */
361         req->type = THREAD_REQ_PIPELINE_DISABLE;
362         req->pipeline_disable.p = p->p;
363
364         /* Send request and wait for response */
365         rsp = thread_msg_send_recv(thread_id, req);
366
367         /* Read response */
368         status = rsp->status;
369
370         /* Free response */
371         thread_msg_free(rsp);
372
373         /* Request completion */
374         if (status)
375                 return status;
376
377         p->enabled = 0;
378
379         return 0;
380 }
381
382 /**
383  * Data plane threads: message handling
384  */
/*
 * Try to dequeue one request from the given ring. Returns the request, or
 * NULL when the dequeue does not succeed.
 */
static inline struct thread_msg_req *
thread_msg_recv(struct rte_ring *msgq_req)
{
        struct thread_msg_req *msg;

        if (rte_ring_sc_dequeue(msgq_req, (void **) &msg) != 0)
                return NULL;

        return msg;
}
397
/*
 * Enqueue a response on the given ring, retrying for as long as the ring
 * reports -ENOBUFS.
 */
static inline void
thread_msg_send(struct rte_ring *msgq_rsp,
        struct thread_msg_rsp *rsp)
{
        while (rte_ring_sp_enqueue(msgq_rsp, rsp) == -ENOBUFS)
                ;
}
408
409 static struct thread_msg_rsp *
410 thread_msg_handle_pipeline_enable(struct thread_data *t,
411         struct thread_msg_req *req)
412 {
413         struct thread_msg_rsp *rsp = (struct thread_msg_rsp *) req;
414         struct pipeline_data *p = &t->pipeline_data[t->n_pipelines];
415
416         /* Request */
417         if (t->n_pipelines >= THREAD_PIPELINES_MAX) {
418                 rsp->status = -1;
419                 return rsp;
420         }
421
422         t->p[t->n_pipelines] = req->pipeline_enable.p;
423
424         p->p = req->pipeline_enable.p;
425         p->timer_period = (rte_get_tsc_hz() *
426                 req->pipeline_enable.timer_period_ms) / 1000;
427         p->time_next = rte_get_tsc_cycles() + p->timer_period;
428
429         t->n_pipelines++;
430
431         /* Response */
432         rsp->status = 0;
433         return rsp;
434 }
435
436 static struct thread_msg_rsp *
437 thread_msg_handle_pipeline_disable(struct thread_data *t,
438         struct thread_msg_req *req)
439 {
440         struct thread_msg_rsp *rsp = (struct thread_msg_rsp *) req;
441         uint32_t n_pipelines = t->n_pipelines;
442         struct rte_swx_pipeline *pipeline = req->pipeline_disable.p;
443         uint32_t i;
444
445         /* find pipeline */
446         for (i = 0; i < n_pipelines; i++) {
447                 struct pipeline_data *p = &t->pipeline_data[i];
448
449                 if (p->p != pipeline)
450                         continue;
451
452                 if (i < n_pipelines - 1) {
453                         struct rte_swx_pipeline *pipeline_last =
454                                 t->p[n_pipelines - 1];
455                         struct pipeline_data *p_last =
456                                 &t->pipeline_data[n_pipelines - 1];
457
458                         t->p[i] = pipeline_last;
459                         memcpy(p, p_last, sizeof(*p));
460                 }
461
462                 t->n_pipelines--;
463
464                 rsp->status = 0;
465                 return rsp;
466         }
467
468         /* should not get here */
469         rsp->status = 0;
470         return rsp;
471 }
472
473 static void
474 thread_msg_handle(struct thread_data *t)
475 {
476         for ( ; ; ) {
477                 struct thread_msg_req *req;
478                 struct thread_msg_rsp *rsp;
479
480                 req = thread_msg_recv(t->msgq_req);
481                 if (req == NULL)
482                         break;
483
484                 switch (req->type) {
485                 case THREAD_REQ_PIPELINE_ENABLE:
486                         rsp = thread_msg_handle_pipeline_enable(t, req);
487                         break;
488
489                 case THREAD_REQ_PIPELINE_DISABLE:
490                         rsp = thread_msg_handle_pipeline_disable(t, req);
491                         break;
492
493                 default:
494                         rsp = (struct thread_msg_rsp *) req;
495                         rsp->status = -1;
496                 }
497
498                 thread_msg_send(t->msgq_rsp, rsp);
499         }
500 }
501
502 /**
503  * Data plane threads: main
504  */
505 int
506 thread_main(void *arg __rte_unused)
507 {
508         struct thread_data *t;
509         uint32_t thread_id, i;
510
511         thread_id = rte_lcore_id();
512         t = &thread_data[thread_id];
513
514         /* Dispatch loop */
515         for (i = 0; ; i++) {
516                 uint32_t j;
517
518                 /* Data Plane */
519                 for (j = 0; j < t->n_pipelines; j++)
520                         rte_swx_pipeline_run(t->p[j], 1000000);
521
522                 /* Control Plane */
523                 if ((i & 0xF) == 0) {
524                         uint64_t time = rte_get_tsc_cycles();
525                         uint64_t time_next_min = UINT64_MAX;
526
527                         if (time < t->time_next_min)
528                                 continue;
529
530                         /* Thread message queues */
531                         {
532                                 uint64_t time_next = t->time_next;
533
534                                 if (time_next <= time) {
535                                         thread_msg_handle(t);
536                                         time_next = time + t->timer_period;
537                                         t->time_next = time_next;
538                                 }
539
540                                 if (time_next < time_next_min)
541                                         time_next_min = time_next;
542                         }
543
544                         t->time_next_min = time_next_min;
545                 }
546         }
547
548         return 0;
549 }