examples/pipeline/thread.c
/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2020 Intel Corporation
 */

#include <stdlib.h>

#include <rte_common.h>
#include <rte_cycles.h>
#include <rte_lcore.h>
#include <rte_ring.h>

#include <rte_table_acl.h>
#include <rte_table_array.h>
#include <rte_table_hash.h>
#include <rte_table_lpm.h>
#include <rte_table_lpm_ipv6.h>

#include "obj.h"
#include "thread.h"

#ifndef THREAD_PIPELINES_MAX
#define THREAD_PIPELINES_MAX                               256
#endif

#ifndef THREAD_MSGQ_SIZE
#define THREAD_MSGQ_SIZE                                   64
#endif

#ifndef THREAD_TIMER_PERIOD_MS
#define THREAD_TIMER_PERIOD_MS                             100
#endif

/* Pipeline instruction quanta: needs to be big enough to do some meaningful
 * work, but small enough to avoid starving any other pipelines mapped to the
 * same thread. For a pipeline that executes 10 instructions per packet, a
 * quanta of 1000 instructions equates to processing 100 packets.
 */
#ifndef PIPELINE_INSTR_QUANTA
#define PIPELINE_INSTR_QUANTA                              1000
#endif

/**
 * Control thread: data plane thread context
 */
struct thread {
        struct rte_ring *msgq_req;
        struct rte_ring *msgq_rsp;

        uint32_t enabled;
};

static struct thread thread[RTE_MAX_LCORE];

/**
 * Data plane threads: context
 */
struct pipeline_data {
        struct rte_swx_pipeline *p;
        uint64_t timer_period; /* Measured in CPU cycles. */
        uint64_t time_next;
};

struct thread_data {
        struct rte_swx_pipeline *p[THREAD_PIPELINES_MAX];
        uint32_t n_pipelines;

        struct pipeline_data pipeline_data[THREAD_PIPELINES_MAX];
        struct rte_ring *msgq_req;
        struct rte_ring *msgq_rsp;
        uint64_t timer_period; /* Measured in CPU cycles. */
        uint64_t time_next;
        uint64_t time_next_min;
} __rte_cache_aligned;

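/* Per-lcore data plane context, indexed by lcore ID. Cache line alignment of
 * struct thread_data keeps each entry on its own cache lines and avoids false
 * sharing between worker cores.
 */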
static struct thread_data thread_data[RTE_MAX_LCORE];

/**
 * Control thread: data plane thread init
 */
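/* Error path helper: releases any message rings created so far. Entries that
 * were never created are still NULL (static storage), and rte_ring_free() is a
 * no-op on NULL, so calling this on a partially initialized state is safe.
 */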
static void
thread_free(void)
{
        uint32_t i;

        for (i = 0; i < RTE_MAX_LCORE; i++) {
                struct thread *t = &thread[i];

                if (!rte_lcore_is_enabled(i))
                        continue;

                /* MSGQs */
                rte_ring_free(t->msgq_req);

                rte_ring_free(t->msgq_rsp);
        }
}

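/* For every worker lcore, create a single-producer/single-consumer request
 * ring and response ring on the worker's NUMA socket, then initialize both the
 * control plane record (struct thread) and the data plane context
 * (struct thread_data) for that lcore.
 */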
int
thread_init(void)
{
        uint32_t i;

        RTE_LCORE_FOREACH_WORKER(i) {
                char name[NAME_MAX];
                struct rte_ring *msgq_req, *msgq_rsp;
                struct thread *t = &thread[i];
                struct thread_data *t_data = &thread_data[i];
                uint32_t cpu_id = rte_lcore_to_socket_id(i);

                /* MSGQs */
                snprintf(name, sizeof(name), "THREAD-%04x-MSGQ-REQ", i);

                msgq_req = rte_ring_create(name,
                        THREAD_MSGQ_SIZE,
                        cpu_id,
                        RING_F_SP_ENQ | RING_F_SC_DEQ);

                if (msgq_req == NULL) {
                        thread_free();
                        return -1;
                }

                snprintf(name, sizeof(name), "THREAD-%04x-MSGQ-RSP", i);

                msgq_rsp = rte_ring_create(name,
                        THREAD_MSGQ_SIZE,
                        cpu_id,
                        RING_F_SP_ENQ | RING_F_SC_DEQ);

                if (msgq_rsp == NULL) {
                        thread_free();
                        return -1;
                }

                /* Control thread records */
                t->msgq_req = msgq_req;
                t->msgq_rsp = msgq_rsp;
                t->enabled = 1;

                /* Data plane thread records */
                t_data->n_pipelines = 0;
                t_data->msgq_req = msgq_req;
                t_data->msgq_rsp = msgq_rsp;
                t_data->timer_period =
                        (rte_get_tsc_hz() * THREAD_TIMER_PERIOD_MS) / 1000;
                t_data->time_next = rte_get_tsc_cycles() + t_data->timer_period;
                t_data->time_next_min = t_data->time_next;
        }

        return 0;
}

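/* Returns 1 when the EAL worker lcore is currently executing its launched
 * function. thread_pipeline_enable() and thread_pipeline_disable() use this to
 * decide whether the thread context can be updated directly or must be updated
 * through the message queues.
 */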
static inline int
thread_is_running(uint32_t thread_id)
{
        enum rte_lcore_state_t thread_state;

        thread_state = rte_eal_get_lcore_state(thread_id);
        return (thread_state == RUNNING) ? 1 : 0;
}

/**
 * Control thread & data plane threads: message passing
 */
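/* The control thread allocates a request, enqueues it on the target thread's
 * request ring and busy-waits for the response on the response ring. The data
 * plane thread periodically drains its request ring, handles each request and
 * writes the response in place over the request buffer, which the control
 * thread then frees.
 */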
enum thread_req_type {
        THREAD_REQ_PIPELINE_ENABLE = 0,
        THREAD_REQ_PIPELINE_DISABLE,
        THREAD_REQ_MAX
};

struct thread_msg_req {
        enum thread_req_type type;

        union {
                struct {
                        struct rte_swx_pipeline *p;
                        uint32_t timer_period_ms;
                } pipeline_enable;

                struct {
                        struct rte_swx_pipeline *p;
                } pipeline_disable;
        };
};

struct thread_msg_rsp {
        int status;
};

/**
 * Control thread
 */
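/* A single buffer, sized for the larger of the request and the response, is
 * allocated per transaction, so the data plane thread can overwrite the
 * request with the response without allocating on the fast path.
 */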
static struct thread_msg_req *
thread_msg_alloc(void)
{
        size_t size = RTE_MAX(sizeof(struct thread_msg_req),
                sizeof(struct thread_msg_rsp));

        return calloc(1, size);
}

static void
thread_msg_free(struct thread_msg_rsp *rsp)
{
        free(rsp);
}

static struct thread_msg_rsp *
thread_msg_send_recv(uint32_t thread_id,
        struct thread_msg_req *req)
{
        struct thread *t = &thread[thread_id];
        struct rte_ring *msgq_req = t->msgq_req;
        struct rte_ring *msgq_rsp = t->msgq_rsp;
        struct thread_msg_rsp *rsp;
        int status;

        /* send */
        do {
                status = rte_ring_sp_enqueue(msgq_req, req);
        } while (status == -ENOBUFS);

        /* recv */
        do {
                status = rte_ring_sc_dequeue(msgq_rsp, (void **) &rsp);
        } while (status != 0);

        return rsp;
}

int
thread_pipeline_enable(uint32_t thread_id,
        struct obj *obj,
        const char *pipeline_name)
{
        struct pipeline *p = pipeline_find(obj, pipeline_name);
        struct thread *t;
        struct thread_msg_req *req;
        struct thread_msg_rsp *rsp;
        int status;

        /* Check input params */
        if ((thread_id >= RTE_MAX_LCORE) ||
                (p == NULL))
                return -1;

        t = &thread[thread_id];
        if (t->enabled == 0)
                return -1;

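        /* If the data plane thread has not been launched yet (or has already
         * returned), there is no concurrent access to its context, so the
         * pipeline can be added to it directly. Otherwise, the update is
         * requested through the message queue below.
         */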
        if (!thread_is_running(thread_id)) {
                struct thread_data *td = &thread_data[thread_id];
                struct pipeline_data *tdp = &td->pipeline_data[td->n_pipelines];

                if (td->n_pipelines >= THREAD_PIPELINES_MAX)
                        return -1;

                /* Data plane thread */
                td->p[td->n_pipelines] = p->p;

                tdp->p = p->p;
                tdp->timer_period =
                        (rte_get_tsc_hz() * p->timer_period_ms) / 1000;
                tdp->time_next = rte_get_tsc_cycles() + tdp->timer_period;

                td->n_pipelines++;

                /* Pipeline */
                p->thread_id = thread_id;
                p->enabled = 1;

                return 0;
        }

        /* Allocate request */
        req = thread_msg_alloc();
        if (req == NULL)
                return -1;

        /* Write request */
        req->type = THREAD_REQ_PIPELINE_ENABLE;
        req->pipeline_enable.p = p->p;
        req->pipeline_enable.timer_period_ms = p->timer_period_ms;

        /* Send request and wait for response */
        rsp = thread_msg_send_recv(thread_id, req);

        /* Read response */
        status = rsp->status;

        /* Free response */
        thread_msg_free(rsp);

        /* Request completion */
        if (status)
                return status;

        p->thread_id = thread_id;
        p->enabled = 1;

        return 0;
}

int
thread_pipeline_disable(uint32_t thread_id,
        struct obj *obj,
        const char *pipeline_name)
{
        struct pipeline *p = pipeline_find(obj, pipeline_name);
        struct thread *t;
        struct thread_msg_req *req;
        struct thread_msg_rsp *rsp;
        int status;

        /* Check input params */
        if ((thread_id >= RTE_MAX_LCORE) ||
                (p == NULL))
                return -1;

        t = &thread[thread_id];
        if (t->enabled == 0)
                return -1;

        if (p->enabled == 0)
                return 0;

        if (p->thread_id != thread_id)
                return -1;

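        /* Same pattern as thread_pipeline_enable(): update the context
         * directly when the data plane thread is not running, otherwise go
         * through the message queue. In both cases the pipeline arrays are
         * kept dense by moving the last entry into the vacated slot.
         */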
        if (!thread_is_running(thread_id)) {
                struct thread_data *td = &thread_data[thread_id];
                uint32_t i;

                for (i = 0; i < td->n_pipelines; i++) {
                        struct pipeline_data *tdp = &td->pipeline_data[i];

                        if (tdp->p != p->p)
                                continue;

                        /* Data plane thread */
                        if (i < td->n_pipelines - 1) {
                                struct rte_swx_pipeline *pipeline_last =
                                        td->p[td->n_pipelines - 1];
                                struct pipeline_data *tdp_last =
                                        &td->pipeline_data[td->n_pipelines - 1];

                                td->p[i] = pipeline_last;
                                memcpy(tdp, tdp_last, sizeof(*tdp));
                        }

                        td->n_pipelines--;

                        /* Pipeline */
                        p->enabled = 0;

                        break;
                }

                return 0;
        }

        /* Allocate request */
        req = thread_msg_alloc();
        if (req == NULL)
                return -1;

        /* Write request */
        req->type = THREAD_REQ_PIPELINE_DISABLE;
        req->pipeline_disable.p = p->p;

        /* Send request and wait for response */
        rsp = thread_msg_send_recv(thread_id, req);

        /* Read response */
        status = rsp->status;

        /* Free response */
        thread_msg_free(rsp);

        /* Request completion */
        if (status)
                return status;

        p->enabled = 0;

        return 0;
}

/**
 * Data plane threads: message handling
 */
static inline struct thread_msg_req *
thread_msg_recv(struct rte_ring *msgq_req)
{
        struct thread_msg_req *req;

        int status = rte_ring_sc_dequeue(msgq_req, (void **) &req);

        if (status != 0)
                return NULL;

        return req;
}

static inline void
thread_msg_send(struct rte_ring *msgq_rsp,
        struct thread_msg_rsp *rsp)
{
        int status;

        do {
                status = rte_ring_sp_enqueue(msgq_rsp, rsp);
        } while (status == -ENOBUFS);
}

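/* Data plane side of THREAD_REQ_PIPELINE_ENABLE: registers the pipeline in the
 * local thread context and arms its per-pipeline timer. The response reuses
 * the request buffer.
 */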
static struct thread_msg_rsp *
thread_msg_handle_pipeline_enable(struct thread_data *t,
        struct thread_msg_req *req)
{
        struct thread_msg_rsp *rsp = (struct thread_msg_rsp *) req;
        struct pipeline_data *p = &t->pipeline_data[t->n_pipelines];

        /* Request */
        if (t->n_pipelines >= THREAD_PIPELINES_MAX) {
                rsp->status = -1;
                return rsp;
        }

        t->p[t->n_pipelines] = req->pipeline_enable.p;

        p->p = req->pipeline_enable.p;
        p->timer_period = (rte_get_tsc_hz() *
                req->pipeline_enable.timer_period_ms) / 1000;
        p->time_next = rte_get_tsc_cycles() + p->timer_period;

        t->n_pipelines++;

        /* Response */
        rsp->status = 0;
        return rsp;
}

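/* Data plane side of THREAD_REQ_PIPELINE_DISABLE: removes the pipeline from
 * the local thread context, compacting the arrays by moving the last entry
 * into the vacated slot.
 */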
static struct thread_msg_rsp *
thread_msg_handle_pipeline_disable(struct thread_data *t,
        struct thread_msg_req *req)
{
        struct thread_msg_rsp *rsp = (struct thread_msg_rsp *) req;
        uint32_t n_pipelines = t->n_pipelines;
        struct rte_swx_pipeline *pipeline = req->pipeline_disable.p;
        uint32_t i;

        /* find pipeline */
        for (i = 0; i < n_pipelines; i++) {
                struct pipeline_data *p = &t->pipeline_data[i];

                if (p->p != pipeline)
                        continue;

                if (i < n_pipelines - 1) {
                        struct rte_swx_pipeline *pipeline_last =
                                t->p[n_pipelines - 1];
                        struct pipeline_data *p_last =
                                &t->pipeline_data[n_pipelines - 1];

                        t->p[i] = pipeline_last;
                        memcpy(p, p_last, sizeof(*p));
                }

                t->n_pipelines--;

                rsp->status = 0;
                return rsp;
        }

        /* should not get here */
        rsp->status = 0;
        return rsp;
}

static void
thread_msg_handle(struct thread_data *t)
{
        for ( ; ; ) {
                struct thread_msg_req *req;
                struct thread_msg_rsp *rsp;

                req = thread_msg_recv(t->msgq_req);
                if (req == NULL)
                        break;

                switch (req->type) {
                case THREAD_REQ_PIPELINE_ENABLE:
                        rsp = thread_msg_handle_pipeline_enable(t, req);
                        break;

                case THREAD_REQ_PIPELINE_DISABLE:
                        rsp = thread_msg_handle_pipeline_disable(t, req);
                        break;

                default:
                        rsp = (struct thread_msg_rsp *) req;
                        rsp->status = -1;
                }

                thread_msg_send(t->msgq_rsp, rsp);
        }
}

/**
 * Data plane threads: main
 */
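/* Lcore function for the data plane threads: its signature matches
 * lcore_function_t, so it is typically launched on each worker lcore by the
 * application's main thread (e.g. through rte_eal_remote_launch()) and never
 * returns.
 */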
int
thread_main(void *arg __rte_unused)
{
        struct thread_data *t;
        uint32_t thread_id, i;

        thread_id = rte_lcore_id();
        t = &thread_data[thread_id];

        /* Dispatch loop */
        for (i = 0; ; i++) {
                uint32_t j;

                /* Data Plane */
                for (j = 0; j < t->n_pipelines; j++)
                        rte_swx_pipeline_run(t->p[j], PIPELINE_INSTR_QUANTA);

                /* Control Plane */
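                /* Control plane work is attempted only once every 16 dispatch
                 * iterations, and the message queue is polled only after the
                 * earliest timer (time_next_min) expires, keeping TSC reads
                 * and ring polling off the packet processing fast path.
                 */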
                if ((i & 0xF) == 0) {
                        uint64_t time = rte_get_tsc_cycles();
                        uint64_t time_next_min = UINT64_MAX;

                        if (time < t->time_next_min)
                                continue;

                        /* Thread message queues */
                        {
                                uint64_t time_next = t->time_next;

                                if (time_next <= time) {
                                        thread_msg_handle(t);
                                        time_next = time + t->timer_period;
                                        t->time_next = time_next;
                                }

                                if (time_next < time_next_min)
                                        time_next_min = time_next;
                        }

                        t->time_next_min = time_next_min;
                }
        }

        return 0;
}