1 /* SPDX-License-Identifier: BSD-3-Clause
2 * Copyright(c) 2010-2018 Intel Corporation
7 #include <rte_common.h>
8 #include <rte_cycles.h>
12 #include <rte_table_acl.h>
13 #include <rte_table_array.h>
14 #include <rte_table_hash.h>
15 #include <rte_table_lpm.h>
16 #include <rte_table_lpm_ipv6.h>
22 #ifndef THREAD_PIPELINES_MAX
23 #define THREAD_PIPELINES_MAX 256
26 #ifndef THREAD_MSGQ_SIZE
27 #define THREAD_MSGQ_SIZE 64
30 #ifndef THREAD_TIMER_PERIOD_MS
31 #define THREAD_TIMER_PERIOD_MS 100
35 * Master thread: data plane thread context
38 struct rte_ring *msgq_req;
39 struct rte_ring *msgq_rsp;
44 static struct thread thread[RTE_MAX_LCORE];
47 * Data plane threads: context
50 struct rte_table_action *a;
53 struct pipeline_data {
54 struct rte_pipeline *p;
55 struct table_data table_data[RTE_PIPELINE_TABLE_MAX];
58 struct rte_ring *msgq_req;
59 struct rte_ring *msgq_rsp;
60 uint64_t timer_period; /* Measured in CPU cycles. */
63 uint8_t buffer[TABLE_RULE_ACTION_SIZE_MAX];
67 struct rte_pipeline *p[THREAD_PIPELINES_MAX];
70 struct pipeline_data pipeline_data[THREAD_PIPELINES_MAX];
71 struct rte_ring *msgq_req;
72 struct rte_ring *msgq_rsp;
73 uint64_t timer_period; /* Measured in CPU cycles. */
75 uint64_t time_next_min;
76 } __rte_cache_aligned;
78 static struct thread_data thread_data[RTE_MAX_LCORE];
81 * Master thread: data plane thread init
88 for (i = 0; i < RTE_MAX_LCORE; i++) {
89 struct thread *t = &thread[i];
91 if (!rte_lcore_is_enabled(i))
96 rte_ring_free(t->msgq_req);
99 rte_ring_free(t->msgq_rsp);
108 RTE_LCORE_FOREACH_SLAVE(i) {
110 struct rte_ring *msgq_req, *msgq_rsp;
111 struct thread *t = &thread[i];
112 struct thread_data *t_data = &thread_data[i];
113 uint32_t cpu_id = rte_lcore_to_socket_id(i);
116 snprintf(name, sizeof(name), "THREAD-%04x-MSGQ-REQ", i);
118 msgq_req = rte_ring_create(name,
121 RING_F_SP_ENQ | RING_F_SC_DEQ);
123 if (msgq_req == NULL) {
128 snprintf(name, sizeof(name), "THREAD-%04x-MSGQ-RSP", i);
130 msgq_rsp = rte_ring_create(name,
133 RING_F_SP_ENQ | RING_F_SC_DEQ);
135 if (msgq_rsp == NULL) {
140 /* Master thread records */
141 t->msgq_req = msgq_req;
142 t->msgq_rsp = msgq_rsp;
145 /* Data plane thread records */
146 t_data->n_pipelines = 0;
147 t_data->msgq_req = msgq_req;
148 t_data->msgq_rsp = msgq_rsp;
149 t_data->timer_period =
150 (rte_get_tsc_hz() * THREAD_TIMER_PERIOD_MS) / 1000;
151 t_data->time_next = rte_get_tsc_cycles() + t_data->timer_period;
152 t_data->time_next_min = t_data->time_next;
159 * Master thread & data plane threads: message passing
161 enum thread_req_type {
162 THREAD_REQ_PIPELINE_ENABLE = 0,
163 THREAD_REQ_PIPELINE_DISABLE,
167 struct thread_msg_req {
168 enum thread_req_type type;
172 struct rte_pipeline *p;
174 struct rte_table_action *a;
175 } table[RTE_PIPELINE_TABLE_MAX];
176 struct rte_ring *msgq_req;
177 struct rte_ring *msgq_rsp;
178 uint32_t timer_period_ms;
183 struct rte_pipeline *p;
188 struct thread_msg_rsp {
195 static struct thread_msg_req *
196 thread_msg_alloc(void)
198 size_t size = RTE_MAX(sizeof(struct thread_msg_req),
199 sizeof(struct thread_msg_rsp));
201 return calloc(1, size);
205 thread_msg_free(struct thread_msg_rsp *rsp)
210 static struct thread_msg_rsp *
211 thread_msg_send_recv(uint32_t thread_id,
212 struct thread_msg_req *req)
214 struct thread *t = &thread[thread_id];
215 struct rte_ring *msgq_req = t->msgq_req;
216 struct rte_ring *msgq_rsp = t->msgq_rsp;
217 struct thread_msg_rsp *rsp;
222 status = rte_ring_sp_enqueue(msgq_req, req);
223 } while (status == -ENOBUFS);
227 status = rte_ring_sc_dequeue(msgq_rsp, (void **) &rsp);
228 } while (status != 0);
234 thread_pipeline_enable(uint32_t thread_id,
235 const char *pipeline_name)
237 struct pipeline *p = pipeline_find(pipeline_name);
239 struct thread_msg_req *req;
240 struct thread_msg_rsp *rsp;
244 /* Check input params */
245 if ((thread_id >= RTE_MAX_LCORE) ||
247 (p->n_ports_in == 0) ||
248 (p->n_ports_out == 0) ||
252 t = &thread[thread_id];
253 if ((t->enabled == 0) ||
257 /* Allocate request */
258 req = thread_msg_alloc();
263 req->type = THREAD_REQ_PIPELINE_ENABLE;
264 req->pipeline_enable.p = p->p;
265 for (i = 0; i < p->n_tables; i++)
266 req->pipeline_enable.table[i].a =
268 req->pipeline_enable.msgq_req = p->msgq_req;
269 req->pipeline_enable.msgq_rsp = p->msgq_rsp;
270 req->pipeline_enable.timer_period_ms = p->timer_period_ms;
271 req->pipeline_enable.n_tables = p->n_tables;
273 /* Send request and wait for response */
274 rsp = thread_msg_send_recv(thread_id, req);
279 status = rsp->status;
282 thread_msg_free(rsp);
284 /* Request completion */
288 p->thread_id = thread_id;
295 thread_pipeline_disable(uint32_t thread_id,
296 const char *pipeline_name)
298 struct pipeline *p = pipeline_find(pipeline_name);
300 struct thread_msg_req *req;
301 struct thread_msg_rsp *rsp;
304 /* Check input params */
305 if ((thread_id >= RTE_MAX_LCORE) ||
309 t = &thread[thread_id];
316 if (p->thread_id != thread_id)
319 /* Allocate request */
320 req = thread_msg_alloc();
325 req->type = THREAD_REQ_PIPELINE_DISABLE;
326 req->pipeline_disable.p = p->p;
328 /* Send request and wait for response */
329 rsp = thread_msg_send_recv(thread_id, req);
334 status = rsp->status;
337 thread_msg_free(rsp);
339 /* Request completion */
349 * Data plane threads: message handling
351 static inline struct thread_msg_req *
352 thread_msg_recv(struct rte_ring *msgq_req)
354 struct thread_msg_req *req;
356 int status = rte_ring_sc_dequeue(msgq_req, (void **) &req);
365 thread_msg_send(struct rte_ring *msgq_rsp,
366 struct thread_msg_rsp *rsp)
371 status = rte_ring_sp_enqueue(msgq_rsp, rsp);
372 } while (status == -ENOBUFS);
375 static struct thread_msg_rsp *
376 thread_msg_handle_pipeline_enable(struct thread_data *t,
377 struct thread_msg_req *req)
379 struct thread_msg_rsp *rsp = (struct thread_msg_rsp *) req;
380 struct pipeline_data *p = &t->pipeline_data[t->n_pipelines];
384 if (t->n_pipelines >= THREAD_PIPELINES_MAX) {
389 t->p[t->n_pipelines] = req->pipeline_enable.p;
391 p->p = req->pipeline_enable.p;
392 for (i = 0; i < req->pipeline_enable.n_tables; i++)
394 req->pipeline_enable.table[i].a;
396 p->n_tables = req->pipeline_enable.n_tables;
398 p->msgq_req = req->pipeline_enable.msgq_req;
399 p->msgq_rsp = req->pipeline_enable.msgq_rsp;
401 (rte_get_tsc_hz() * req->pipeline_enable.timer_period_ms) / 1000;
402 p->time_next = rte_get_tsc_cycles() + p->timer_period;
411 static struct thread_msg_rsp *
412 thread_msg_handle_pipeline_disable(struct thread_data *t,
413 struct thread_msg_req *req)
415 struct thread_msg_rsp *rsp = (struct thread_msg_rsp *) req;
416 uint32_t n_pipelines = t->n_pipelines;
417 struct rte_pipeline *pipeline = req->pipeline_disable.p;
421 for (i = 0; i < n_pipelines; i++) {
422 struct pipeline_data *p = &t->pipeline_data[i];
424 if (p->p != pipeline)
427 if (i < n_pipelines - 1) {
428 struct rte_pipeline *pipeline_last =
429 t->p[n_pipelines - 1];
430 struct pipeline_data *p_last =
431 &t->pipeline_data[n_pipelines - 1];
433 t->p[i] = pipeline_last;
434 memcpy(p, p_last, sizeof(*p));
443 /* should not get here */
449 thread_msg_handle(struct thread_data *t)
452 struct thread_msg_req *req;
453 struct thread_msg_rsp *rsp;
455 req = thread_msg_recv(t->msgq_req);
460 case THREAD_REQ_PIPELINE_ENABLE:
461 rsp = thread_msg_handle_pipeline_enable(t, req);
464 case THREAD_REQ_PIPELINE_DISABLE:
465 rsp = thread_msg_handle_pipeline_disable(t, req);
469 rsp = (struct thread_msg_rsp *) req;
473 thread_msg_send(t->msgq_rsp, rsp);
478 * Master thread & data plane threads: message passing
480 enum pipeline_req_type {
482 PIPELINE_REQ_PORT_IN_STATS_READ,
483 PIPELINE_REQ_PORT_IN_ENABLE,
484 PIPELINE_REQ_PORT_IN_DISABLE,
487 PIPELINE_REQ_PORT_OUT_STATS_READ,
490 PIPELINE_REQ_TABLE_STATS_READ,
495 struct pipeline_msg_req_port_in_stats_read {
499 struct pipeline_msg_req_port_out_stats_read {
503 struct pipeline_msg_req_table_stats_read {
507 struct pipeline_msg_req {
508 enum pipeline_req_type type;
509 uint32_t id; /* Port IN, port OUT or table ID */
513 struct pipeline_msg_req_port_in_stats_read port_in_stats_read;
514 struct pipeline_msg_req_port_out_stats_read port_out_stats_read;
515 struct pipeline_msg_req_table_stats_read table_stats_read;
519 struct pipeline_msg_rsp_port_in_stats_read {
520 struct rte_pipeline_port_in_stats stats;
523 struct pipeline_msg_rsp_port_out_stats_read {
524 struct rte_pipeline_port_out_stats stats;
527 struct pipeline_msg_rsp_table_stats_read {
528 struct rte_pipeline_table_stats stats;
531 struct pipeline_msg_rsp {
536 struct pipeline_msg_rsp_port_in_stats_read port_in_stats_read;
537 struct pipeline_msg_rsp_port_out_stats_read port_out_stats_read;
538 struct pipeline_msg_rsp_table_stats_read table_stats_read;
545 static struct pipeline_msg_req *
546 pipeline_msg_alloc(void)
548 size_t size = RTE_MAX(sizeof(struct pipeline_msg_req),
549 sizeof(struct pipeline_msg_rsp));
551 return calloc(1, size);
555 pipeline_msg_free(struct pipeline_msg_rsp *rsp)
560 static struct pipeline_msg_rsp *
561 pipeline_msg_send_recv(struct pipeline *p,
562 struct pipeline_msg_req *req)
564 struct rte_ring *msgq_req = p->msgq_req;
565 struct rte_ring *msgq_rsp = p->msgq_rsp;
566 struct pipeline_msg_rsp *rsp;
571 status = rte_ring_sp_enqueue(msgq_req, req);
572 } while (status == -ENOBUFS);
576 status = rte_ring_sc_dequeue(msgq_rsp, (void **) &rsp);
577 } while (status != 0);
583 pipeline_port_in_stats_read(const char *pipeline_name,
585 struct rte_pipeline_port_in_stats *stats,
589 struct pipeline_msg_req *req;
590 struct pipeline_msg_rsp *rsp;
593 /* Check input params */
594 if ((pipeline_name == NULL) ||
598 p = pipeline_find(pipeline_name);
601 (port_id >= p->n_ports_in))
604 /* Allocate request */
605 req = pipeline_msg_alloc();
610 req->type = PIPELINE_REQ_PORT_IN_STATS_READ;
612 req->port_in_stats_read.clear = clear;
614 /* Send request and wait for response */
615 rsp = pipeline_msg_send_recv(p, req);
620 status = rsp->status;
622 memcpy(stats, &rsp->port_in_stats_read.stats, sizeof(*stats));
625 pipeline_msg_free(rsp);
631 pipeline_port_in_enable(const char *pipeline_name,
635 struct pipeline_msg_req *req;
636 struct pipeline_msg_rsp *rsp;
639 /* Check input params */
640 if (pipeline_name == NULL)
643 p = pipeline_find(pipeline_name);
646 (port_id >= p->n_ports_in))
649 /* Allocate request */
650 req = pipeline_msg_alloc();
655 req->type = PIPELINE_REQ_PORT_IN_ENABLE;
658 /* Send request and wait for response */
659 rsp = pipeline_msg_send_recv(p, req);
664 status = rsp->status;
667 pipeline_msg_free(rsp);
673 pipeline_port_in_disable(const char *pipeline_name,
677 struct pipeline_msg_req *req;
678 struct pipeline_msg_rsp *rsp;
681 /* Check input params */
682 if (pipeline_name == NULL)
685 p = pipeline_find(pipeline_name);
688 (port_id >= p->n_ports_in))
691 /* Allocate request */
692 req = pipeline_msg_alloc();
697 req->type = PIPELINE_REQ_PORT_IN_DISABLE;
700 /* Send request and wait for response */
701 rsp = pipeline_msg_send_recv(p, req);
706 status = rsp->status;
709 pipeline_msg_free(rsp);
715 pipeline_port_out_stats_read(const char *pipeline_name,
717 struct rte_pipeline_port_out_stats *stats,
721 struct pipeline_msg_req *req;
722 struct pipeline_msg_rsp *rsp;
725 /* Check input params */
726 if ((pipeline_name == NULL) ||
730 p = pipeline_find(pipeline_name);
733 (port_id >= p->n_ports_out))
736 /* Allocate request */
737 req = pipeline_msg_alloc();
742 req->type = PIPELINE_REQ_PORT_OUT_STATS_READ;
744 req->port_out_stats_read.clear = clear;
746 /* Send request and wait for response */
747 rsp = pipeline_msg_send_recv(p, req);
752 status = rsp->status;
754 memcpy(stats, &rsp->port_out_stats_read.stats, sizeof(*stats));
757 pipeline_msg_free(rsp);
763 pipeline_table_stats_read(const char *pipeline_name,
765 struct rte_pipeline_table_stats *stats,
769 struct pipeline_msg_req *req;
770 struct pipeline_msg_rsp *rsp;
773 /* Check input params */
774 if ((pipeline_name == NULL) ||
778 p = pipeline_find(pipeline_name);
781 (table_id >= p->n_tables))
784 /* Allocate request */
785 req = pipeline_msg_alloc();
790 req->type = PIPELINE_REQ_TABLE_STATS_READ;
792 req->table_stats_read.clear = clear;
794 /* Send request and wait for response */
795 rsp = pipeline_msg_send_recv(p, req);
800 status = rsp->status;
802 memcpy(stats, &rsp->table_stats_read.stats, sizeof(*stats));
805 pipeline_msg_free(rsp);
811 * Data plane threads: message handling
813 static inline struct pipeline_msg_req *
814 pipeline_msg_recv(struct rte_ring *msgq_req)
816 struct pipeline_msg_req *req;
818 int status = rte_ring_sc_dequeue(msgq_req, (void **) &req);
827 pipeline_msg_send(struct rte_ring *msgq_rsp,
828 struct pipeline_msg_rsp *rsp)
833 status = rte_ring_sp_enqueue(msgq_rsp, rsp);
834 } while (status == -ENOBUFS);
837 static struct pipeline_msg_rsp *
838 pipeline_msg_handle_port_in_stats_read(struct pipeline_data *p,
839 struct pipeline_msg_req *req)
841 struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
842 uint32_t port_id = req->id;
843 int clear = req->port_in_stats_read.clear;
845 rsp->status = rte_pipeline_port_in_stats_read(p->p,
847 &rsp->port_in_stats_read.stats,
853 static struct pipeline_msg_rsp *
854 pipeline_msg_handle_port_in_enable(struct pipeline_data *p,
855 struct pipeline_msg_req *req)
857 struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
858 uint32_t port_id = req->id;
860 rsp->status = rte_pipeline_port_in_enable(p->p,
866 static struct pipeline_msg_rsp *
867 pipeline_msg_handle_port_in_disable(struct pipeline_data *p,
868 struct pipeline_msg_req *req)
870 struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
871 uint32_t port_id = req->id;
873 rsp->status = rte_pipeline_port_in_disable(p->p,
879 static struct pipeline_msg_rsp *
880 pipeline_msg_handle_port_out_stats_read(struct pipeline_data *p,
881 struct pipeline_msg_req *req)
883 struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
884 uint32_t port_id = req->id;
885 int clear = req->port_out_stats_read.clear;
887 rsp->status = rte_pipeline_port_out_stats_read(p->p,
889 &rsp->port_out_stats_read.stats,
895 static struct pipeline_msg_rsp *
896 pipeline_msg_handle_table_stats_read(struct pipeline_data *p,
897 struct pipeline_msg_req *req)
899 struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
900 uint32_t port_id = req->id;
901 int clear = req->table_stats_read.clear;
903 rsp->status = rte_pipeline_table_stats_read(p->p,
905 &rsp->table_stats_read.stats,
912 pipeline_msg_handle(struct pipeline_data *p)
915 struct pipeline_msg_req *req;
916 struct pipeline_msg_rsp *rsp;
918 req = pipeline_msg_recv(p->msgq_req);
923 case PIPELINE_REQ_PORT_IN_STATS_READ:
924 rsp = pipeline_msg_handle_port_in_stats_read(p, req);
927 case PIPELINE_REQ_PORT_IN_ENABLE:
928 rsp = pipeline_msg_handle_port_in_enable(p, req);
931 case PIPELINE_REQ_PORT_IN_DISABLE:
932 rsp = pipeline_msg_handle_port_in_disable(p, req);
935 case PIPELINE_REQ_PORT_OUT_STATS_READ:
936 rsp = pipeline_msg_handle_port_out_stats_read(p, req);
939 case PIPELINE_REQ_TABLE_STATS_READ:
940 rsp = pipeline_msg_handle_table_stats_read(p, req);
945 rsp = (struct pipeline_msg_rsp *) req;
949 pipeline_msg_send(p->msgq_rsp, rsp);
954 * Data plane threads: main
957 thread_main(void *arg __rte_unused)
959 struct thread_data *t;
960 uint32_t thread_id, i;
962 thread_id = rte_lcore_id();
963 t = &thread_data[thread_id];
970 for (j = 0; j < t->n_pipelines; j++)
971 rte_pipeline_run(t->p[j]);
974 if ((i & 0xF) == 0) {
975 uint64_t time = rte_get_tsc_cycles();
976 uint64_t time_next_min = UINT64_MAX;
978 if (time < t->time_next_min)
981 /* Pipeline message queues */
982 for (j = 0; j < t->n_pipelines; j++) {
983 struct pipeline_data *p =
984 &t->pipeline_data[j];
985 uint64_t time_next = p->time_next;
987 if (time_next <= time) {
988 pipeline_msg_handle(p);
989 rte_pipeline_flush(p->p);
990 time_next = time + p->timer_period;
991 p->time_next = time_next;
994 if (time_next < time_next_min)
995 time_next_min = time_next;
998 /* Thread message queues */
1000 uint64_t time_next = t->time_next;
1002 if (time_next <= time) {
1003 thread_msg_handle(t);
1004 time_next = time + t->timer_period;
1005 t->time_next = time_next;
1008 if (time_next < time_next_min)
1009 time_next_min = time_next;
1012 t->time_next_min = time_next_min;