event/cnxk: fix clang build on Arm
[dpdk.git] / examples / pipeline / thread.c
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2020 Intel Corporation
3  */
4
5 #include <stdlib.h>
6
7 #include <rte_common.h>
8 #include <rte_cycles.h>
9 #include <rte_lcore.h>
10 #include <rte_ring.h>
11
12 #include <rte_table_acl.h>
13 #include <rte_table_array.h>
14 #include <rte_table_hash.h>
15 #include <rte_table_lpm.h>
16 #include <rte_table_lpm_ipv6.h>
17
18 #include "obj.h"
19 #include "thread.h"
20
21 #ifndef THREAD_PIPELINES_MAX
22 #define THREAD_PIPELINES_MAX                               256
23 #endif
24
25 #ifndef THREAD_MSGQ_SIZE
26 #define THREAD_MSGQ_SIZE                                   64
27 #endif
28
29 #ifndef THREAD_TIMER_PERIOD_MS
30 #define THREAD_TIMER_PERIOD_MS                             100
31 #endif
32
33 /* Pipeline instruction quanta: Needs to be big enough to do some meaningful
34  * work, but not too big to avoid starving any other pipelines mapped to the
35  * same thread. For a pipeline that executes 10 instructions per packet, a
36  * quanta of 1000 instructions equates to processing 100 packets.
37  */
38 #ifndef PIPELINE_INSTR_QUANTA
39 #define PIPELINE_INSTR_QUANTA                              1000
40 #endif
41
42 /**
43  * Control thread: data plane thread context
44  */
45 struct thread {
46         struct rte_ring *msgq_req;
47         struct rte_ring *msgq_rsp;
48
49         uint32_t enabled;
50 };
51
52 static struct thread thread[RTE_MAX_LCORE];
53
54 /**
55  * Data plane threads: context
56  */
57 struct pipeline_data {
58         struct rte_swx_pipeline *p;
59         uint64_t timer_period; /* Measured in CPU cycles. */
60         uint64_t time_next;
61 };
62
63 struct thread_data {
64         struct rte_swx_pipeline *p[THREAD_PIPELINES_MAX];
65         uint32_t n_pipelines;
66
67         struct pipeline_data pipeline_data[THREAD_PIPELINES_MAX];
68         struct rte_ring *msgq_req;
69         struct rte_ring *msgq_rsp;
70         uint64_t timer_period; /* Measured in CPU cycles. */
71         uint64_t time_next;
72         uint64_t time_next_min;
73 } __rte_cache_aligned;
74
75 static struct thread_data thread_data[RTE_MAX_LCORE];
76
77 /**
78  * Control thread: data plane thread init
79  */
80 static void
81 thread_free(void)
82 {
83         uint32_t i;
84
85         for (i = 0; i < RTE_MAX_LCORE; i++) {
86                 struct thread *t = &thread[i];
87
88                 if (!rte_lcore_is_enabled(i))
89                         continue;
90
91                 /* MSGQs */
92                 if (t->msgq_req)
93                         rte_ring_free(t->msgq_req);
94
95                 if (t->msgq_rsp)
96                         rte_ring_free(t->msgq_rsp);
97         }
98 }
99
100 int
101 thread_init(void)
102 {
103         uint32_t i;
104
105         RTE_LCORE_FOREACH_WORKER(i) {
106                 char name[NAME_MAX];
107                 struct rte_ring *msgq_req, *msgq_rsp;
108                 struct thread *t = &thread[i];
109                 struct thread_data *t_data = &thread_data[i];
110                 uint32_t cpu_id = rte_lcore_to_socket_id(i);
111
112                 /* MSGQs */
113                 snprintf(name, sizeof(name), "THREAD-%04x-MSGQ-REQ", i);
114
115                 msgq_req = rte_ring_create(name,
116                         THREAD_MSGQ_SIZE,
117                         cpu_id,
118                         RING_F_SP_ENQ | RING_F_SC_DEQ);
119
120                 if (msgq_req == NULL) {
121                         thread_free();
122                         return -1;
123                 }
124
125                 snprintf(name, sizeof(name), "THREAD-%04x-MSGQ-RSP", i);
126
127                 msgq_rsp = rte_ring_create(name,
128                         THREAD_MSGQ_SIZE,
129                         cpu_id,
130                         RING_F_SP_ENQ | RING_F_SC_DEQ);
131
132                 if (msgq_rsp == NULL) {
133                         thread_free();
134                         return -1;
135                 }
136
137                 /* Control thread records */
138                 t->msgq_req = msgq_req;
139                 t->msgq_rsp = msgq_rsp;
140                 t->enabled = 1;
141
142                 /* Data plane thread records */
143                 t_data->n_pipelines = 0;
144                 t_data->msgq_req = msgq_req;
145                 t_data->msgq_rsp = msgq_rsp;
146                 t_data->timer_period =
147                         (rte_get_tsc_hz() * THREAD_TIMER_PERIOD_MS) / 1000;
148                 t_data->time_next = rte_get_tsc_cycles() + t_data->timer_period;
149                 t_data->time_next_min = t_data->time_next;
150         }
151
152         return 0;
153 }
154
155 static inline int
156 thread_is_running(uint32_t thread_id)
157 {
158         enum rte_lcore_state_t thread_state;
159
160         thread_state = rte_eal_get_lcore_state(thread_id);
161         return (thread_state == RUNNING) ? 1 : 0;
162 }
163
164 /**
165  * Control thread & data plane threads: message passing
166  */
167 enum thread_req_type {
168         THREAD_REQ_PIPELINE_ENABLE = 0,
169         THREAD_REQ_PIPELINE_DISABLE,
170         THREAD_REQ_MAX
171 };
172
173 struct thread_msg_req {
174         enum thread_req_type type;
175
176         union {
177                 struct {
178                         struct rte_swx_pipeline *p;
179                         uint32_t timer_period_ms;
180                 } pipeline_enable;
181
182                 struct {
183                         struct rte_swx_pipeline *p;
184                 } pipeline_disable;
185         };
186 };
187
188 struct thread_msg_rsp {
189         int status;
190 };
191
192 /**
193  * Control thread
194  */
195 static struct thread_msg_req *
196 thread_msg_alloc(void)
197 {
198         size_t size = RTE_MAX(sizeof(struct thread_msg_req),
199                 sizeof(struct thread_msg_rsp));
200
201         return calloc(1, size);
202 }
203
204 static void
205 thread_msg_free(struct thread_msg_rsp *rsp)
206 {
207         free(rsp);
208 }
209
210 static struct thread_msg_rsp *
211 thread_msg_send_recv(uint32_t thread_id,
212         struct thread_msg_req *req)
213 {
214         struct thread *t = &thread[thread_id];
215         struct rte_ring *msgq_req = t->msgq_req;
216         struct rte_ring *msgq_rsp = t->msgq_rsp;
217         struct thread_msg_rsp *rsp;
218         int status;
219
220         /* send */
221         do {
222                 status = rte_ring_sp_enqueue(msgq_req, req);
223         } while (status == -ENOBUFS);
224
225         /* recv */
226         do {
227                 status = rte_ring_sc_dequeue(msgq_rsp, (void **) &rsp);
228         } while (status != 0);
229
230         return rsp;
231 }
232
233 int
234 thread_pipeline_enable(uint32_t thread_id,
235         struct obj *obj,
236         const char *pipeline_name)
237 {
238         struct pipeline *p = pipeline_find(obj, pipeline_name);
239         struct thread *t;
240         struct thread_msg_req *req;
241         struct thread_msg_rsp *rsp;
242         int status;
243
244         /* Check input params */
245         if ((thread_id >= RTE_MAX_LCORE) ||
246                 (p == NULL))
247                 return -1;
248
249         t = &thread[thread_id];
250         if (t->enabled == 0)
251                 return -1;
252
253         if (!thread_is_running(thread_id)) {
254                 struct thread_data *td = &thread_data[thread_id];
255                 struct pipeline_data *tdp = &td->pipeline_data[td->n_pipelines];
256
257                 if (td->n_pipelines >= THREAD_PIPELINES_MAX)
258                         return -1;
259
260                 /* Data plane thread */
261                 td->p[td->n_pipelines] = p->p;
262
263                 tdp->p = p->p;
264                 tdp->timer_period =
265                         (rte_get_tsc_hz() * p->timer_period_ms) / 1000;
266                 tdp->time_next = rte_get_tsc_cycles() + tdp->timer_period;
267
268                 td->n_pipelines++;
269
270                 /* Pipeline */
271                 p->thread_id = thread_id;
272                 p->enabled = 1;
273
274                 return 0;
275         }
276
277         /* Allocate request */
278         req = thread_msg_alloc();
279         if (req == NULL)
280                 return -1;
281
282         /* Write request */
283         req->type = THREAD_REQ_PIPELINE_ENABLE;
284         req->pipeline_enable.p = p->p;
285         req->pipeline_enable.timer_period_ms = p->timer_period_ms;
286
287         /* Send request and wait for response */
288         rsp = thread_msg_send_recv(thread_id, req);
289
290         /* Read response */
291         status = rsp->status;
292
293         /* Free response */
294         thread_msg_free(rsp);
295
296         /* Request completion */
297         if (status)
298                 return status;
299
300         p->thread_id = thread_id;
301         p->enabled = 1;
302
303         return 0;
304 }
305
306 int
307 thread_pipeline_disable(uint32_t thread_id,
308         struct obj *obj,
309         const char *pipeline_name)
310 {
311         struct pipeline *p = pipeline_find(obj, pipeline_name);
312         struct thread *t;
313         struct thread_msg_req *req;
314         struct thread_msg_rsp *rsp;
315         int status;
316
317         /* Check input params */
318         if ((thread_id >= RTE_MAX_LCORE) ||
319                 (p == NULL))
320                 return -1;
321
322         t = &thread[thread_id];
323         if (t->enabled == 0)
324                 return -1;
325
326         if (p->enabled == 0)
327                 return 0;
328
329         if (p->thread_id != thread_id)
330                 return -1;
331
332         if (!thread_is_running(thread_id)) {
333                 struct thread_data *td = &thread_data[thread_id];
334                 uint32_t i;
335
336                 for (i = 0; i < td->n_pipelines; i++) {
337                         struct pipeline_data *tdp = &td->pipeline_data[i];
338
339                         if (tdp->p != p->p)
340                                 continue;
341
342                         /* Data plane thread */
343                         if (i < td->n_pipelines - 1) {
344                                 struct rte_swx_pipeline *pipeline_last =
345                                         td->p[td->n_pipelines - 1];
346                                 struct pipeline_data *tdp_last =
347                                         &td->pipeline_data[td->n_pipelines - 1];
348
349                                 td->p[i] = pipeline_last;
350                                 memcpy(tdp, tdp_last, sizeof(*tdp));
351                         }
352
353                         td->n_pipelines--;
354
355                         /* Pipeline */
356                         p->enabled = 0;
357
358                         break;
359                 }
360
361                 return 0;
362         }
363
364         /* Allocate request */
365         req = thread_msg_alloc();
366         if (req == NULL)
367                 return -1;
368
369         /* Write request */
370         req->type = THREAD_REQ_PIPELINE_DISABLE;
371         req->pipeline_disable.p = p->p;
372
373         /* Send request and wait for response */
374         rsp = thread_msg_send_recv(thread_id, req);
375
376         /* Read response */
377         status = rsp->status;
378
379         /* Free response */
380         thread_msg_free(rsp);
381
382         /* Request completion */
383         if (status)
384                 return status;
385
386         p->enabled = 0;
387
388         return 0;
389 }
390
391 /**
392  * Data plane threads: message handling
393  */
394 static inline struct thread_msg_req *
395 thread_msg_recv(struct rte_ring *msgq_req)
396 {
397         struct thread_msg_req *req;
398
399         int status = rte_ring_sc_dequeue(msgq_req, (void **) &req);
400
401         if (status != 0)
402                 return NULL;
403
404         return req;
405 }
406
407 static inline void
408 thread_msg_send(struct rte_ring *msgq_rsp,
409         struct thread_msg_rsp *rsp)
410 {
411         int status;
412
413         do {
414                 status = rte_ring_sp_enqueue(msgq_rsp, rsp);
415         } while (status == -ENOBUFS);
416 }
417
418 static struct thread_msg_rsp *
419 thread_msg_handle_pipeline_enable(struct thread_data *t,
420         struct thread_msg_req *req)
421 {
422         struct thread_msg_rsp *rsp = (struct thread_msg_rsp *) req;
423         struct pipeline_data *p = &t->pipeline_data[t->n_pipelines];
424
425         /* Request */
426         if (t->n_pipelines >= THREAD_PIPELINES_MAX) {
427                 rsp->status = -1;
428                 return rsp;
429         }
430
431         t->p[t->n_pipelines] = req->pipeline_enable.p;
432
433         p->p = req->pipeline_enable.p;
434         p->timer_period = (rte_get_tsc_hz() *
435                 req->pipeline_enable.timer_period_ms) / 1000;
436         p->time_next = rte_get_tsc_cycles() + p->timer_period;
437
438         t->n_pipelines++;
439
440         /* Response */
441         rsp->status = 0;
442         return rsp;
443 }
444
445 static struct thread_msg_rsp *
446 thread_msg_handle_pipeline_disable(struct thread_data *t,
447         struct thread_msg_req *req)
448 {
449         struct thread_msg_rsp *rsp = (struct thread_msg_rsp *) req;
450         uint32_t n_pipelines = t->n_pipelines;
451         struct rte_swx_pipeline *pipeline = req->pipeline_disable.p;
452         uint32_t i;
453
454         /* find pipeline */
455         for (i = 0; i < n_pipelines; i++) {
456                 struct pipeline_data *p = &t->pipeline_data[i];
457
458                 if (p->p != pipeline)
459                         continue;
460
461                 if (i < n_pipelines - 1) {
462                         struct rte_swx_pipeline *pipeline_last =
463                                 t->p[n_pipelines - 1];
464                         struct pipeline_data *p_last =
465                                 &t->pipeline_data[n_pipelines - 1];
466
467                         t->p[i] = pipeline_last;
468                         memcpy(p, p_last, sizeof(*p));
469                 }
470
471                 t->n_pipelines--;
472
473                 rsp->status = 0;
474                 return rsp;
475         }
476
477         /* should not get here */
478         rsp->status = 0;
479         return rsp;
480 }
481
482 static void
483 thread_msg_handle(struct thread_data *t)
484 {
485         for ( ; ; ) {
486                 struct thread_msg_req *req;
487                 struct thread_msg_rsp *rsp;
488
489                 req = thread_msg_recv(t->msgq_req);
490                 if (req == NULL)
491                         break;
492
493                 switch (req->type) {
494                 case THREAD_REQ_PIPELINE_ENABLE:
495                         rsp = thread_msg_handle_pipeline_enable(t, req);
496                         break;
497
498                 case THREAD_REQ_PIPELINE_DISABLE:
499                         rsp = thread_msg_handle_pipeline_disable(t, req);
500                         break;
501
502                 default:
503                         rsp = (struct thread_msg_rsp *) req;
504                         rsp->status = -1;
505                 }
506
507                 thread_msg_send(t->msgq_rsp, rsp);
508         }
509 }
510
511 /**
512  * Data plane threads: main
513  */
514 int
515 thread_main(void *arg __rte_unused)
516 {
517         struct thread_data *t;
518         uint32_t thread_id, i;
519
520         thread_id = rte_lcore_id();
521         t = &thread_data[thread_id];
522
523         /* Dispatch loop */
524         for (i = 0; ; i++) {
525                 uint32_t j;
526
527                 /* Data Plane */
528                 for (j = 0; j < t->n_pipelines; j++)
529                         rte_swx_pipeline_run(t->p[j], PIPELINE_INSTR_QUANTA);
530
531                 /* Control Plane */
532                 if ((i & 0xF) == 0) {
533                         uint64_t time = rte_get_tsc_cycles();
534                         uint64_t time_next_min = UINT64_MAX;
535
536                         if (time < t->time_next_min)
537                                 continue;
538
539                         /* Thread message queues */
540                         {
541                                 uint64_t time_next = t->time_next;
542
543                                 if (time_next <= time) {
544                                         thread_msg_handle(t);
545                                         time_next = time + t->timer_period;
546                                         t->time_next = time_next;
547                                 }
548
549                                 if (time_next < time_next_min)
550                                         time_next_min = time_next;
551                         }
552
553                         t->time_next_min = time_next_min;
554                 }
555         }
556
557         return 0;
558 }