/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2020 Intel Corporation
 */

#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <rte_common.h>
#include <rte_cycles.h>
#include <rte_lcore.h>
#include <rte_ring.h>
#include <rte_swx_pipeline.h>

#include <rte_table_acl.h>
#include <rte_table_array.h>
#include <rte_table_hash.h>
#include <rte_table_lpm.h>
#include <rte_table_lpm_ipv6.h>

#include "obj.h"
#include "thread.h"
#ifndef THREAD_PIPELINES_MAX
#define THREAD_PIPELINES_MAX 256
#endif

#ifndef THREAD_MSGQ_SIZE
#define THREAD_MSGQ_SIZE 64
#endif

#ifndef THREAD_TIMER_PERIOD_MS
#define THREAD_TIMER_PERIOD_MS 100
#endif
/* Pipeline instruction quanta: Needs to be big enough to do some meaningful
 * work, but small enough to avoid starving any other pipelines mapped to the
 * same thread. For a pipeline that executes 10 instructions per packet, a
 * quanta of 1000 instructions equates to processing 100 packets.
 */
#ifndef PIPELINE_INSTR_QUANTA
#define PIPELINE_INSTR_QUANTA 1000
#endif
/**
 * Control thread: data plane thread context
 */
struct thread {
	struct rte_ring *msgq_req;
	struct rte_ring *msgq_rsp;

	uint32_t enabled;
};

static struct thread thread[RTE_MAX_LCORE];
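/*
 * Control-plane view of a worker lcore: a request/response ring pair, plus an
 * enabled flag that is set once the rings have been created at init time.
 */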
/**
 * Data plane threads: context
 */
struct pipeline_data {
	struct rte_swx_pipeline *p;
	uint64_t timer_period; /* Measured in CPU cycles. */
	uint64_t time_next;
};

struct thread_data {
	struct rte_swx_pipeline *p[THREAD_PIPELINES_MAX];
	uint32_t n_pipelines;

	struct pipeline_data pipeline_data[THREAD_PIPELINES_MAX];
	struct rte_ring *msgq_req;
	struct rte_ring *msgq_rsp;
	uint64_t timer_period; /* Measured in CPU cycles. */
	uint64_t time_next;
	uint64_t time_next_min;
} __rte_cache_aligned;

static struct thread_data thread_data[RTE_MAX_LCORE];
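/*
 * Data-plane view: the pipeline pointers are deliberately stored twice, once
 * in the compact p[] array walked by the hot dispatch loop and once inside
 * pipeline_data[] next to the per-pipeline timer state. The structure is
 * cache aligned so that neighboring lcores never share a cache line.
 */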
/**
 * Control thread: data plane thread init
 */
static void
thread_free(void)
{
	uint32_t i;

	for (i = 0; i < RTE_MAX_LCORE; i++) {
		struct thread *t = &thread[i];

		if (!rte_lcore_is_enabled(i))
			continue;

		/* MSGQs */
		if (t->msgq_req)
			rte_ring_free(t->msgq_req);

		if (t->msgq_rsp)
			rte_ring_free(t->msgq_rsp);
	}
}
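/*
 * thread_free() doubles as the error-unwind path of thread_init() below:
 * rings that were never created are still NULL and are simply skipped.
 */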
int
thread_init(void)
{
	uint32_t i;

	RTE_LCORE_FOREACH_WORKER(i) {
		char name[RTE_RING_NAMESIZE];
		struct rte_ring *msgq_req, *msgq_rsp;
		struct thread *t = &thread[i];
		struct thread_data *t_data = &thread_data[i];
		uint32_t cpu_id = rte_lcore_to_socket_id(i);

		/* MSGQs */
		snprintf(name, sizeof(name), "THREAD-%04x-MSGQ-REQ", i);

		msgq_req = rte_ring_create(name,
			THREAD_MSGQ_SIZE,
			cpu_id,
			RING_F_SP_ENQ | RING_F_SC_DEQ);

		if (msgq_req == NULL) {
			thread_free();
			return -1;
		}

		snprintf(name, sizeof(name), "THREAD-%04x-MSGQ-RSP", i);

		msgq_rsp = rte_ring_create(name,
			THREAD_MSGQ_SIZE,
			cpu_id,
			RING_F_SP_ENQ | RING_F_SC_DEQ);

		if (msgq_rsp == NULL) {
			/* Not yet stored in t->msgq_req, so thread_free()
			 * cannot release it: free it explicitly.
			 */
			rte_ring_free(msgq_req);
			thread_free();
			return -1;
		}
		/* Control thread records */
		t->msgq_req = msgq_req;
		t->msgq_rsp = msgq_rsp;
		t->enabled = 1;

		/* Data plane thread records */
		t_data->n_pipelines = 0;
		t_data->msgq_req = msgq_req;
		t_data->msgq_rsp = msgq_rsp;
		t_data->timer_period =
			(rte_get_tsc_hz() * THREAD_TIMER_PERIOD_MS) / 1000;
		t_data->time_next = rte_get_tsc_cycles() + t_data->timer_period;
		t_data->time_next_min = t_data->time_next;
	}

	return 0;
}
static inline int
thread_is_running(uint32_t thread_id)
{
	enum rte_lcore_state_t thread_state;

	thread_state = rte_eal_get_lcore_state(thread_id);
	return (thread_state == RUNNING) ? 1 : 0;
}
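/*
 * Two configuration paths hinge on this check: while a worker lcore has not
 * been launched yet (state != RUNNING), the control thread writes its
 * thread_data directly; once the lcore is running, all changes are requested
 * over the message queues so the arrays are only modified by their owner.
 */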
/**
 * Control thread & data plane threads: message passing
 */
enum thread_req_type {
	THREAD_REQ_PIPELINE_ENABLE = 0,
	THREAD_REQ_PIPELINE_DISABLE,
	THREAD_REQ_MAX
};

struct thread_msg_req {
	enum thread_req_type type;

	union {
		struct {
			struct rte_swx_pipeline *p;
			uint32_t timer_period_ms;
		} pipeline_enable;

		struct {
			struct rte_swx_pipeline *p;
		} pipeline_disable;
	};
};

struct thread_msg_rsp {
	int status;
};
static struct thread_msg_req *
thread_msg_alloc(void)
{
	size_t size = RTE_MAX(sizeof(struct thread_msg_req),
		sizeof(struct thread_msg_rsp));

	return calloc(1, size);
}
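/*
 * A single buffer carries the request out and the response back: the data
 * plane casts the request to a response and fills in the status in place,
 * hence the RTE_MAX() of the two sizes above.
 */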
static void
thread_msg_free(struct thread_msg_rsp *rsp)
{
	free(rsp);
}
static struct thread_msg_rsp *
thread_msg_send_recv(uint32_t thread_id,
	struct thread_msg_req *req)
{
	struct thread *t = &thread[thread_id];
	struct rte_ring *msgq_req = t->msgq_req;
	struct rte_ring *msgq_rsp = t->msgq_rsp;
	struct thread_msg_rsp *rsp;
	int status;

	/* send */
	do {
		status = rte_ring_sp_enqueue(msgq_req, req);
	} while (status == -ENOBUFS);

	/* recv */
	do {
		status = rte_ring_sc_dequeue(msgq_rsp, (void **) &rsp);
	} while (status != 0);

	return rsp;
}
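/*
 * Both loops above spin: the control thread busy-waits until the worker
 * services its message queue, which it does within roughly one timer period
 * (THREAD_TIMER_PERIOD_MS, 100 ms by default).
 */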
int
thread_pipeline_enable(uint32_t thread_id,
	struct obj *obj,
	const char *pipeline_name)
{
	struct pipeline *p = pipeline_find(obj, pipeline_name);
	struct thread *t;
	struct thread_msg_req *req;
	struct thread_msg_rsp *rsp;
	int status;

	/* Check input params */
	if ((thread_id >= RTE_MAX_LCORE) ||
		(p == NULL))
		return -1;

	t = &thread[thread_id];
	if (t->enabled == 0)
		return -1;

	if (!thread_is_running(thread_id)) {
		struct thread_data *td = &thread_data[thread_id];
		struct pipeline_data *tdp = &td->pipeline_data[td->n_pipelines];

		if (td->n_pipelines >= THREAD_PIPELINES_MAX)
			return -1;

		/* Data plane thread */
		td->p[td->n_pipelines] = p->p;

		tdp->p = p->p;
		tdp->timer_period =
			(rte_get_tsc_hz() * p->timer_period_ms) / 1000;
		tdp->time_next = rte_get_tsc_cycles() + tdp->timer_period;

		td->n_pipelines++;

		/* Pipeline */
		p->thread_id = thread_id;
		p->enabled = 1;

		return 0;
	}
	/* Allocate request */
	req = thread_msg_alloc();
	if (req == NULL)
		return -1;

	/* Write request */
	req->type = THREAD_REQ_PIPELINE_ENABLE;
	req->pipeline_enable.p = p->p;
	req->pipeline_enable.timer_period_ms = p->timer_period_ms;

	/* Send request and wait for response */
	rsp = thread_msg_send_recv(thread_id, req);

	/* Read response */
	status = rsp->status;

	/* Free response */
	thread_msg_free(rsp);

	/* Request completion */
	if (status)
		return status;

	p->thread_id = thread_id;
	p->enabled = 1;

	return 0;
}
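/*
 * Typical control-plane usage, e.g. from a CLI handler that maps a pipeline
 * onto a worker lcore (the object and pipeline names are illustrative only):
 *
 *	if (thread_pipeline_enable(1, obj, "PIPELINE0"))
 *		printf("Cannot enable pipeline on thread 1\n");
 */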
int
thread_pipeline_disable(uint32_t thread_id,
	struct obj *obj,
	const char *pipeline_name)
{
	struct pipeline *p = pipeline_find(obj, pipeline_name);
	struct thread *t;
	struct thread_msg_req *req;
	struct thread_msg_rsp *rsp;
	int status;

	/* Check input params */
	if ((thread_id >= RTE_MAX_LCORE) ||
		(p == NULL))
		return -1;

	t = &thread[thread_id];
	if (t->enabled == 0)
		return -1;

	if (p->enabled == 0)
		return 0;

	if (p->thread_id != thread_id)
		return -1;
	if (!thread_is_running(thread_id)) {
		struct thread_data *td = &thread_data[thread_id];
		uint32_t i;

		for (i = 0; i < td->n_pipelines; i++) {
			struct pipeline_data *tdp = &td->pipeline_data[i];

			if (tdp->p != p->p)
				continue;

			/* Data plane thread */
			if (i < td->n_pipelines - 1) {
				struct rte_swx_pipeline *pipeline_last =
					td->p[td->n_pipelines - 1];
				struct pipeline_data *tdp_last =
					&td->pipeline_data[td->n_pipelines - 1];

				td->p[i] = pipeline_last;
				memcpy(tdp, tdp_last, sizeof(*tdp));
			}

			td->n_pipelines--;

			/* Pipeline */
			p->enabled = 0;

			break;
		}

		return 0;
	}
	/* Allocate request */
	req = thread_msg_alloc();
	if (req == NULL)
		return -1;

	/* Write request */
	req->type = THREAD_REQ_PIPELINE_DISABLE;
	req->pipeline_disable.p = p->p;

	/* Send request and wait for response */
	rsp = thread_msg_send_recv(thread_id, req);

	/* Read response */
	status = rsp->status;

	/* Free response */
	thread_msg_free(rsp);

	/* Request completion */
	if (status)
		return status;

	p->enabled = 0;

	return 0;
}
/**
 * Data plane threads: message handling
 */
static inline struct thread_msg_req *
thread_msg_recv(struct rte_ring *msgq_req)
{
	struct thread_msg_req *req;
	int status = rte_ring_sc_dequeue(msgq_req, (void **) &req);

	if (status != 0)
		return NULL;

	return req;
}
static inline void
thread_msg_send(struct rte_ring *msgq_rsp,
	struct thread_msg_rsp *rsp)
{
	int status;

	do {
		status = rte_ring_sp_enqueue(msgq_rsp, rsp);
	} while (status == -ENOBUFS);
}
static struct thread_msg_rsp *
thread_msg_handle_pipeline_enable(struct thread_data *t,
	struct thread_msg_req *req)
{
	struct thread_msg_rsp *rsp = (struct thread_msg_rsp *) req;
	struct pipeline_data *p = &t->pipeline_data[t->n_pipelines];

	/* Request */
	if (t->n_pipelines >= THREAD_PIPELINES_MAX) {
		rsp->status = -1;
		return rsp;
	}

	t->p[t->n_pipelines] = req->pipeline_enable.p;

	p->p = req->pipeline_enable.p;
	p->timer_period = (rte_get_tsc_hz() *
		req->pipeline_enable.timer_period_ms) / 1000;
	p->time_next = rte_get_tsc_cycles() + p->timer_period;

	t->n_pipelines++;

	/* Response */
	rsp->status = 0;
	return rsp;
}
static struct thread_msg_rsp *
thread_msg_handle_pipeline_disable(struct thread_data *t,
	struct thread_msg_req *req)
{
	struct thread_msg_rsp *rsp = (struct thread_msg_rsp *) req;
	uint32_t n_pipelines = t->n_pipelines;
	struct rte_swx_pipeline *pipeline = req->pipeline_disable.p;
	uint32_t i;

	for (i = 0; i < n_pipelines; i++) {
		struct pipeline_data *p = &t->pipeline_data[i];

		if (p->p != pipeline)
			continue;
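
		/*
		 * Pipeline found: remove it by swapping the last entry into
		 * its slot rather than shifting the tail, which is O(1) and
		 * keeps both p[] and pipeline_data[] dense for the dispatch
		 * loop, at the cost of reordering the remaining pipelines.
		 */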
		if (i < n_pipelines - 1) {
			struct rte_swx_pipeline *pipeline_last =
				t->p[n_pipelines - 1];
			struct pipeline_data *p_last =
				&t->pipeline_data[n_pipelines - 1];

			t->p[i] = pipeline_last;
			memcpy(p, p_last, sizeof(*p));
		}

		t->n_pipelines--;

		rsp->status = 0;
		return rsp;
	}
	/* should not get here */
	rsp->status = 0;
	return rsp;
}
static void
thread_msg_handle(struct thread_data *t)
{
	for ( ; ; ) {
		struct thread_msg_req *req;
		struct thread_msg_rsp *rsp;

		req = thread_msg_recv(t->msgq_req);
		if (req == NULL)
			break;

		switch (req->type) {
		case THREAD_REQ_PIPELINE_ENABLE:
			rsp = thread_msg_handle_pipeline_enable(t, req);
			break;

		case THREAD_REQ_PIPELINE_DISABLE:
			rsp = thread_msg_handle_pipeline_disable(t, req);
			break;

		default:
			rsp = (struct thread_msg_rsp *) req;
			rsp->status = -1;
		}

		thread_msg_send(t->msgq_rsp, rsp);
	}
}
/**
 * Data plane threads: main
 */
int
thread_main(void *arg __rte_unused)
{
	struct thread_data *t;
	uint32_t thread_id, i;

	thread_id = rte_lcore_id();
	t = &thread_data[thread_id];

	/* Dispatch loop */
	for (i = 0; ; i++) {
		uint32_t j;

		/* Data Plane */
		for (j = 0; j < t->n_pipelines; j++)
			rte_swx_pipeline_run(t->p[j], PIPELINE_INSTR_QUANTA);
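
		/*
		 * Control Plane: polled only every 16th iteration, and the
		 * message queue is serviced only once its timer deadline has
		 * passed, so this work is amortized over many run quanta.
		 */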
		if ((i & 0xF) == 0) {
			uint64_t time = rte_get_tsc_cycles();
			uint64_t time_next_min = UINT64_MAX;

			if (time < t->time_next_min)
				continue;

			/* Thread message queues */
			{
				uint64_t time_next = t->time_next;

				if (time_next <= time) {
					thread_msg_handle(t);
					time_next = time + t->timer_period;
					t->time_next = time_next;
				}

				if (time_next < time_next_min)
					time_next_min = time_next;
			}

			t->time_next_min = time_next_min;
		}
	}

	return 0;
}