1 /* SPDX-License-Identifier: BSD-3-Clause
2 * Copyright(c) 2010-2018 Intel Corporation
7 #include <rte_common.h>
8 #include <rte_cycles.h>
12 #include <rte_table_acl.h>
13 #include <rte_table_array.h>
14 #include <rte_table_hash.h>
15 #include <rte_table_lpm.h>
16 #include <rte_table_lpm_ipv6.h>
/* Max number of pipelines a single data plane thread can run (sizes the
 * per-thread pipeline arrays below). Overridable at build time.
 */
22 #ifndef THREAD_PIPELINES_MAX
23 #define THREAD_PIPELINES_MAX 256
/* Capacity (number of slots) of each master<->thread message ring. */
26 #ifndef THREAD_MSGQ_SIZE
27 #define THREAD_MSGQ_SIZE 64
/* Interval between consecutive message queue scans, in milliseconds;
 * converted to TSC cycles at init time.
 */
30 #ifndef THREAD_TIMER_PERIOD_MS
31 #define THREAD_TIMER_PERIOD_MS 100
35 * Master thread: data plane thread context
/* Request ring: master thread -> data plane thread. */
38 struct rte_ring *msgq_req;
/* Response ring: data plane thread -> master thread. */
39 struct rte_ring *msgq_rsp;
/* Master-side per-lcore thread records, indexed by lcore id. */
44 static struct thread thread[RTE_MAX_LCORE];
47 * Data plane threads: context
/* Per-table action handle (table_data member). */
50 struct rte_table_action *a;
/* Per-pipeline state owned by the data plane thread running it. */
53 struct pipeline_data {
54 struct rte_pipeline *p;
55 struct table_data table_data[RTE_PIPELINE_TABLE_MAX];
/* Pipeline-level message rings (master -> pipeline and back). */
58 struct rte_ring *msgq_req;
59 struct rte_ring *msgq_rsp;
60 uint64_t timer_period; /* Measured in CPU cycles. */
/* Scratch space for building table rule actions in place. */
63 uint8_t buffer[TABLE_RULE_ACTION_SIZE_MAX];
/* Dense array of enabled pipelines, kept in sync with pipeline_data[]
 * (same index) so the run loop can iterate pipelines cheaply.
 */
67 struct rte_pipeline *p[THREAD_PIPELINES_MAX];
70 struct pipeline_data pipeline_data[THREAD_PIPELINES_MAX];
/* Thread-level message rings (master -> thread and back). */
71 struct rte_ring *msgq_req;
72 struct rte_ring *msgq_rsp;
73 uint64_t timer_period; /* Measured in CPU cycles. */
/* Earliest deadline among the thread timer and all pipeline timers. */
75 uint64_t time_next_min;
76 } __rte_cache_aligned;
/* Data-plane-side per-lcore context, indexed by lcore id. */
78 static struct thread_data thread_data[RTE_MAX_LCORE];
81 * Master thread: data plane thread init
/* First pass: release any rings left over from a previous init attempt. */
88 for (i = 0; i < RTE_MAX_LCORE; i++) {
89 struct thread *t = &thread[i];
91 if (!rte_lcore_is_enabled(i))
96 rte_ring_free(t->msgq_req);
99 rte_ring_free(t->msgq_rsp);
/* Second pass: create the message rings for each worker lcore and
 * initialize both the master-side and data-plane-side records.
 */
108 RTE_LCORE_FOREACH_SLAVE(i) {
110 struct rte_ring *msgq_req, *msgq_rsp;
111 struct thread *t = &thread[i];
112 struct thread_data *t_data = &thread_data[i];
/* NOTE(review): despite the name, this holds the NUMA socket id of
 * lcore i (rte_lcore_to_socket_id), used to place the rings.
 */
113 uint32_t cpu_id = rte_lcore_to_socket_id(i);
/* Request ring, single-producer (master) / single-consumer (thread). */
116 snprintf(name, sizeof(name), "THREAD-%04x-MSGQ-REQ", i);
118 msgq_req = rte_ring_create(name,
121 RING_F_SP_ENQ | RING_F_SC_DEQ);
123 if (msgq_req == NULL) {
/* Response ring, single-producer (thread) / single-consumer (master). */
128 snprintf(name, sizeof(name), "THREAD-%04x-MSGQ-RSP", i);
130 msgq_rsp = rte_ring_create(name,
133 RING_F_SP_ENQ | RING_F_SC_DEQ);
135 if (msgq_rsp == NULL) {
140 /* Master thread records */
141 t->msgq_req = msgq_req;
142 t->msgq_rsp = msgq_rsp;
145 /* Data plane thread records */
146 t_data->n_pipelines = 0;
147 t_data->msgq_req = msgq_req;
148 t_data->msgq_rsp = msgq_rsp;
/* Convert the millisecond poll period to TSC cycles and arm the
 * first deadline.
 */
149 t_data->timer_period =
150 (rte_get_tsc_hz() * THREAD_TIMER_PERIOD_MS) / 1000;
151 t_data->time_next = rte_get_tsc_cycles() + t_data->timer_period;
152 t_data->time_next_min = t_data->time_next;
159 * Master thread & data plane threads: message passing
/* Requests the master thread can send to a data plane thread. */
161 enum thread_req_type {
162 THREAD_REQ_PIPELINE_ENABLE = 0,
163 THREAD_REQ_PIPELINE_DISABLE,
/* Request message; the payload is a per-request-type union member. */
167 struct thread_msg_req {
168 enum thread_req_type type;
/* pipeline_enable payload: pipeline handle, per-table actions,
 * pipeline message rings and timer period.
 */
172 struct rte_pipeline *p;
174 struct rte_table_action *a;
175 } table[RTE_PIPELINE_TABLE_MAX];
176 struct rte_ring *msgq_req;
177 struct rte_ring *msgq_rsp;
178 uint32_t timer_period_ms;
/* pipeline_disable payload: pipeline handle only. */
183 struct rte_pipeline *p;
/* Response message; written in place over the request buffer. */
188 struct thread_msg_rsp {
/* Allocate one zeroed message buffer sized to hold either a request or a
 * response, since the handler converts the request into a response in place.
 * Returns NULL on allocation failure.
 */
195 static struct thread_msg_req *
196 thread_msg_alloc(void)
198 size_t size = RTE_MAX(sizeof(struct thread_msg_req),
199 sizeof(struct thread_msg_rsp));
201 return calloc(1, size);
/* Release a message buffer obtained from thread_msg_alloc(). */
205 thread_msg_free(struct thread_msg_rsp *rsp)
/* Master thread: post a request to the given data plane thread and busy-wait
 * for its response. Ownership of req transfers to the data plane thread; the
 * returned rsp (same buffer, rewritten in place) must be freed by the caller.
 * NOTE(review): spins forever if the target thread never services its queue.
 */
210 static struct thread_msg_rsp *
211 thread_msg_send_recv(uint32_t thread_id,
212 struct thread_msg_req *req)
214 struct thread *t = &thread[thread_id];
215 struct rte_ring *msgq_req = t->msgq_req;
216 struct rte_ring *msgq_rsp = t->msgq_rsp;
217 struct thread_msg_rsp *rsp;
/* Retry only on queue-full; other enqueue errors fall through. */
222 status = rte_ring_sp_enqueue(msgq_req, req);
223 } while (status == -ENOBUFS);
/* Poll until the response shows up. */
227 status = rte_ring_sc_dequeue(msgq_rsp, (void **) &rsp);
228 } while (status != 0);
/* Master thread API: assign pipeline pipeline_name to data plane thread
 * thread_id and start running it there. Returns 0 on success, negative
 * errno-style value on failure.
 */
234 thread_pipeline_enable(uint32_t thread_id,
235 const char *pipeline_name)
237 struct pipeline *p = pipeline_find(pipeline_name);
239 struct thread_msg_req *req;
240 struct thread_msg_rsp *rsp;
244 /* Check input params */
245 if ((thread_id >= RTE_MAX_LCORE) ||
247 (p->n_ports_in == 0) ||
248 (p->n_ports_out == 0) ||
/* Target thread must exist and be available. */
252 t = &thread[thread_id];
253 if ((t->enabled == 0) ||
257 /* Allocate request */
258 req = thread_msg_alloc();
/* Fill in the enable payload: pipeline handle, table actions, message
 * rings and timer period for the data plane side to adopt.
 */
263 req->type = THREAD_REQ_PIPELINE_ENABLE;
264 req->pipeline_enable.p = p->p;
265 for (i = 0; i < p->n_tables; i++)
266 req->pipeline_enable.table[i].a =
268 req->pipeline_enable.msgq_req = p->msgq_req;
269 req->pipeline_enable.msgq_rsp = p->msgq_rsp;
270 req->pipeline_enable.timer_period_ms = p->timer_period_ms;
271 req->pipeline_enable.n_tables = p->n_tables;
273 /* Send request and wait for response */
274 rsp = thread_msg_send_recv(thread_id, req);
/* Read response status before releasing the buffer. */
279 status = rsp->status;
282 thread_msg_free(rsp);
284 /* Request completion */
/* Record the owning thread on the master-side pipeline object. */
288 p->thread_id = thread_id;
/* Master thread API: stop running pipeline pipeline_name on data plane
 * thread thread_id. Returns 0 on success, negative value on failure.
 */
295 thread_pipeline_disable(uint32_t thread_id,
296 const char *pipeline_name)
298 struct pipeline *p = pipeline_find(pipeline_name);
300 struct thread_msg_req *req;
301 struct thread_msg_rsp *rsp;
304 /* Check input params */
305 if ((thread_id >= RTE_MAX_LCORE) ||
309 t = &thread[thread_id];
/* The pipeline must actually be owned by the requested thread. */
316 if (p->thread_id != thread_id)
319 /* Allocate request */
320 req = thread_msg_alloc();
325 req->type = THREAD_REQ_PIPELINE_DISABLE;
326 req->pipeline_disable.p = p->p;
328 /* Send request and wait for response */
329 rsp = thread_msg_send_recv(thread_id, req);
/* Read response status before releasing the buffer. */
334 status = rsp->status;
337 thread_msg_free(rsp);
339 /* Request completion */
349 * Data plane threads: message handling
/* Data plane thread: non-blocking dequeue of one request from the master;
 * presumably returns NULL when the ring is empty (tail not visible here).
 */
351 static inline struct thread_msg_req *
352 thread_msg_recv(struct rte_ring *msgq_req)
354 struct thread_msg_req *req;
356 int status = rte_ring_sc_dequeue(msgq_req, (void **) &req);
/* Data plane thread: post a response back to the master, retrying only
 * while the response ring is full.
 */
365 thread_msg_send(struct rte_ring *msgq_rsp,
366 struct thread_msg_rsp *rsp)
371 status = rte_ring_sp_enqueue(msgq_rsp, rsp);
372 } while (status == -ENOBUFS);
/* Data plane thread: handle PIPELINE_ENABLE by appending the pipeline to
 * this thread's run list and copying its configuration out of the request.
 * The response is the request buffer rewritten in place.
 */
375 static struct thread_msg_rsp *
376 thread_msg_handle_pipeline_enable(struct thread_data *t,
377 struct thread_msg_req *req)
379 struct thread_msg_rsp *rsp = (struct thread_msg_rsp *) req;
380 struct pipeline_data *p = &t->pipeline_data[t->n_pipelines];
/* Request completion: reject when the run list is already full. */
384 if (t->n_pipelines >= THREAD_PIPELINES_MAX) {
/* Append to the dense pipeline array and fill the matching slot of
 * pipeline_data[] from the request payload.
 */
389 t->p[t->n_pipelines] = req->pipeline_enable.p;
391 p->p = req->pipeline_enable.p;
392 for (i = 0; i < req->pipeline_enable.n_tables; i++)
394 req->pipeline_enable.table[i].a;
396 p->n_tables = req->pipeline_enable.n_tables;
398 p->msgq_req = req->pipeline_enable.msgq_req;
399 p->msgq_rsp = req->pipeline_enable.msgq_rsp;
/* Convert ms to TSC cycles and arm the pipeline's first deadline. */
401 (rte_get_tsc_hz() * req->pipeline_enable.timer_period_ms) / 1000;
402 p->time_next = rte_get_tsc_cycles() + p->timer_period;
/* Data plane thread: handle PIPELINE_DISABLE by removing the pipeline from
 * this thread's run list. Removal is swap-with-last, so the dense arrays
 * stay compact but pipeline ordering is not preserved.
 */
411 static struct thread_msg_rsp *
412 thread_msg_handle_pipeline_disable(struct thread_data *t,
413 struct thread_msg_req *req)
415 struct thread_msg_rsp *rsp = (struct thread_msg_rsp *) req;
416 uint32_t n_pipelines = t->n_pipelines;
417 struct rte_pipeline *pipeline = req->pipeline_disable.p;
/* Find the pipeline in the run list. */
421 for (i = 0; i < n_pipelines; i++) {
422 struct pipeline_data *p = &t->pipeline_data[i];
424 if (p->p != pipeline)
/* Unless it is already the last entry, move the last entry (both the
 * rte_pipeline pointer and its pipeline_data) into the freed slot.
 */
427 if (i < n_pipelines - 1) {
428 struct rte_pipeline *pipeline_last =
429 t->p[n_pipelines - 1];
430 struct pipeline_data *p_last =
431 &t->pipeline_data[n_pipelines - 1];
433 t->p[i] = pipeline_last;
434 memcpy(p, p_last, sizeof(*p));
443 /* should not get here */
/* Data plane thread: drain the thread-level request ring, dispatch each
 * request to its handler and send the in-place response back to the master.
 */
449 thread_msg_handle(struct thread_data *t)
452 struct thread_msg_req *req;
453 struct thread_msg_rsp *rsp;
455 req = thread_msg_recv(t->msgq_req);
460 case THREAD_REQ_PIPELINE_ENABLE:
461 rsp = thread_msg_handle_pipeline_enable(t, req);
464 case THREAD_REQ_PIPELINE_DISABLE:
465 rsp = thread_msg_handle_pipeline_disable(t, req);
/* Unknown request type: reuse the request buffer as an error response. */
469 rsp = (struct thread_msg_rsp *) req;
473 thread_msg_send(t->msgq_rsp, rsp);
478 * Master thread & data plane threads: message passing
/* Requests the master thread can send to a specific pipeline. */
480 enum pipeline_req_type {
482 PIPELINE_REQ_PORT_IN_ENABLE,
483 PIPELINE_REQ_PORT_IN_DISABLE,
/* Pipeline-level request message. */
488 struct pipeline_msg_req {
489 enum pipeline_req_type type;
490 uint32_t id; /* Port IN, port OUT or table ID */
/* Pipeline-level response; written in place over the request buffer. */
493 struct pipeline_msg_rsp {
/* Allocate one zeroed buffer sized to hold either a pipeline request or
 * response. Returns NULL on allocation failure.
 */
500 static struct pipeline_msg_req *
501 pipeline_msg_alloc(void)
503 size_t size = RTE_MAX(sizeof(struct pipeline_msg_req),
504 sizeof(struct pipeline_msg_rsp));
506 return calloc(1, size);
/* Release a buffer obtained from pipeline_msg_alloc(). */
510 pipeline_msg_free(struct pipeline_msg_rsp *rsp)
/* Master thread: post a request to pipeline p's ring and busy-wait for the
 * response (same buffer, rewritten in place by the data plane thread).
 * Caller frees the returned response.
 */
515 static struct pipeline_msg_rsp *
516 pipeline_msg_send_recv(struct pipeline *p,
517 struct pipeline_msg_req *req)
519 struct rte_ring *msgq_req = p->msgq_req;
520 struct rte_ring *msgq_rsp = p->msgq_rsp;
521 struct pipeline_msg_rsp *rsp;
/* Retry only while the request ring is full. */
526 status = rte_ring_sp_enqueue(msgq_req, req);
527 } while (status == -ENOBUFS);
/* Poll until the response shows up. */
531 status = rte_ring_sc_dequeue(msgq_rsp, (void **) &rsp);
532 } while (status != 0);
/* Master thread API: enable input port port_id of the named pipeline.
 * Returns 0 on success, negative value on failure.
 */
538 pipeline_port_in_enable(const char *pipeline_name,
542 struct pipeline_msg_req *req;
543 struct pipeline_msg_rsp *rsp;
546 /* Check input params */
547 if (pipeline_name == NULL)
550 p = pipeline_find(pipeline_name);
/* Port id must be within the pipeline's input port range. */
553 (port_id >= p->n_ports_in))
556 /* Allocate request */
557 req = pipeline_msg_alloc();
562 req->type = PIPELINE_REQ_PORT_IN_ENABLE;
565 /* Send request and wait for response */
566 rsp = pipeline_msg_send_recv(p, req);
/* Read response status before releasing the buffer. */
571 status = rsp->status;
574 pipeline_msg_free(rsp);
/* Master thread API: disable input port port_id of the named pipeline.
 * Returns 0 on success, negative value on failure.
 */
580 pipeline_port_in_disable(const char *pipeline_name,
584 struct pipeline_msg_req *req;
585 struct pipeline_msg_rsp *rsp;
588 /* Check input params */
589 if (pipeline_name == NULL)
592 p = pipeline_find(pipeline_name);
/* Port id must be within the pipeline's input port range. */
595 (port_id >= p->n_ports_in))
598 /* Allocate request */
599 req = pipeline_msg_alloc();
604 req->type = PIPELINE_REQ_PORT_IN_DISABLE;
607 /* Send request and wait for response */
608 rsp = pipeline_msg_send_recv(p, req);
/* Read response status before releasing the buffer. */
613 status = rsp->status;
616 pipeline_msg_free(rsp);
623 * Data plane threads: message handling
/* Data plane thread: non-blocking dequeue of one pipeline request;
 * presumably returns NULL when the ring is empty (tail not visible here).
 */
625 static inline struct pipeline_msg_req *
626 pipeline_msg_recv(struct rte_ring *msgq_req)
628 struct pipeline_msg_req *req;
630 int status = rte_ring_sc_dequeue(msgq_req, (void **) &req);
/* Data plane thread: post a pipeline response back to the master, retrying
 * only while the response ring is full.
 */
639 pipeline_msg_send(struct rte_ring *msgq_rsp,
640 struct pipeline_msg_rsp *rsp)
645 status = rte_ring_sp_enqueue(msgq_rsp, rsp);
646 } while (status == -ENOBUFS);
/* Data plane thread: handle PORT_IN_ENABLE by forwarding to the librte
 * rte_pipeline API; the response aliases the request buffer.
 */
649 static struct pipeline_msg_rsp *
650 pipeline_msg_handle_port_in_enable(struct pipeline_data *p,
651 struct pipeline_msg_req *req)
653 struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
654 uint32_t port_id = req->id;
656 rsp->status = rte_pipeline_port_in_enable(p->p,
/* Data plane thread: handle PORT_IN_DISABLE, same in-place response scheme. */
662 static struct pipeline_msg_rsp *
663 pipeline_msg_handle_port_in_disable(struct pipeline_data *p,
664 struct pipeline_msg_req *req)
666 struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
667 uint32_t port_id = req->id;
669 rsp->status = rte_pipeline_port_in_disable(p->p,
/* Data plane thread: drain one pipeline's request ring, dispatch each
 * request to its handler and send the in-place response back to the master.
 */
676 pipeline_msg_handle(struct pipeline_data *p)
679 struct pipeline_msg_req *req;
680 struct pipeline_msg_rsp *rsp;
682 req = pipeline_msg_recv(p->msgq_req);
687 case PIPELINE_REQ_PORT_IN_ENABLE:
688 rsp = pipeline_msg_handle_port_in_enable(p, req);
691 case PIPELINE_REQ_PORT_IN_DISABLE:
692 rsp = pipeline_msg_handle_port_in_disable(p, req);
/* Unknown request type: reuse the request buffer as an error response. */
696 rsp = (struct pipeline_msg_rsp *) req;
700 pipeline_msg_send(p->msgq_rsp, rsp);
705 * Data plane threads: main
708 thread_main(void *arg __rte_unused)
710 struct thread_data *t;
711 uint32_t thread_id, i;
713 thread_id = rte_lcore_id();
714 t = &thread_data[thread_id];
721 for (j = 0; j < t->n_pipelines; j++)
722 rte_pipeline_run(t->p[j]);
725 if ((i & 0xF) == 0) {
726 uint64_t time = rte_get_tsc_cycles();
727 uint64_t time_next_min = UINT64_MAX;
729 if (time < t->time_next_min)
732 /* Pipeline message queues */
733 for (j = 0; j < t->n_pipelines; j++) {
734 struct pipeline_data *p =
735 &t->pipeline_data[j];
736 uint64_t time_next = p->time_next;
738 if (time_next <= time) {
739 pipeline_msg_handle(p);
740 rte_pipeline_flush(p->p);
741 time_next = time + p->timer_period;
742 p->time_next = time_next;
745 if (time_next < time_next_min)
746 time_next_min = time_next;
749 /* Thread message queues */
751 uint64_t time_next = t->time_next;
753 if (time_next <= time) {
754 thread_msg_handle(t);
755 time_next = time + t->timer_period;
756 t->time_next = time_next;
759 if (time_next < time_next_min)
760 time_next_min = time_next;
763 t->time_next_min = time_next_min;