return 0;
 }
+
+/**
+ * Master thread & data plane threads: message passing
+ */
+enum thread_req_type {
+       THREAD_REQ_MAX
+};
+
+struct thread_msg_req {
+       enum thread_req_type type;
+};
+
+struct thread_msg_rsp {
+       int status;
+};
+
+/**
+ * Data plane threads: message handling
+ */
+static inline struct thread_msg_req *
+thread_msg_recv(struct rte_ring *msgq_req)
+{
+       struct thread_msg_req *req;
+
+       int status = rte_ring_sc_dequeue(msgq_req, (void **) &req);
+
+       if (status != 0)
+               return NULL;
+
+       return req;
+}
+
+static inline void
+thread_msg_send(struct rte_ring *msgq_rsp,
+       struct thread_msg_rsp *rsp)
+{
+       int status;
+
+       do {
+               status = rte_ring_sp_enqueue(msgq_rsp, rsp);
+       } while (status == -ENOBUFS);
+}
+
+static void
+thread_msg_handle(struct thread_data *t)
+{
+       for ( ; ; ) {
+               struct thread_msg_req *req;
+               struct thread_msg_rsp *rsp;
+
+               req = thread_msg_recv(t->msgq_req);
+               if (req == NULL)
+                       break;
+
+               switch (req->type) {
+               default:
+                       rsp = (struct thread_msg_rsp *) req;
+                       rsp->status = -1;
+               }
+
+               thread_msg_send(t->msgq_rsp, rsp);
+       }
+}
+
+/**
+ * Master thread & data plane threads: message passing
+ */
+
+enum pipeline_req_type {
+       PIPELINE_REQ_MAX
+};
+
+struct pipeline_msg_req {
+       enum pipeline_req_type type;
+};
+
+struct pipeline_msg_rsp {
+       int status;
+};
+
+/**
+ * Data plane threads: message handling
+ */
+static inline struct pipeline_msg_req *
+pipeline_msg_recv(struct rte_ring *msgq_req)
+{
+       struct pipeline_msg_req *req;
+
+       int status = rte_ring_sc_dequeue(msgq_req, (void **) &req);
+
+       if (status != 0)
+               return NULL;
+
+       return req;
+}
+
+static inline void
+pipeline_msg_send(struct rte_ring *msgq_rsp,
+       struct pipeline_msg_rsp *rsp)
+{
+       int status;
+
+       do {
+               status = rte_ring_sp_enqueue(msgq_rsp, rsp);
+       } while (status == -ENOBUFS);
+}
+
+static void
+pipeline_msg_handle(struct pipeline_data *p)
+{
+       for ( ; ; ) {
+               struct pipeline_msg_req *req;
+               struct pipeline_msg_rsp *rsp;
+
+               req = pipeline_msg_recv(p->msgq_req);
+               if (req == NULL)
+                       break;
+
+               switch (req->type) {
+               default:
+                       rsp = (struct pipeline_msg_rsp *) req;
+                       rsp->status = -1;
+               }
+
+               pipeline_msg_send(p->msgq_rsp, rsp);
+       }
+}
+
+/**
+ * Data plane threads: main
+ */
+int
+thread_main(void *arg __rte_unused)
+{
+       struct thread_data *t;
+       uint32_t thread_id, i;
+
+       thread_id = rte_lcore_id();
+       t = &thread_data[thread_id];
+
+       /* Dispatch loop */
+       for (i = 0; ; i++) {
+               uint32_t j;
+
+               /* Data Plane */
+               for (j = 0; j < t->n_pipelines; j++)
+                       rte_pipeline_run(t->p[j]);
+
+               /* Control Plane */
+               if ((i & 0xF) == 0) {
+                       uint64_t time = rte_get_tsc_cycles();
+                       uint64_t time_next_min = UINT64_MAX;
+
+                       if (time < t->time_next_min)
+                               continue;
+
+                       /* Pipeline message queues */
+                       for (j = 0; j < t->n_pipelines; j++) {
+                               struct pipeline_data *p =
+                                       &t->pipeline_data[j];
+                               uint64_t time_next = p->time_next;
+
+                               if (time_next <= time) {
+                                       pipeline_msg_handle(p);
+                                       rte_pipeline_flush(p->p);
+                                       time_next = time + p->timer_period;
+                                       p->time_next = time_next;
+                               }
+
+                               if (time_next < time_next_min)
+                                       time_next_min = time_next;
+                       }
+
+                       /* Thread message queues */
+                       {
+                               uint64_t time_next = t->time_next;
+
+                               if (time_next <= time) {
+                                       thread_msg_handle(t);
+                                       time_next = time + t->timer_period;
+                                       t->time_next = time_next;
+                               }
+
+                               if (time_next < time_next_min)
+                                       time_next_min = time_next;
+                       }
+
+                       t->time_next_min = time_next_min;
+               }
+       }
+
+       return 0;
+}