examples/ip_pipeline: add thread runtime
[dpdk.git] / examples / ip_pipeline / thread.c
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2010-2018 Intel Corporation
3  */
4
5 #include <stdlib.h>
6
7 #include <rte_common.h>
8 #include <rte_cycles.h>
9 #include <rte_lcore.h>
10 #include <rte_ring.h>
11
12 #include <rte_table_acl.h>
13 #include <rte_table_array.h>
14 #include <rte_table_hash.h>
15 #include <rte_table_lpm.h>
16 #include <rte_table_lpm_ipv6.h>
17
18 #include "common.h"
19 #include "thread.h"
20 #include "pipeline.h"
21
/* Capacity of the per-thread pipeline arrays in struct thread_data. */
#ifndef THREAD_PIPELINES_MAX
#define THREAD_PIPELINES_MAX                               256
#endif

/* Size passed to rte_ring_create() for every per-thread message queue. */
#ifndef THREAD_MSGQ_SIZE
#define THREAD_MSGQ_SIZE                                   64
#endif

/*
 * Interval, in milliseconds, between two polls of a thread's request
 * queue; thread_init() converts it to TSC cycles.
 */
#ifndef THREAD_TIMER_PERIOD_MS
#define THREAD_TIMER_PERIOD_MS                             100
#endif
33
34 /**
35  * Master thead: data plane thread context
36  */
37 struct thread {
38         struct rte_ring *msgq_req;
39         struct rte_ring *msgq_rsp;
40
41         uint32_t enabled;
42 };
43
44 static struct thread thread[RTE_MAX_LCORE];
45
/**
 * Data plane threads: context
 */
struct table_data {
        /*
         * NOTE(review): presumably the action object attached to the
         * table; it is not referenced by the code in this part of the
         * file - confirm against the users of this field.
         */
        struct rte_table_action *a;
};
52
53 struct pipeline_data {
54         struct rte_pipeline *p;
55         struct table_data table_data[RTE_PIPELINE_TABLE_MAX];
56         uint32_t n_tables;
57
58         struct rte_ring *msgq_req;
59         struct rte_ring *msgq_rsp;
60         uint64_t timer_period; /* Measured in CPU cycles. */
61         uint64_t time_next;
62
63         uint8_t buffer[TABLE_RULE_ACTION_SIZE_MAX];
64 };
65
66 struct thread_data {
67         struct rte_pipeline *p[THREAD_PIPELINES_MAX];
68         uint32_t n_pipelines;
69
70         struct pipeline_data pipeline_data[THREAD_PIPELINES_MAX];
71         struct rte_ring *msgq_req;
72         struct rte_ring *msgq_rsp;
73         uint64_t timer_period; /* Measured in CPU cycles. */
74         uint64_t time_next;
75         uint64_t time_next_min;
76 } __rte_cache_aligned;
77
78 static struct thread_data thread_data[RTE_MAX_LCORE];
79
80 /**
81  * Master thread: data plane thread init
82  */
83 static void
84 thread_free(void)
85 {
86         uint32_t i;
87
88         for (i = 0; i < RTE_MAX_LCORE; i++) {
89                 struct thread *t = &thread[i];
90
91                 if (!rte_lcore_is_enabled(i))
92                         continue;
93
94                 /* MSGQs */
95                 if (t->msgq_req)
96                         rte_ring_free(t->msgq_req);
97
98                 if (t->msgq_rsp)
99                         rte_ring_free(t->msgq_rsp);
100         }
101 }
102
103 int
104 thread_init(void)
105 {
106         uint32_t i;
107
108         RTE_LCORE_FOREACH_SLAVE(i) {
109                 char name[NAME_MAX];
110                 struct rte_ring *msgq_req, *msgq_rsp;
111                 struct thread *t = &thread[i];
112                 struct thread_data *t_data = &thread_data[i];
113                 uint32_t cpu_id = rte_lcore_to_socket_id(i);
114
115                 /* MSGQs */
116                 snprintf(name, sizeof(name), "THREAD-%04x-MSGQ-REQ", i);
117
118                 msgq_req = rte_ring_create(name,
119                         THREAD_MSGQ_SIZE,
120                         cpu_id,
121                         RING_F_SP_ENQ | RING_F_SC_DEQ);
122
123                 if (msgq_req == NULL) {
124                         thread_free();
125                         return -1;
126                 }
127
128                 snprintf(name, sizeof(name), "THREAD-%04x-MSGQ-RSP", i);
129
130                 msgq_rsp = rte_ring_create(name,
131                         THREAD_MSGQ_SIZE,
132                         cpu_id,
133                         RING_F_SP_ENQ | RING_F_SC_DEQ);
134
135                 if (msgq_rsp == NULL) {
136                         thread_free();
137                         return -1;
138                 }
139
140                 /* Master thread records */
141                 t->msgq_req = msgq_req;
142                 t->msgq_rsp = msgq_rsp;
143                 t->enabled = 1;
144
145                 /* Data plane thread records */
146                 t_data->n_pipelines = 0;
147                 t_data->msgq_req = msgq_req;
148                 t_data->msgq_rsp = msgq_rsp;
149                 t_data->timer_period =
150                         (rte_get_tsc_hz() * THREAD_TIMER_PERIOD_MS) / 1000;
151                 t_data->time_next = rte_get_tsc_cycles() + t_data->timer_period;
152                 t_data->time_next_min = t_data->time_next;
153         }
154
155         return 0;
156 }
157
/**
 * Master thread & data plane threads: message passing
 */

/*
 * Thread-level request types. No request is defined yet: THREAD_REQ_MAX is
 * the only enumerator, so thread_msg_handle() answers every request through
 * its default branch.
 */
enum thread_req_type {
        THREAD_REQ_MAX
};

/* Request message dequeued by a data plane thread. */
struct thread_msg_req {
        enum thread_req_type type;
};

/* Response message; thread_msg_handle() sets status to -1 for unknown types. */
struct thread_msg_rsp {
        int status;
};
172
/**
 * Data plane threads: message handling
 */

/*
 * Take one request off the given ring without blocking.
 *
 * Returns the dequeued request, or NULL when rte_ring_sc_dequeue() reports
 * a non-zero status.
 */
static inline struct thread_msg_req *
thread_msg_recv(struct rte_ring *msgq_req)
{
        void *msg;

        if (rte_ring_sc_dequeue(msgq_req, &msg) != 0)
                return NULL;

        return (struct thread_msg_req *) msg;
}
188
/*
 * Post a response on the given ring, retrying for as long as the enqueue
 * fails with -ENOBUFS. Any other status ends the loop.
 */
static inline void
thread_msg_send(struct rte_ring *msgq_rsp,
        struct thread_msg_rsp *rsp)
{
        while (rte_ring_sp_enqueue(msgq_rsp, rsp) == -ENOBUFS)
                ;
}
199
200 static void
201 thread_msg_handle(struct thread_data *t)
202 {
203         for ( ; ; ) {
204                 struct thread_msg_req *req;
205                 struct thread_msg_rsp *rsp;
206
207                 req = thread_msg_recv(t->msgq_req);
208                 if (req == NULL)
209                         break;
210
211                 switch (req->type) {
212                 default:
213                         rsp = (struct thread_msg_rsp *) req;
214                         rsp->status = -1;
215                 }
216
217                 thread_msg_send(t->msgq_rsp, rsp);
218         }
219 }
220
/**
 * Master thread & data plane threads: pipeline message passing
 */

/*
 * Pipeline-level request types. No request is defined yet: PIPELINE_REQ_MAX
 * is the only enumerator, so pipeline_msg_handle() answers every request
 * through its default branch.
 */
enum pipeline_req_type {
        PIPELINE_REQ_MAX
};

/* Request message dequeued on behalf of a pipeline. */
struct pipeline_msg_req {
        enum pipeline_req_type type;
};

/* Response message; pipeline_msg_handle() sets status to -1 for unknown types. */
struct pipeline_msg_rsp {
        int status;
};
236
/**
 * Data plane threads: pipeline message handling
 */

/*
 * Take one pipeline request off the given ring without blocking.
 *
 * Returns the dequeued request, or NULL when rte_ring_sc_dequeue() reports
 * a non-zero status.
 */
static inline struct pipeline_msg_req *
pipeline_msg_recv(struct rte_ring *msgq_req)
{
        void *msg;

        if (rte_ring_sc_dequeue(msgq_req, &msg) != 0)
                return NULL;

        return (struct pipeline_msg_req *) msg;
}
252
/*
 * Post a pipeline response on the given ring, retrying for as long as the
 * enqueue fails with -ENOBUFS. Any other status ends the loop.
 */
static inline void
pipeline_msg_send(struct rte_ring *msgq_rsp,
        struct pipeline_msg_rsp *rsp)
{
        while (rte_ring_sp_enqueue(msgq_rsp, rsp) == -ENOBUFS)
                ;
}
263
264 static void
265 pipeline_msg_handle(struct pipeline_data *p)
266 {
267         for ( ; ; ) {
268                 struct pipeline_msg_req *req;
269                 struct pipeline_msg_rsp *rsp;
270
271                 req = pipeline_msg_recv(p->msgq_req);
272                 if (req == NULL)
273                         break;
274
275                 switch (req->type) {
276                 default:
277                         rsp = (struct pipeline_msg_rsp *) req;
278                         rsp->status = -1;
279                 }
280
281                 pipeline_msg_send(p->msgq_rsp, rsp);
282         }
283 }
284
285 /**
286  * Data plane threads: main
287  */
288 int
289 thread_main(void *arg __rte_unused)
290 {
291         struct thread_data *t;
292         uint32_t thread_id, i;
293
294         thread_id = rte_lcore_id();
295         t = &thread_data[thread_id];
296
297         /* Dispatch loop */
298         for (i = 0; ; i++) {
299                 uint32_t j;
300
301                 /* Data Plane */
302                 for (j = 0; j < t->n_pipelines; j++)
303                         rte_pipeline_run(t->p[j]);
304
305                 /* Control Plane */
306                 if ((i & 0xF) == 0) {
307                         uint64_t time = rte_get_tsc_cycles();
308                         uint64_t time_next_min = UINT64_MAX;
309
310                         if (time < t->time_next_min)
311                                 continue;
312
313                         /* Pipeline message queues */
314                         for (j = 0; j < t->n_pipelines; j++) {
315                                 struct pipeline_data *p =
316                                         &t->pipeline_data[j];
317                                 uint64_t time_next = p->time_next;
318
319                                 if (time_next <= time) {
320                                         pipeline_msg_handle(p);
321                                         rte_pipeline_flush(p->p);
322                                         time_next = time + p->timer_period;
323                                         p->time_next = time_next;
324                                 }
325
326                                 if (time_next < time_next_min)
327                                         time_next_min = time_next;
328                         }
329
330                         /* Thread message queues */
331                         {
332                                 uint64_t time_next = t->time_next;
333
334                                 if (time_next <= time) {
335                                         thread_msg_handle(t);
336                                         time_next = time + t->timer_period;
337                                         t->time_next = time_next;
338                                 }
339
340                                 if (time_next < time_next_min)
341                                         time_next_min = time_next;
342                         }
343
344                         t->time_next_min = time_next_min;
345                 }
346         }
347
348         return 0;
349 }