examples/ip_pipeline: add thread runtime
authorJasvinder Singh <jasvinder.singh@intel.com>
Thu, 29 Mar 2018 18:31:50 +0000 (19:31 +0100)
committerCristian Dumitrescu <cristian.dumitrescu@intel.com>
Thu, 5 Apr 2018 17:00:17 +0000 (19:00 +0200)
Add runtime thread functions for the pipeline.

Signed-off-by: Cristian Dumitrescu <cristian.dumitrescu@intel.com>
Signed-off-by: Jasvinder Singh <jasvinder.singh@intel.com>
examples/ip_pipeline/main.c
examples/ip_pipeline/thread.c
examples/ip_pipeline/thread.h

index 45f0739..a69face 100644 (file)
@@ -8,6 +8,7 @@
 #include <unistd.h>
 #include <getopt.h>
 
+#include <rte_launch.h>
 #include <rte_eal.h>
 
 #include "cli.h"
@@ -237,6 +238,11 @@ main(int argc, char **argv)
                return status;
        }
 
+       rte_eal_mp_remote_launch(
+               thread_main,
+               NULL,
+               SKIP_MASTER);
+
        /* Script */
        if (app.script_name)
                cli_script_process(app.script_name,
index 2c8f84c..805a2ae 100644 (file)
@@ -154,3 +154,196 @@ thread_init(void)
 
        return 0;
 }
+
+/**
+ * Master thread & data plane threads: message passing
+ */
+enum thread_req_type {
+       THREAD_REQ_MAX
+};
+
+struct thread_msg_req {
+       enum thread_req_type type;
+};
+
+struct thread_msg_rsp {
+       int status;
+};
+
+/**
+ * Data plane threads: message handling
+ */
+static inline struct thread_msg_req *
+thread_msg_recv(struct rte_ring *msgq_req)
+{
+       struct thread_msg_req *req;
+
+       int status = rte_ring_sc_dequeue(msgq_req, (void **) &req);
+
+       if (status != 0)
+               return NULL;
+
+       return req;
+}
+
+static inline void
+thread_msg_send(struct rte_ring *msgq_rsp,
+       struct thread_msg_rsp *rsp)
+{
+       int status;
+
+       do {
+               status = rte_ring_sp_enqueue(msgq_rsp, rsp);
+       } while (status == -ENOBUFS);
+}
+
+static void
+thread_msg_handle(struct thread_data *t)
+{
+       for ( ; ; ) {
+               struct thread_msg_req *req;
+               struct thread_msg_rsp *rsp;
+
+               req = thread_msg_recv(t->msgq_req);
+               if (req == NULL)
+                       break;
+
+               switch (req->type) {
+               default:
+                       rsp = (struct thread_msg_rsp *) req;
+                       rsp->status = -1;
+               }
+
+               thread_msg_send(t->msgq_rsp, rsp);
+       }
+}
+
+/**
+ * Master thread & data plane threads: message passing
+ */
+
+enum pipeline_req_type {
+       PIPELINE_REQ_MAX
+};
+
+struct pipeline_msg_req {
+       enum pipeline_req_type type;
+};
+
+struct pipeline_msg_rsp {
+       int status;
+};
+
+/**
+ * Data plane threads: message handling
+ */
+static inline struct pipeline_msg_req *
+pipeline_msg_recv(struct rte_ring *msgq_req)
+{
+       struct pipeline_msg_req *req;
+
+       int status = rte_ring_sc_dequeue(msgq_req, (void **) &req);
+
+       if (status != 0)
+               return NULL;
+
+       return req;
+}
+
+static inline void
+pipeline_msg_send(struct rte_ring *msgq_rsp,
+       struct pipeline_msg_rsp *rsp)
+{
+       int status;
+
+       do {
+               status = rte_ring_sp_enqueue(msgq_rsp, rsp);
+       } while (status == -ENOBUFS);
+}
+
+static void
+pipeline_msg_handle(struct pipeline_data *p)
+{
+       for ( ; ; ) {
+               struct pipeline_msg_req *req;
+               struct pipeline_msg_rsp *rsp;
+
+               req = pipeline_msg_recv(p->msgq_req);
+               if (req == NULL)
+                       break;
+
+               switch (req->type) {
+               default:
+                       rsp = (struct pipeline_msg_rsp *) req;
+                       rsp->status = -1;
+               }
+
+               pipeline_msg_send(p->msgq_rsp, rsp);
+       }
+}
+
+/**
+ * Data plane threads: main
+ */
+int
+thread_main(void *arg __rte_unused)
+{
+       struct thread_data *t;
+       uint32_t thread_id, i;
+
+       thread_id = rte_lcore_id();
+       t = &thread_data[thread_id];
+
+       /* Dispatch loop */
+       for (i = 0; ; i++) {
+               uint32_t j;
+
+               /* Data Plane */
+               for (j = 0; j < t->n_pipelines; j++)
+                       rte_pipeline_run(t->p[j]);
+
+               /* Control Plane */
+               if ((i & 0xF) == 0) {
+                       uint64_t time = rte_get_tsc_cycles();
+                       uint64_t time_next_min = UINT64_MAX;
+
+                       if (time < t->time_next_min)
+                               continue;
+
+                       /* Pipeline message queues */
+                       for (j = 0; j < t->n_pipelines; j++) {
+                               struct pipeline_data *p =
+                                       &t->pipeline_data[j];
+                               uint64_t time_next = p->time_next;
+
+                               if (time_next <= time) {
+                                       pipeline_msg_handle(p);
+                                       rte_pipeline_flush(p->p);
+                                       time_next = time + p->timer_period;
+                                       p->time_next = time_next;
+                               }
+
+                               if (time_next < time_next_min)
+                                       time_next_min = time_next;
+                       }
+
+                       /* Thread message queues */
+                       {
+                               uint64_t time_next = t->time_next;
+
+                               if (time_next <= time) {
+                                       thread_msg_handle(t);
+                                       time_next = time + t->timer_period;
+                                       t->time_next = time_next;
+                               }
+
+                               if (time_next < time_next_min)
+                                       time_next_min = time_next;
+                       }
+
+                       t->time_next_min = time_next_min;
+               }
+       }
+
+       return 0;
+}
index 39c0d89..47db428 100644 (file)
@@ -10,4 +10,7 @@
 int
 thread_init(void);
 
+int
+thread_main(void *arg);
+
 #endif /* _INCLUDE_THREAD_H_ */