1 /* SPDX-License-Identifier: BSD-3-Clause
2 * Copyright(c) 2020 Intel Corporation
7 #include <rte_common.h>
8 #include <rte_cycles.h>
12 #include <rte_table_acl.h>
13 #include <rte_table_array.h>
14 #include <rte_table_hash.h>
15 #include <rte_table_lpm.h>
16 #include <rte_table_lpm_ipv6.h>
21 #ifndef THREAD_PIPELINES_MAX
22 #define THREAD_PIPELINES_MAX 256
25 #ifndef THREAD_MSGQ_SIZE
26 #define THREAD_MSGQ_SIZE 64
29 #ifndef THREAD_TIMER_PERIOD_MS
30 #define THREAD_TIMER_PERIOD_MS 100
33 /* Pipeline instruction quanta: Needs to be big enough to do some meaningful
34 * work, but not too big to avoid starving any other pipelines mapped to the
35 * same thread. For a pipeline that executes 10 instructions per packet, a
36 * quanta of 1000 instructions equates to processing 100 packets.
38 #ifndef PIPELINE_INSTR_QUANTA
39 #define PIPELINE_INSTR_QUANTA 1000
43 * Control thread: data plane thread context
/* Per-lcore record owned by the CONTROL thread: its two endpoints for
 * request/response message passing with one data plane thread (see
 * thread_msg_send_recv()). NOTE(review): the enclosing "struct thread {"
 * opening line is not visible in this excerpt.
 */
46 struct rte_ring *msgq_req;
47 struct rte_ring *msgq_rsp;
/* One control-side record per EAL logical core, indexed by lcore ID. */
52 static struct thread thread[RTE_MAX_LCORE];
55 * Data plane threads: context
/* Run-time state for one pipeline mapped to a data plane thread. */
57 struct pipeline_data {
58 struct rte_swx_pipeline *p;
59 uint64_t timer_period; /* Measured in CPU cycles. */
/* Per-thread state read by thread_main(): p[] is the hot array scanned
 * by the dispatch loop, pipeline_data[] the matching per-pipeline state.
 * NOTE(review): the "struct thread_data {" opening line and a
 * time_next/n_pipelines field fall in a gap of this excerpt — confirm.
 */
64 struct rte_swx_pipeline *p[THREAD_PIPELINES_MAX];
67 struct pipeline_data pipeline_data[THREAD_PIPELINES_MAX];
/* Message rings shared with the control thread (data plane endpoints). */
68 struct rte_ring *msgq_req;
69 struct rte_ring *msgq_rsp;
70 uint64_t timer_period; /* Measured in CPU cycles. */
72 uint64_t time_next_min; /* Earliest pending timer deadline (TSC). */
73 } __rte_cache_aligned; /* One cache line per lcore: avoids false sharing. */
75 static struct thread_data thread_data[RTE_MAX_LCORE];
78 * Control thread: data plane thread init
/* Called by the control thread before the data plane threads start:
 * free any rings left from a previous init, then create one request
 * ring and one response ring per worker lcore and seed both the
 * control-side (thread[]) and data-plane-side (thread_data[]) records.
 */
/* Cleanup pass over every lcore slot. */
85 for (i = 0; i < RTE_MAX_LCORE; i++) {
86 struct thread *t = &thread[i];
88 if (!rte_lcore_is_enabled(i))
/* rte_ring_free(NULL) is a no-op, so the unconditional free is safe. */
92 rte_ring_free(t->msgq_req);
94 rte_ring_free(t->msgq_rsp);
/* Create the per-worker message rings; single producer/single consumer
 * is sufficient since exactly one thread sits at each end.
 */
103 RTE_LCORE_FOREACH_WORKER(i) {
105 struct rte_ring *msgq_req, *msgq_rsp;
106 struct thread *t = &thread[i];
107 struct thread_data *t_data = &thread_data[i];
/* NOTE(review): despite the name, this is the NUMA socket ID of lcore
 * i — presumably passed to rte_ring_create() for local allocation.
 */
108 uint32_t cpu_id = rte_lcore_to_socket_id(i);
111 snprintf(name, sizeof(name), "THREAD-%04x-MSGQ-REQ", i);
113 msgq_req = rte_ring_create(name,
116 RING_F_SP_ENQ | RING_F_SC_DEQ);
/* Ring creation failure path (body not visible in this excerpt). */
118 if (msgq_req == NULL) {
123 snprintf(name, sizeof(name), "THREAD-%04x-MSGQ-RSP", i);
125 msgq_rsp = rte_ring_create(name,
128 RING_F_SP_ENQ | RING_F_SC_DEQ);
130 if (msgq_rsp == NULL) {
135 /* Control thread records */
136 t->msgq_req = msgq_req;
137 t->msgq_rsp = msgq_rsp;
140 /* Data plane thread records */
141 t_data->n_pipelines = 0;
142 t_data->msgq_req = msgq_req;
143 t_data->msgq_rsp = msgq_rsp;
/* Convert the timer period from milliseconds to TSC cycles. */
144 t_data->timer_period =
145 (rte_get_tsc_hz() * THREAD_TIMER_PERIOD_MS) / 1000;
146 t_data->time_next = rte_get_tsc_cycles() + t_data->timer_period;
147 t_data->time_next_min = t_data->time_next;
/* Return 1 when the EAL lcore identified by thread_id is in the
 * RUNNING state, 0 otherwise. Callers use this to decide whether
 * thread_data[] can be updated directly (thread not yet launched) or
 * must be changed through the message queues.
 */
154 thread_is_running(uint32_t thread_id)
156 enum rte_lcore_state_t thread_state;
158 thread_state = rte_eal_get_lcore_state(thread_id);
159 return (thread_state == RUNNING) ? 1 : 0;
163 * Control thread & data plane threads: message passing
/* Request types sent by the control thread to a data plane thread. */
165 enum thread_req_type {
166 THREAD_REQ_PIPELINE_ENABLE = 0,
167 THREAD_REQ_PIPELINE_DISABLE,
/* Request message. The payload members below carry the target pipeline
 * and, for enable, its timer period in milliseconds; the enclosing
 * union/struct braces are not visible in this excerpt — presumably a
 * union keyed by "type".
 */
171 struct thread_msg_req {
172 enum thread_req_type type;
176 struct rte_swx_pipeline *p;
177 uint32_t timer_period_ms;
181 struct rte_swx_pipeline *p;
/* Response message: carries a status code read by the requester. */
186 struct thread_msg_rsp {
/* Allocate a zero-filled message buffer big enough for either a
 * request or a response, so the data plane handlers can build the
 * response in place in the request buffer (see the
 * "rsp = (struct thread_msg_rsp *) req" casts below).
 */
193 static struct thread_msg_req *
194 thread_msg_alloc(void)
196 size_t size = RTE_MAX(sizeof(struct thread_msg_req),
197 sizeof(struct thread_msg_rsp));
199 return calloc(1, size);
/* Release a message buffer obtained from thread_msg_alloc(). */
203 thread_msg_free(struct thread_msg_rsp *rsp)
/* Control thread helper: post req on the target thread's request ring,
 * then busy-wait for the response. Both loops spin without a timeout,
 * so this blocks forever if the data plane thread never services its
 * queue; callers only take this path after thread_is_running().
 */
208 static struct thread_msg_rsp *
209 thread_msg_send_recv(uint32_t thread_id,
210 struct thread_msg_req *req)
212 struct thread *t = &thread[thread_id];
213 struct rte_ring *msgq_req = t->msgq_req;
214 struct rte_ring *msgq_rsp = t->msgq_rsp;
215 struct thread_msg_rsp *rsp;
/* Retry while the request ring is full. */
220 status = rte_ring_sp_enqueue(msgq_req, req);
221 } while (status == -ENOBUFS);
/* Poll until a response is available. */
225 status = rte_ring_sc_dequeue(msgq_rsp, (void **) &rsp);
226 } while (status != 0);
/* Map pipeline "pipeline_name" onto data plane thread "thread_id".
 * If the target thread is not running yet, its thread_data[] record is
 * updated directly; otherwise a THREAD_REQ_PIPELINE_ENABLE message is
 * sent and this call blocks until the thread answers. Error-path
 * bodies fall in gaps of this excerpt.
 */
232 thread_pipeline_enable(uint32_t thread_id,
234 const char *pipeline_name)
236 struct pipeline *p = pipeline_find(obj, pipeline_name);
238 struct thread_msg_req *req;
239 struct thread_msg_rsp *rsp;
242 /* Check input params */
243 if ((thread_id >= RTE_MAX_LCORE) ||
247 t = &thread[thread_id];
/* Fast path: target thread not started, no synchronization needed. */
251 if (!thread_is_running(thread_id)) {
252 struct thread_data *td = &thread_data[thread_id];
253 struct pipeline_data *tdp = &td->pipeline_data[td->n_pipelines];
/* Reject when this thread's pipeline table is already full. */
255 if (td->n_pipelines >= THREAD_PIPELINES_MAX)
258 /* Data plane thread */
259 td->p[td->n_pipelines] = p->p;
/* Timer period converted from milliseconds to TSC cycles. */
263 (rte_get_tsc_hz() * p->timer_period_ms) / 1000;
264 tdp->time_next = rte_get_tsc_cycles() + tdp->timer_period;
/* Control plane record: remember the owning thread. */
269 p->thread_id = thread_id;
275 /* Allocate request */
276 req = thread_msg_alloc();
/* Fill in request */
281 req->type = THREAD_REQ_PIPELINE_ENABLE;
282 req->pipeline_enable.p = p->p;
283 req->pipeline_enable.timer_period_ms = p->timer_period_ms;
285 /* Send request and wait for response */
286 rsp = thread_msg_send_recv(thread_id, req);
/* Read response */
289 status = rsp->status;
/* Free response buffer (same allocation as the request). */
292 thread_msg_free(rsp);
294 /* Request completion */
298 p->thread_id = thread_id;
/* Unmap pipeline "pipeline_name" from data plane thread "thread_id".
 * Mirrors thread_pipeline_enable(): direct thread_data[] update when
 * the target thread is not running, message passing otherwise.
 */
305 thread_pipeline_disable(uint32_t thread_id,
307 const char *pipeline_name)
309 struct pipeline *p = pipeline_find(obj, pipeline_name);
311 struct thread_msg_req *req;
312 struct thread_msg_rsp *rsp;
315 /* Check input params */
316 if ((thread_id >= RTE_MAX_LCORE) ||
320 t = &thread[thread_id];
/* Nothing to do if the pipeline is mapped to a different thread. */
327 if (p->thread_id != thread_id)
/* Fast path: target thread not started, update its records directly. */
330 if (!thread_is_running(thread_id)) {
331 struct thread_data *td = &thread_data[thread_id];
/* Linear scan for the slot holding this pipeline. */
334 for (i = 0; i < td->n_pipelines; i++) {
335 struct pipeline_data *tdp = &td->pipeline_data[i];
340 /* Data plane thread */
/* Remove by swapping the last entry into slot i (array order is
 * not preserved).
 */
341 if (i < td->n_pipelines - 1) {
342 struct rte_swx_pipeline *pipeline_last =
343 td->p[td->n_pipelines - 1];
344 struct pipeline_data *tdp_last =
345 &td->pipeline_data[td->n_pipelines - 1];
347 td->p[i] = pipeline_last;
348 memcpy(tdp, tdp_last, sizeof(*tdp));
362 /* Allocate request */
363 req = thread_msg_alloc();
/* Fill in request */
368 req->type = THREAD_REQ_PIPELINE_DISABLE;
369 req->pipeline_disable.p = p->p;
371 /* Send request and wait for response */
372 rsp = thread_msg_send_recv(thread_id, req);
/* Read response */
375 status = rsp->status;
/* Free response buffer (same allocation as the request). */
378 thread_msg_free(rsp);
380 /* Request completion */
390 * Data plane threads: message handling
/* Data plane side: dequeue the next pending request (single consumer).
 * NOTE(review): the lines after the dequeue are not visible here —
 * presumably NULL is returned when the ring is empty; confirm.
 */
392 static inline struct thread_msg_req *
393 thread_msg_recv(struct rte_ring *msgq_req)
395 struct thread_msg_req *req;
397 int status = rte_ring_sc_dequeue(msgq_req, (void **) &req);
/* Data plane side: post the response, retrying while the ring is full. */
406 thread_msg_send(struct rte_ring *msgq_rsp,
407 struct thread_msg_rsp *rsp)
412 status = rte_ring_sp_enqueue(msgq_rsp, rsp);
413 } while (status == -ENOBUFS);
/* Data plane handler for THREAD_REQ_PIPELINE_ENABLE: register the
 * pipeline in this thread's local arrays. The response is built in
 * place in the request buffer (sized for both by thread_msg_alloc()).
 */
416 static struct thread_msg_rsp *
417 thread_msg_handle_pipeline_enable(struct thread_data *t,
418 struct thread_msg_req *req)
420 struct thread_msg_rsp *rsp = (struct thread_msg_rsp *) req;
421 struct pipeline_data *p = &t->pipeline_data[t->n_pipelines];
/* Reject when the pipeline table is already full (error status set in
 * lines not visible here).
 */
424 if (t->n_pipelines >= THREAD_PIPELINES_MAX) {
/* Register the pipeline and arm its timer (ms converted to TSC). */
429 t->p[t->n_pipelines] = req->pipeline_enable.p;
431 p->p = req->pipeline_enable.p;
432 p->timer_period = (rte_get_tsc_hz() *
433 req->pipeline_enable.timer_period_ms) / 1000;
434 p->time_next = rte_get_tsc_cycles() + p->timer_period;
/* Data plane handler for THREAD_REQ_PIPELINE_DISABLE: find the
 * pipeline and remove it with the same swap-with-last scheme used by
 * thread_pipeline_disable(). The response reuses the request buffer.
 */
443 static struct thread_msg_rsp *
444 thread_msg_handle_pipeline_disable(struct thread_data *t,
445 struct thread_msg_req *req)
447 struct thread_msg_rsp *rsp = (struct thread_msg_rsp *) req;
448 uint32_t n_pipelines = t->n_pipelines;
449 struct rte_swx_pipeline *pipeline = req->pipeline_disable.p;
/* Linear scan for the matching pipeline. */
453 for (i = 0; i < n_pipelines; i++) {
454 struct pipeline_data *p = &t->pipeline_data[i];
456 if (p->p != pipeline)
/* Swap the last entry into slot i; array order is not preserved. */
459 if (i < n_pipelines - 1) {
460 struct rte_swx_pipeline *pipeline_last =
461 t->p[n_pipelines - 1];
462 struct pipeline_data *p_last =
463 &t->pipeline_data[n_pipelines - 1];
465 t->p[i] = pipeline_last;
466 memcpy(p, p_last, sizeof(*p));
475 /* should not get here */
/* Service this thread's request ring: receive, dispatch on request
 * type, and send the response back on the response ring. Called from
 * thread_main() when the thread timer expires. Loop/brace structure
 * falls partly in gaps of this excerpt.
 */
481 thread_msg_handle(struct thread_data *t)
484 struct thread_msg_req *req;
485 struct thread_msg_rsp *rsp;
487 req = thread_msg_recv(t->msgq_req);
/* Dispatch on request type. */
492 case THREAD_REQ_PIPELINE_ENABLE:
493 rsp = thread_msg_handle_pipeline_enable(t, req);
496 case THREAD_REQ_PIPELINE_DISABLE:
497 rsp = thread_msg_handle_pipeline_disable(t, req);
/* Unknown type: echo the request buffer back as the response —
 * presumably with an error status set in lines not visible here.
 */
501 rsp = (struct thread_msg_rsp *) req;
505 thread_msg_send(t->msgq_rsp, rsp);
510 * Data plane threads: main
/* Data plane thread entry point, launched on a worker lcore: endless
 * loop that runs each mapped pipeline for a bounded instruction quanta
 * and periodically services the control message queue based on a
 * TSC-driven timer. This excerpt ends inside the loop body.
 */
513 thread_main(void *arg __rte_unused)
515 struct thread_data *t;
516 uint32_t thread_id, i;
518 thread_id = rte_lcore_id();
519 t = &thread_data[thread_id];
/* Data plane work: PIPELINE_INSTR_QUANTA bounds the time each pipeline
 * holds the lcore, so co-mapped pipelines do not starve each other.
 */
526 for (j = 0; j < t->n_pipelines; j++)
527 rte_swx_pipeline_run(t->p[j], PIPELINE_INSTR_QUANTA);
/* Timer check amortized to once every 16 loop iterations. */
530 if ((i & 0xF) == 0) {
531 uint64_t time = rte_get_tsc_cycles();
532 uint64_t time_next_min = UINT64_MAX;
/* Cheap early-out: nothing can be due before the cached minimum. */
534 if (time < t->time_next_min)
537 /* Thread message queues */
539 uint64_t time_next = t->time_next;
541 if (time_next <= time) {
/* Timer expired: service the message queue and re-arm. */
542 thread_msg_handle(t);
543 time_next = time + t->timer_period;
544 t->time_next = time_next;
547 if (time_next < time_next_min)
548 time_next_min = time_next;
/* Re-cache the earliest pending deadline for the early-out above. */
551 t->time_next_min = time_next_min;