#include "pipeline_common_be.h"
#include "app.h"
+#include "thread.h"
-int app_thread(void *arg)
+static inline void *
+thread_msg_recv(struct rte_ring *r)
+{
+ void *msg;
+ int status = rte_ring_sc_dequeue(r, &msg);
+
+ if (status != 0)
+ return NULL;
+
+ return msg;
+}
+
+static inline void
+thread_msg_send(struct rte_ring *r,
+ void *msg)
+{
+ int status;
+
+ do {
+ status = rte_ring_sp_enqueue(r, msg);
+ } while (status == -ENOBUFS);
+}
+
+static int
+thread_pipeline_enable(struct app_thread_data *t,
+ struct thread_pipeline_enable_msg_req *req)
+{
+ struct app_thread_pipeline_data *p;
+
+ if (req->f_run == NULL) {
+ if (t->n_regular >= APP_MAX_THREAD_PIPELINES)
+ return -1;
+ } else {
+ if (t->n_custom >= APP_MAX_THREAD_PIPELINES)
+ return -1;
+ }
+
+ p = (req->f_run == NULL) ?
+ &t->regular[t->n_regular] :
+ &t->custom[t->n_custom];
+
+ p->pipeline_id = req->pipeline_id;
+ p->be = req->be;
+ p->f_run = req->f_run;
+ p->f_timer = req->f_timer;
+ p->timer_period = req->timer_period;
+ p->deadline = 0;
+
+ if (req->f_run == NULL)
+ t->n_regular++;
+ else
+ t->n_custom++;
+
+ return 0;
+}
+
+static int
+thread_pipeline_disable(struct app_thread_data *t,
+ struct thread_pipeline_disable_msg_req *req)
+{
+ uint32_t n_regular = RTE_MIN(t->n_regular, RTE_DIM(t->regular));
+ uint32_t n_custom = RTE_MIN(t->n_custom, RTE_DIM(t->custom));
+ uint32_t i;
+
+ /* search regular pipelines of current thread */
+ for (i = 0; i < n_regular; i++) {
+ if (t->regular[i].pipeline_id != req->pipeline_id)
+ continue;
+
+ if (i < n_regular - 1)
+ memcpy(&t->regular[i],
+ &t->regular[i+1],
+ (n_regular - 1 - i) * sizeof(struct app_thread_pipeline_data));
+
+ n_regular--;
+ t->n_regular = n_regular;
+
+ return 0;
+ }
+
+ /* search custom pipelines of current thread */
+ for (i = 0; i < n_custom; i++) {
+ if (t->custom[i].pipeline_id != req->pipeline_id)
+ continue;
+
+ if (i < n_custom - 1)
+ memcpy(&t->custom[i],
+ &t->custom[i+1],
+ (n_custom - 1 - i) * sizeof(struct app_thread_pipeline_data));
+
+ n_custom--;
+ t->n_custom = n_custom;
+
+ return 0;
+ }
+
+ /* return if pipeline not found */
+ return -1;
+}
+
+static int
+thread_msg_req_handle(struct app_thread_data *t)
+{
+ void *msg_ptr;
+ struct thread_msg_req *req;
+ struct thread_msg_rsp *rsp;
+
+ msg_ptr = thread_msg_recv(t->msgq_in);
+ req = msg_ptr;
+ rsp = msg_ptr;
+
+ if (req != NULL)
+ switch (req->type) {
+ case THREAD_MSG_REQ_PIPELINE_ENABLE: {
+ rsp->status = thread_pipeline_enable(t,
+ (struct thread_pipeline_enable_msg_req *) req);
+ thread_msg_send(t->msgq_out, rsp);
+ break;
+ }
+
+ case THREAD_MSG_REQ_PIPELINE_DISABLE: {
+ rsp->status = thread_pipeline_disable(t,
+ (struct thread_pipeline_disable_msg_req *) req);
+ thread_msg_send(t->msgq_out, rsp);
+ break;
+ }
+ default:
+ break;
+ }
+
+ return 0;
+}
+
+int
+app_thread(void *arg)
{
struct app_params *app = (struct app_params *) arg;
uint32_t core_id = rte_lcore_id(), i, j;
struct app_thread_data *t = &app->thread_data[core_id];
- uint32_t n_regular = RTE_MIN(t->n_regular, RTE_DIM(t->regular));
- uint32_t n_custom = RTE_MIN(t->n_custom, RTE_DIM(t->custom));
for (i = 0; ; i++) {
+ uint32_t n_regular = RTE_MIN(t->n_regular, RTE_DIM(t->regular));
+ uint32_t n_custom = RTE_MIN(t->n_custom, RTE_DIM(t->custom));
+
/* Run regular pipelines */
for (j = 0; j < n_regular; j++) {
struct app_thread_pipeline_data *data = &t->regular[j];
t_deadline = p_deadline;
}
+ /* Timer for thread message request */
+ {
+ uint64_t deadline = t->thread_req_deadline;
+
+ if (deadline <= time) {
+ thread_msg_req_handle(t);
+ deadline = time + t->timer_period;
+ t->thread_req_deadline = deadline;
+ }
+
+ if (deadline < t_deadline)
+ t_deadline = deadline;
+ }
+
t->deadline = t_deadline;
}
}