From: Jasvinder Singh Date: Thu, 29 Mar 2018 18:31:50 +0000 (+0100) Subject: examples/ip_pipeline: add thread runtime X-Git-Url: http://git.droids-corp.org/?a=commitdiff_plain;h=a8bd581de3975b5d7ad95afd227e5d889765680e;p=dpdk.git examples/ip_pipeline: add thread runtime Add runtime thread functions for the pipeline. Signed-off-by: Cristian Dumitrescu Signed-off-by: Jasvinder Singh --- diff --git a/examples/ip_pipeline/main.c b/examples/ip_pipeline/main.c index 45f0739736..a69faceef7 100644 --- a/examples/ip_pipeline/main.c +++ b/examples/ip_pipeline/main.c @@ -8,6 +8,7 @@ #include #include +#include #include #include "cli.h" @@ -237,6 +238,11 @@ main(int argc, char **argv) return status; } + rte_eal_mp_remote_launch( + thread_main, + NULL, + SKIP_MASTER); + /* Script */ if (app.script_name) cli_script_process(app.script_name, diff --git a/examples/ip_pipeline/thread.c b/examples/ip_pipeline/thread.c index 2c8f84c099..805a2ae90c 100644 --- a/examples/ip_pipeline/thread.c +++ b/examples/ip_pipeline/thread.c @@ -154,3 +154,196 @@ thread_init(void) return 0; } + +/** + * Master thread & data plane threads: message passing + */ +enum thread_req_type { + THREAD_REQ_MAX +}; + +struct thread_msg_req { + enum thread_req_type type; +}; + +struct thread_msg_rsp { + int status; +}; + +/** + * Data plane threads: message handling + */ +static inline struct thread_msg_req * +thread_msg_recv(struct rte_ring *msgq_req) +{ + struct thread_msg_req *req; + + int status = rte_ring_sc_dequeue(msgq_req, (void **) &req); + + if (status != 0) + return NULL; + + return req; +} + +static inline void +thread_msg_send(struct rte_ring *msgq_rsp, + struct thread_msg_rsp *rsp) +{ + int status; + + do { + status = rte_ring_sp_enqueue(msgq_rsp, rsp); + } while (status == -ENOBUFS); +} + +static void +thread_msg_handle(struct thread_data *t) +{ + for ( ; ; ) { + struct thread_msg_req *req; + struct thread_msg_rsp *rsp; + + req = thread_msg_recv(t->msgq_req); + if (req == NULL) + break; + + switch (req->type) { + default: + rsp = (struct thread_msg_rsp *) req; + rsp->status = -1; + } + + thread_msg_send(t->msgq_rsp, rsp); + } +} + +/** + * Master thread & data plane threads: message passing + */ + +enum pipeline_req_type { + PIPELINE_REQ_MAX +}; + +struct pipeline_msg_req { + enum pipeline_req_type type; +}; + +struct pipeline_msg_rsp { + int status; +}; + +/** + * Data plane threads: message handling + */ +static inline struct pipeline_msg_req * +pipeline_msg_recv(struct rte_ring *msgq_req) +{ + struct pipeline_msg_req *req; + + int status = rte_ring_sc_dequeue(msgq_req, (void **) &req); + + if (status != 0) + return NULL; + + return req; +} + +static inline void +pipeline_msg_send(struct rte_ring *msgq_rsp, + struct pipeline_msg_rsp *rsp) +{ + int status; + + do { + status = rte_ring_sp_enqueue(msgq_rsp, rsp); + } while (status == -ENOBUFS); +} + +static void +pipeline_msg_handle(struct pipeline_data *p) +{ + for ( ; ; ) { + struct pipeline_msg_req *req; + struct pipeline_msg_rsp *rsp; + + req = pipeline_msg_recv(p->msgq_req); + if (req == NULL) + break; + + switch (req->type) { + default: + rsp = (struct pipeline_msg_rsp *) req; + rsp->status = -1; + } + + pipeline_msg_send(p->msgq_rsp, rsp); + } +} + +/** + * Data plane threads: main + */ +int +thread_main(void *arg __rte_unused) +{ + struct thread_data *t; + uint32_t thread_id, i; + + thread_id = rte_lcore_id(); + t = &thread_data[thread_id]; + + /* Dispatch loop */ + for (i = 0; ; i++) { + uint32_t j; + + /* Data Plane */ + for (j = 0; j < t->n_pipelines; j++) + rte_pipeline_run(t->p[j]); + + /* Control Plane */ + if ((i & 0xF) == 0) { + uint64_t time = rte_get_tsc_cycles(); + uint64_t time_next_min = UINT64_MAX; + + if (time < t->time_next_min) + continue; + + /* Pipeline message queues */ + for (j = 0; j < t->n_pipelines; j++) { + struct pipeline_data *p = + &t->pipeline_data[j]; + uint64_t time_next = p->time_next; + + if (time_next <= time) { + pipeline_msg_handle(p); + rte_pipeline_flush(p->p); + time_next = time + p->timer_period; + p->time_next = time_next; + } + + if (time_next < time_next_min) + time_next_min = time_next; + } + + /* Thread message queues */ + { + uint64_t time_next = t->time_next; + + if (time_next <= time) { + thread_msg_handle(t); + time_next = time + t->timer_period; + t->time_next = time_next; + } + + if (time_next < time_next_min) + time_next_min = time_next; + } + + t->time_next_min = time_next_min; + } + } + + return 0; +} diff --git a/examples/ip_pipeline/thread.h b/examples/ip_pipeline/thread.h index 39c0d8959a..47db428806 100644 --- a/examples/ip_pipeline/thread.h +++ b/examples/ip_pipeline/thread.h @@ -10,4 +10,7 @@ int thread_init(void); +int +thread_main(void *arg); + #endif /* _INCLUDE_THREAD_H_ */