examples/pipeline: add new example application
authorCristian Dumitrescu <cristian.dumitrescu@intel.com>
Thu, 1 Oct 2020 10:20:04 +0000 (11:20 +0100)
committerDavid Marchand <david.marchand@redhat.com>
Thu, 1 Oct 2020 16:43:10 +0000 (18:43 +0200)
Add new example application to showcase the API of the newly
introduced SWX pipeline type.

Signed-off-by: Cristian Dumitrescu <cristian.dumitrescu@intel.com>
MAINTAINERS
examples/meson.build
examples/pipeline/Makefile [new file with mode: 0644]
examples/pipeline/main.c [new file with mode: 0644]
examples/pipeline/meson.build [new file with mode: 0644]
examples/pipeline/obj.c [new file with mode: 0644]
examples/pipeline/obj.h [new file with mode: 0644]
examples/pipeline/thread.c [new file with mode: 0644]
examples/pipeline/thread.h [new file with mode: 0644]

index 49a6dfa..df3033c 100644 (file)
@@ -1331,6 +1331,7 @@ F: app/test/test_table*
 F: app/test-pipeline/
 F: doc/guides/sample_app_ug/test_pipeline.rst
 F: examples/ip_pipeline/
+F: examples/pipeline/
 F: doc/guides/sample_app_ug/ip_pipeline.rst
 
 
index eb13e82..245d985 100644 (file)
@@ -33,6 +33,7 @@ all_examples = [
        'ntb', 'packet_ordering',
        'performance-thread/l3fwd-thread',
        'performance-thread/pthread_shim',
+       'pipeline',
        'ptpclient',
        'qos_meter', 'qos_sched',
        'rxtx_callbacks',
diff --git a/examples/pipeline/Makefile b/examples/pipeline/Makefile
new file mode 100644 (file)
index 0000000..da2f485
--- /dev/null
@@ -0,0 +1,50 @@
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2020 Intel Corporation
+
+# binary name
+APP = pipeline
+
+# all source are stored in SRCS-y
+SRCS-y += main.c
+SRCS-y += obj.c
+SRCS-y += thread.c
+
+# Build using pkg-config variables if possible
+ifneq ($(shell pkg-config --exists libdpdk && echo 0),0)
+$(error "no installation of DPDK found")
+endif
+
+all: shared
+.PHONY: shared static
+shared: build/$(APP)-shared
+       ln -sf $(APP)-shared build/$(APP)
+static: build/$(APP)-static
+       ln -sf $(APP)-static build/$(APP)
+
+PKGCONF ?= pkg-config
+
+PC_FILE := $(shell $(PKGCONF) --path libdpdk 2>/dev/null)
+CFLAGS += -O3 $(shell $(PKGCONF) --cflags libdpdk)
+LDFLAGS_SHARED = $(shell $(PKGCONF) --libs libdpdk)
+LDFLAGS_STATIC = $(shell $(PKGCONF) --static --libs libdpdk)
+
+CFLAGS += -I. -DALLOW_EXPERIMENTAL_API -D_GNU_SOURCE
+
+OBJS := $(patsubst %.c,build/%.o,$(SRCS-y))
+
+build/%.o: %.c Makefile $(PC_FILE) | build
+       $(CC) $(CFLAGS) -c $< -o $@
+
+build/$(APP)-shared: $(OBJS)
+       $(CC) $(OBJS) -o $@ $(LDFLAGS) $(LDFLAGS_SHARED)
+
+build/$(APP)-static: $(OBJS)
+       $(CC) $(OBJS) -o $@ $(LDFLAGS) $(LDFLAGS_STATIC)
+
+build:
+       @mkdir -p $@
+
+.PHONY: clean
+clean:
+       rm -f build/$(APP)* build/*.o
+       test -d build && rmdir -p build || true
diff --git a/examples/pipeline/main.c b/examples/pipeline/main.c
new file mode 100644 (file)
index 0000000..d831df1
--- /dev/null
@@ -0,0 +1,50 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2020 Intel Corporation
+ */
+
+#include <stdio.h>
+#include <string.h>
+#include <fcntl.h>
+#include <unistd.h>
+#include <getopt.h>
+
+#include <rte_launch.h>
+#include <rte_eal.h>
+
+#include "obj.h"
+#include "thread.h"
+
+int
+main(int argc, char **argv)
+{
+       struct obj *obj;
+       int status;
+
+       /* EAL */
+       status = rte_eal_init(argc, argv);
+       if (status < 0) {
+               printf("Error: EAL initialization failed (%d)\n", status);
+               return status;
+       };
+
+       /* Obj */
+       obj = obj_init();
+       if (!obj) {
+               printf("Error: Obj initialization failed (%d)\n", status);
+               return status;
+       }
+
+       /* Thread */
+       status = thread_init();
+       if (status) {
+               printf("Error: Thread initialization failed (%d)\n", status);
+               return status;
+       }
+
+       rte_eal_mp_remote_launch(
+               thread_main,
+               NULL,
+               SKIP_MASTER);
+
+       return 0;
+}
diff --git a/examples/pipeline/meson.build b/examples/pipeline/meson.build
new file mode 100644 (file)
index 0000000..1ebef3a
--- /dev/null
@@ -0,0 +1,16 @@
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2020 Intel Corporation
+
+# meson file, for building this example as part of a main DPDK build.
+#
+# To build this example as a standalone application with an already-installed
+# DPDK instance, use 'make'
+
+build = cc.has_header('sys/epoll.h')
+deps += ['pipeline', 'bus_pci']
+allow_experimental_apis = true
+sources = files(
+       'main.c',
+       'obj.c',
+       'thread.c',
+)
diff --git a/examples/pipeline/obj.c b/examples/pipeline/obj.c
new file mode 100644 (file)
index 0000000..84bbcf2
--- /dev/null
@@ -0,0 +1,470 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2020 Intel Corporation
+ */
+
+#include <stdlib.h>
+#include <string.h>
+
+#include <rte_mempool.h>
+#include <rte_mbuf.h>
+#include <rte_ethdev.h>
+#include <rte_swx_port_ethdev.h>
+#include <rte_swx_port_source_sink.h>
+#include <rte_swx_table_em.h>
+#include <rte_swx_pipeline.h>
+#include <rte_swx_ctl.h>
+
+#include "obj.h"
+
+/*
+ * mempool
+ */
+TAILQ_HEAD(mempool_list, mempool);
+
+/*
+ * link
+ */
+TAILQ_HEAD(link_list, link);
+
+/*
+ * pipeline
+ */
+TAILQ_HEAD(pipeline_list, pipeline);
+
+/*
+ * obj
+ */
+struct obj {
+       struct mempool_list mempool_list;
+       struct link_list link_list;
+       struct pipeline_list pipeline_list;
+};
+
+/*
+ * mempool
+ */
+#define BUFFER_SIZE_MIN (sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM)
+
+struct mempool *
+mempool_create(struct obj *obj, const char *name, struct mempool_params *params)
+{
+       struct mempool *mempool;
+       struct rte_mempool *m;
+
+       /* Check input params */
+       if ((name == NULL) ||
+               mempool_find(obj, name) ||
+               (params == NULL) ||
+               (params->buffer_size < BUFFER_SIZE_MIN) ||
+               (params->pool_size == 0))
+               return NULL;
+
+       /* Resource create */
+       m = rte_pktmbuf_pool_create(
+               name,
+               params->pool_size,
+               params->cache_size,
+               0,
+               params->buffer_size - sizeof(struct rte_mbuf),
+               params->cpu_id);
+
+       if (m == NULL)
+               return NULL;
+
+       /* Node allocation */
+       mempool = calloc(1, sizeof(struct mempool));
+       if (mempool == NULL) {
+               rte_mempool_free(m);
+               return NULL;
+       }
+
+       /* Node fill in */
+       strlcpy(mempool->name, name, sizeof(mempool->name));
+       mempool->m = m;
+       mempool->buffer_size = params->buffer_size;
+
+       /* Node add to list */
+       TAILQ_INSERT_TAIL(&obj->mempool_list, mempool, node);
+
+       return mempool;
+}
+
+struct mempool *
+mempool_find(struct obj *obj, const char *name)
+{
+       struct mempool *mempool;
+
+       if (!obj || !name)
+               return NULL;
+
+       TAILQ_FOREACH(mempool, &obj->mempool_list, node)
+               if (strcmp(mempool->name, name) == 0)
+                       return mempool;
+
+       return NULL;
+}
+
+/*
+ * link
+ */
+static struct rte_eth_conf port_conf_default = {
+       .link_speeds = 0,
+       .rxmode = {
+               .mq_mode = ETH_MQ_RX_NONE,
+               .max_rx_pkt_len = 9000, /* Jumbo frame max packet len */
+               .split_hdr_size = 0, /* Header split buffer size */
+       },
+       .rx_adv_conf = {
+               .rss_conf = {
+                       .rss_key = NULL,
+                       .rss_key_len = 40,
+                       .rss_hf = 0,
+               },
+       },
+       .txmode = {
+               .mq_mode = ETH_MQ_TX_NONE,
+       },
+       .lpbk_mode = 0,
+};
+
+#define RETA_CONF_SIZE     (ETH_RSS_RETA_SIZE_512 / RTE_RETA_GROUP_SIZE)
+
+static int
+rss_setup(uint16_t port_id,
+       uint16_t reta_size,
+       struct link_params_rss *rss)
+{
+       struct rte_eth_rss_reta_entry64 reta_conf[RETA_CONF_SIZE];
+       uint32_t i;
+       int status;
+
+       /* RETA setting */
+       memset(reta_conf, 0, sizeof(reta_conf));
+
+       for (i = 0; i < reta_size; i++)
+               reta_conf[i / RTE_RETA_GROUP_SIZE].mask = UINT64_MAX;
+
+       for (i = 0; i < reta_size; i++) {
+               uint32_t reta_id = i / RTE_RETA_GROUP_SIZE;
+               uint32_t reta_pos = i % RTE_RETA_GROUP_SIZE;
+               uint32_t rss_qs_pos = i % rss->n_queues;
+
+               reta_conf[reta_id].reta[reta_pos] =
+                       (uint16_t) rss->queue_id[rss_qs_pos];
+       }
+
+       /* RETA update */
+       status = rte_eth_dev_rss_reta_update(port_id,
+               reta_conf,
+               reta_size);
+
+       return status;
+}
+
+struct link *
+link_create(struct obj *obj, const char *name, struct link_params *params)
+{
+       struct rte_eth_dev_info port_info;
+       struct rte_eth_conf port_conf;
+       struct link *link;
+       struct link_params_rss *rss;
+       struct mempool *mempool;
+       uint32_t cpu_id, i;
+       int status;
+       uint16_t port_id;
+
+       /* Check input params */
+       if ((name == NULL) ||
+               link_find(obj, name) ||
+               (params == NULL) ||
+               (params->rx.n_queues == 0) ||
+               (params->rx.queue_size == 0) ||
+               (params->tx.n_queues == 0) ||
+               (params->tx.queue_size == 0))
+               return NULL;
+
+       port_id = params->port_id;
+       if (params->dev_name) {
+               status = rte_eth_dev_get_port_by_name(params->dev_name,
+                       &port_id);
+
+               if (status)
+                       return NULL;
+       } else
+               if (!rte_eth_dev_is_valid_port(port_id))
+                       return NULL;
+
+       if (rte_eth_dev_info_get(port_id, &port_info) != 0)
+               return NULL;
+
+       mempool = mempool_find(obj, params->rx.mempool_name);
+       if (mempool == NULL)
+               return NULL;
+
+       rss = params->rx.rss;
+       if (rss) {
+               if ((port_info.reta_size == 0) ||
+                       (port_info.reta_size > ETH_RSS_RETA_SIZE_512))
+                       return NULL;
+
+               if ((rss->n_queues == 0) ||
+                       (rss->n_queues >= LINK_RXQ_RSS_MAX))
+                       return NULL;
+
+               for (i = 0; i < rss->n_queues; i++)
+                       if (rss->queue_id[i] >= port_info.max_rx_queues)
+                               return NULL;
+       }
+
+       /**
+        * Resource create
+        */
+       /* Port */
+       memcpy(&port_conf, &port_conf_default, sizeof(port_conf));
+       if (rss) {
+               port_conf.rxmode.mq_mode = ETH_MQ_RX_RSS;
+               port_conf.rx_adv_conf.rss_conf.rss_hf =
+                       (ETH_RSS_IP | ETH_RSS_TCP | ETH_RSS_UDP) &
+                       port_info.flow_type_rss_offloads;
+       }
+
+       cpu_id = (uint32_t) rte_eth_dev_socket_id(port_id);
+       if (cpu_id == (uint32_t) SOCKET_ID_ANY)
+               cpu_id = 0;
+
+       status = rte_eth_dev_configure(
+               port_id,
+               params->rx.n_queues,
+               params->tx.n_queues,
+               &port_conf);
+
+       if (status < 0)
+               return NULL;
+
+       if (params->promiscuous) {
+               status = rte_eth_promiscuous_enable(port_id);
+               if (status != 0)
+                       return NULL;
+       }
+
+       /* Port RX */
+       for (i = 0; i < params->rx.n_queues; i++) {
+               status = rte_eth_rx_queue_setup(
+                       port_id,
+                       i,
+                       params->rx.queue_size,
+                       cpu_id,
+                       NULL,
+                       mempool->m);
+
+               if (status < 0)
+                       return NULL;
+       }
+
+       /* Port TX */
+       for (i = 0; i < params->tx.n_queues; i++) {
+               status = rte_eth_tx_queue_setup(
+                       port_id,
+                       i,
+                       params->tx.queue_size,
+                       cpu_id,
+                       NULL);
+
+               if (status < 0)
+                       return NULL;
+       }
+
+       /* Port start */
+       status = rte_eth_dev_start(port_id);
+       if (status < 0)
+               return NULL;
+
+       if (rss) {
+               status = rss_setup(port_id, port_info.reta_size, rss);
+
+               if (status) {
+                       rte_eth_dev_stop(port_id);
+                       return NULL;
+               }
+       }
+
+       /* Port link up */
+       status = rte_eth_dev_set_link_up(port_id);
+       if ((status < 0) && (status != -ENOTSUP)) {
+               rte_eth_dev_stop(port_id);
+               return NULL;
+       }
+
+       /* Node allocation */
+       link = calloc(1, sizeof(struct link));
+       if (link == NULL) {
+               rte_eth_dev_stop(port_id);
+               return NULL;
+       }
+
+       /* Node fill in */
+       strlcpy(link->name, name, sizeof(link->name));
+       link->port_id = port_id;
+       rte_eth_dev_get_name_by_port(port_id, link->dev_name);
+       link->n_rxq = params->rx.n_queues;
+       link->n_txq = params->tx.n_queues;
+
+       /* Node add to list */
+       TAILQ_INSERT_TAIL(&obj->link_list, link, node);
+
+       return link;
+}
+
+int
+link_is_up(struct obj *obj, const char *name)
+{
+       struct rte_eth_link link_params;
+       struct link *link;
+
+       /* Check input params */
+       if (!obj || !name)
+               return 0;
+
+       link = link_find(obj, name);
+       if (link == NULL)
+               return 0;
+
+       /* Resource */
+       if (rte_eth_link_get(link->port_id, &link_params) < 0)
+               return 0;
+
+       return (link_params.link_status == ETH_LINK_DOWN) ? 0 : 1;
+}
+
+struct link *
+link_find(struct obj *obj, const char *name)
+{
+       struct link *link;
+
+       if (!obj || !name)
+               return NULL;
+
+       TAILQ_FOREACH(link, &obj->link_list, node)
+               if (strcmp(link->name, name) == 0)
+                       return link;
+
+       return NULL;
+}
+
+struct link *
+link_next(struct obj *obj, struct link *link)
+{
+       return (link == NULL) ?
+               TAILQ_FIRST(&obj->link_list) : TAILQ_NEXT(link, node);
+}
+
+/*
+ * pipeline
+ */
+#ifndef PIPELINE_MSGQ_SIZE
+#define PIPELINE_MSGQ_SIZE                                 64
+#endif
+
+struct pipeline *
+pipeline_create(struct obj *obj, const char *name, int numa_node)
+{
+       struct pipeline *pipeline;
+       struct rte_swx_pipeline *p = NULL;
+       int status;
+
+       /* Check input params */
+       if ((name == NULL) ||
+               pipeline_find(obj, name))
+               return NULL;
+
+       /* Resource create */
+       status = rte_swx_pipeline_config(&p, numa_node);
+       if (status)
+               goto error;
+
+       status = rte_swx_pipeline_port_in_type_register(p,
+               "ethdev",
+               &rte_swx_port_ethdev_reader_ops);
+       if (status)
+               goto error;
+
+       status = rte_swx_pipeline_port_out_type_register(p,
+               "ethdev",
+               &rte_swx_port_ethdev_writer_ops);
+       if (status)
+               goto error;
+
+#ifdef RTE_PORT_PCAP
+       status = rte_swx_pipeline_port_in_type_register(p,
+               "source",
+               &rte_swx_port_source_ops);
+       if (status)
+               goto error;
+#endif
+
+       status = rte_swx_pipeline_port_out_type_register(p,
+               "sink",
+               &rte_swx_port_sink_ops);
+       if (status)
+               goto error;
+
+       status = rte_swx_pipeline_table_type_register(p,
+               "exact",
+               RTE_SWX_TABLE_MATCH_EXACT,
+               &rte_swx_table_exact_match_ops);
+       if (status)
+               goto error;
+
+       /* Node allocation */
+       pipeline = calloc(1, sizeof(struct pipeline));
+       if (pipeline == NULL)
+               goto error;
+
+       /* Node fill in */
+       strlcpy(pipeline->name, name, sizeof(pipeline->name));
+       pipeline->p = p;
+       pipeline->timer_period_ms = 10;
+
+       /* Node add to list */
+       TAILQ_INSERT_TAIL(&obj->pipeline_list, pipeline, node);
+
+       return pipeline;
+
+error:
+       rte_swx_pipeline_free(p);
+       return NULL;
+}
+
+struct pipeline *
+pipeline_find(struct obj *obj, const char *name)
+{
+       struct pipeline *pipeline;
+
+       if (!obj || !name)
+               return NULL;
+
+       TAILQ_FOREACH(pipeline, &obj->pipeline_list, node)
+               if (strcmp(name, pipeline->name) == 0)
+                       return pipeline;
+
+       return NULL;
+}
+
+/*
+ * obj
+ */
+struct obj *
+obj_init(void)
+{
+       struct obj *obj;
+
+       obj = calloc(1, sizeof(struct obj));
+       if (!obj)
+               return NULL;
+
+       TAILQ_INIT(&obj->mempool_list);
+       TAILQ_INIT(&obj->link_list);
+       TAILQ_INIT(&obj->pipeline_list);
+
+       return obj;
+}
diff --git a/examples/pipeline/obj.h b/examples/pipeline/obj.h
new file mode 100644 (file)
index 0000000..e6351fd
--- /dev/null
@@ -0,0 +1,131 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2020 Intel Corporation
+ */
+
+#ifndef _INCLUDE_OBJ_H_
+#define _INCLUDE_OBJ_H_
+
+#include <stdint.h>
+#include <sys/queue.h>
+
+#include <rte_mempool.h>
+#include <rte_swx_pipeline.h>
+#include <rte_swx_ctl.h>
+
+#ifndef NAME_SIZE
+#define NAME_SIZE 64
+#endif
+
+/*
+ * obj
+ */
+struct obj;
+
+struct obj *
+obj_init(void);
+
+/*
+ * mempool
+ */
+struct mempool_params {
+       uint32_t buffer_size;
+       uint32_t pool_size;
+       uint32_t cache_size;
+       uint32_t cpu_id;
+};
+
+struct mempool {
+       TAILQ_ENTRY(mempool) node;
+       char name[NAME_SIZE];
+       struct rte_mempool *m;
+       uint32_t buffer_size;
+};
+
+struct mempool *
+mempool_create(struct obj *obj,
+              const char *name,
+              struct mempool_params *params);
+
+struct mempool *
+mempool_find(struct obj *obj,
+            const char *name);
+
+/*
+ * link
+ */
+#ifndef LINK_RXQ_RSS_MAX
+#define LINK_RXQ_RSS_MAX                                   16
+#endif
+
+struct link_params_rss {
+       uint32_t queue_id[LINK_RXQ_RSS_MAX];
+       uint32_t n_queues;
+};
+
+struct link_params {
+       const char *dev_name;
+       uint16_t port_id; /**< Valid only when *dev_name* is NULL. */
+
+       struct {
+               uint32_t n_queues;
+               uint32_t queue_size;
+               const char *mempool_name;
+               struct link_params_rss *rss;
+       } rx;
+
+       struct {
+               uint32_t n_queues;
+               uint32_t queue_size;
+       } tx;
+
+       int promiscuous;
+};
+
+struct link {
+       TAILQ_ENTRY(link) node;
+       char name[NAME_SIZE];
+       char dev_name[NAME_SIZE];
+       uint16_t port_id;
+       uint32_t n_rxq;
+       uint32_t n_txq;
+};
+
+struct link *
+link_create(struct obj *obj,
+           const char *name,
+           struct link_params *params);
+
+int
+link_is_up(struct obj *obj, const char *name);
+
+struct link *
+link_find(struct obj *obj, const char *name);
+
+struct link *
+link_next(struct obj *obj, struct link *link);
+
+/*
+ * pipeline
+ */
+struct pipeline {
+       TAILQ_ENTRY(pipeline) node;
+       char name[NAME_SIZE];
+
+       struct rte_swx_pipeline *p;
+       struct rte_swx_ctl_pipeline *ctl;
+
+       uint32_t timer_period_ms;
+       int enabled;
+       uint32_t thread_id;
+       uint32_t cpu_id;
+};
+
+struct pipeline *
+pipeline_create(struct obj *obj,
+               const char *name,
+               int numa_node);
+
+struct pipeline *
+pipeline_find(struct obj *obj, const char *name);
+
+#endif /* _INCLUDE_OBJ_H_ */
diff --git a/examples/pipeline/thread.c b/examples/pipeline/thread.c
new file mode 100644 (file)
index 0000000..7ff22e9
--- /dev/null
@@ -0,0 +1,549 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2020 Intel Corporation
+ */
+
+#include <stdlib.h>
+
+#include <rte_common.h>
+#include <rte_cycles.h>
+#include <rte_lcore.h>
+#include <rte_ring.h>
+
+#include <rte_table_acl.h>
+#include <rte_table_array.h>
+#include <rte_table_hash.h>
+#include <rte_table_lpm.h>
+#include <rte_table_lpm_ipv6.h>
+
+#include "obj.h"
+#include "thread.h"
+
+#ifndef THREAD_PIPELINES_MAX
+#define THREAD_PIPELINES_MAX                               256
+#endif
+
+#ifndef THREAD_MSGQ_SIZE
+#define THREAD_MSGQ_SIZE                                   64
+#endif
+
+#ifndef THREAD_TIMER_PERIOD_MS
+#define THREAD_TIMER_PERIOD_MS                             100
+#endif
+
+/**
+ * Control thread: data plane thread context
+ */
+struct thread {
+       struct rte_ring *msgq_req;
+       struct rte_ring *msgq_rsp;
+
+       uint32_t enabled;
+};
+
+static struct thread thread[RTE_MAX_LCORE];
+
+/**
+ * Data plane threads: context
+ */
+struct pipeline_data {
+       struct rte_swx_pipeline *p;
+       uint64_t timer_period; /* Measured in CPU cycles. */
+       uint64_t time_next;
+};
+
+struct thread_data {
+       struct rte_swx_pipeline *p[THREAD_PIPELINES_MAX];
+       uint32_t n_pipelines;
+
+       struct pipeline_data pipeline_data[THREAD_PIPELINES_MAX];
+       struct rte_ring *msgq_req;
+       struct rte_ring *msgq_rsp;
+       uint64_t timer_period; /* Measured in CPU cycles. */
+       uint64_t time_next;
+       uint64_t time_next_min;
+} __rte_cache_aligned;
+
+static struct thread_data thread_data[RTE_MAX_LCORE];
+
+/**
+ * Control thread: data plane thread init
+ */
+static void
+thread_free(void)
+{
+       uint32_t i;
+
+       for (i = 0; i < RTE_MAX_LCORE; i++) {
+               struct thread *t = &thread[i];
+
+               if (!rte_lcore_is_enabled(i))
+                       continue;
+
+               /* MSGQs */
+               if (t->msgq_req)
+                       rte_ring_free(t->msgq_req);
+
+               if (t->msgq_rsp)
+                       rte_ring_free(t->msgq_rsp);
+       }
+}
+
+int
+thread_init(void)
+{
+       uint32_t i;
+
+       RTE_LCORE_FOREACH_SLAVE(i) {
+               char name[NAME_MAX];
+               struct rte_ring *msgq_req, *msgq_rsp;
+               struct thread *t = &thread[i];
+               struct thread_data *t_data = &thread_data[i];
+               uint32_t cpu_id = rte_lcore_to_socket_id(i);
+
+               /* MSGQs */
+               snprintf(name, sizeof(name), "THREAD-%04x-MSGQ-REQ", i);
+
+               msgq_req = rte_ring_create(name,
+                       THREAD_MSGQ_SIZE,
+                       cpu_id,
+                       RING_F_SP_ENQ | RING_F_SC_DEQ);
+
+               if (msgq_req == NULL) {
+                       thread_free();
+                       return -1;
+               }
+
+               snprintf(name, sizeof(name), "THREAD-%04x-MSGQ-RSP", i);
+
+               msgq_rsp = rte_ring_create(name,
+                       THREAD_MSGQ_SIZE,
+                       cpu_id,
+                       RING_F_SP_ENQ | RING_F_SC_DEQ);
+
+               if (msgq_rsp == NULL) {
+                       thread_free();
+                       return -1;
+               }
+
+               /* Control thread records */
+               t->msgq_req = msgq_req;
+               t->msgq_rsp = msgq_rsp;
+               t->enabled = 1;
+
+               /* Data plane thread records */
+               t_data->n_pipelines = 0;
+               t_data->msgq_req = msgq_req;
+               t_data->msgq_rsp = msgq_rsp;
+               t_data->timer_period =
+                       (rte_get_tsc_hz() * THREAD_TIMER_PERIOD_MS) / 1000;
+               t_data->time_next = rte_get_tsc_cycles() + t_data->timer_period;
+               t_data->time_next_min = t_data->time_next;
+       }
+
+       return 0;
+}
+
+static inline int
+thread_is_running(uint32_t thread_id)
+{
+       enum rte_lcore_state_t thread_state;
+
+       thread_state = rte_eal_get_lcore_state(thread_id);
+       return (thread_state == RUNNING) ? 1 : 0;
+}
+
+/**
+ * Control thread & data plane threads: message passing
+ */
+enum thread_req_type {
+       THREAD_REQ_PIPELINE_ENABLE = 0,
+       THREAD_REQ_PIPELINE_DISABLE,
+       THREAD_REQ_MAX
+};
+
+struct thread_msg_req {
+       enum thread_req_type type;
+
+       union {
+               struct {
+                       struct rte_swx_pipeline *p;
+                       uint32_t timer_period_ms;
+               } pipeline_enable;
+
+               struct {
+                       struct rte_swx_pipeline *p;
+               } pipeline_disable;
+       };
+};
+
+struct thread_msg_rsp {
+       int status;
+};
+
+/**
+ * Control thread
+ */
+static struct thread_msg_req *
+thread_msg_alloc(void)
+{
+       size_t size = RTE_MAX(sizeof(struct thread_msg_req),
+               sizeof(struct thread_msg_rsp));
+
+       return calloc(1, size);
+}
+
+static void
+thread_msg_free(struct thread_msg_rsp *rsp)
+{
+       free(rsp);
+}
+
+static struct thread_msg_rsp *
+thread_msg_send_recv(uint32_t thread_id,
+       struct thread_msg_req *req)
+{
+       struct thread *t = &thread[thread_id];
+       struct rte_ring *msgq_req = t->msgq_req;
+       struct rte_ring *msgq_rsp = t->msgq_rsp;
+       struct thread_msg_rsp *rsp;
+       int status;
+
+       /* send */
+       do {
+               status = rte_ring_sp_enqueue(msgq_req, req);
+       } while (status == -ENOBUFS);
+
+       /* recv */
+       do {
+               status = rte_ring_sc_dequeue(msgq_rsp, (void **) &rsp);
+       } while (status != 0);
+
+       return rsp;
+}
+
+int
+thread_pipeline_enable(uint32_t thread_id,
+       struct obj *obj,
+       const char *pipeline_name)
+{
+       struct pipeline *p = pipeline_find(obj, pipeline_name);
+       struct thread *t;
+       struct thread_msg_req *req;
+       struct thread_msg_rsp *rsp;
+       int status;
+
+       /* Check input params */
+       if ((thread_id >= RTE_MAX_LCORE) ||
+               (p == NULL))
+               return -1;
+
+       t = &thread[thread_id];
+       if (t->enabled == 0)
+               return -1;
+
+       if (!thread_is_running(thread_id)) {
+               struct thread_data *td = &thread_data[thread_id];
+               struct pipeline_data *tdp = &td->pipeline_data[td->n_pipelines];
+
+               if (td->n_pipelines >= THREAD_PIPELINES_MAX)
+                       return -1;
+
+               /* Data plane thread */
+               td->p[td->n_pipelines] = p->p;
+
+               tdp->p = p->p;
+               tdp->timer_period =
+                       (rte_get_tsc_hz() * p->timer_period_ms) / 1000;
+               tdp->time_next = rte_get_tsc_cycles() + tdp->timer_period;
+
+               td->n_pipelines++;
+
+               /* Pipeline */
+               p->thread_id = thread_id;
+               p->enabled = 1;
+
+               return 0;
+       }
+
+       /* Allocate request */
+       req = thread_msg_alloc();
+       if (req == NULL)
+               return -1;
+
+       /* Write request */
+       req->type = THREAD_REQ_PIPELINE_ENABLE;
+       req->pipeline_enable.p = p->p;
+       req->pipeline_enable.timer_period_ms = p->timer_period_ms;
+
+       /* Send request and wait for response */
+       rsp = thread_msg_send_recv(thread_id, req);
+
+       /* Read response */
+       status = rsp->status;
+
+       /* Free response */
+       thread_msg_free(rsp);
+
+       /* Request completion */
+       if (status)
+               return status;
+
+       p->thread_id = thread_id;
+       p->enabled = 1;
+
+       return 0;
+}
+
+int
+thread_pipeline_disable(uint32_t thread_id,
+       struct obj *obj,
+       const char *pipeline_name)
+{
+       struct pipeline *p = pipeline_find(obj, pipeline_name);
+       struct thread *t;
+       struct thread_msg_req *req;
+       struct thread_msg_rsp *rsp;
+       int status;
+
+       /* Check input params */
+       if ((thread_id >= RTE_MAX_LCORE) ||
+               (p == NULL))
+               return -1;
+
+       t = &thread[thread_id];
+       if (t->enabled == 0)
+               return -1;
+
+       if (p->enabled == 0)
+               return 0;
+
+       if (p->thread_id != thread_id)
+               return -1;
+
+       if (!thread_is_running(thread_id)) {
+               struct thread_data *td = &thread_data[thread_id];
+               uint32_t i;
+
+               for (i = 0; i < td->n_pipelines; i++) {
+                       struct pipeline_data *tdp = &td->pipeline_data[i];
+
+                       if (tdp->p != p->p)
+                               continue;
+
+                       /* Data plane thread */
+                       if (i < td->n_pipelines - 1) {
+                               struct rte_swx_pipeline *pipeline_last =
+                                       td->p[td->n_pipelines - 1];
+                               struct pipeline_data *tdp_last =
+                                       &td->pipeline_data[td->n_pipelines - 1];
+
+                               td->p[i] = pipeline_last;
+                               memcpy(tdp, tdp_last, sizeof(*tdp));
+                       }
+
+                       td->n_pipelines--;
+
+                       /* Pipeline */
+                       p->enabled = 0;
+
+                       break;
+               }
+
+               return 0;
+       }
+
+       /* Allocate request */
+       req = thread_msg_alloc();
+       if (req == NULL)
+               return -1;
+
+       /* Write request */
+       req->type = THREAD_REQ_PIPELINE_DISABLE;
+       req->pipeline_disable.p = p->p;
+
+       /* Send request and wait for response */
+       rsp = thread_msg_send_recv(thread_id, req);
+
+       /* Read response */
+       status = rsp->status;
+
+       /* Free response */
+       thread_msg_free(rsp);
+
+       /* Request completion */
+       if (status)
+               return status;
+
+       p->enabled = 0;
+
+       return 0;
+}
+
+/**
+ * Data plane threads: message handling
+ */
+static inline struct thread_msg_req *
+thread_msg_recv(struct rte_ring *msgq_req)
+{
+       struct thread_msg_req *req;
+
+       int status = rte_ring_sc_dequeue(msgq_req, (void **) &req);
+
+       if (status != 0)
+               return NULL;
+
+       return req;
+}
+
+static inline void
+thread_msg_send(struct rte_ring *msgq_rsp,
+       struct thread_msg_rsp *rsp)
+{
+       int status;
+
+       do {
+               status = rte_ring_sp_enqueue(msgq_rsp, rsp);
+       } while (status == -ENOBUFS);
+}
+
+static struct thread_msg_rsp *
+thread_msg_handle_pipeline_enable(struct thread_data *t,
+       struct thread_msg_req *req)
+{
+       struct thread_msg_rsp *rsp = (struct thread_msg_rsp *) req;
+       struct pipeline_data *p = &t->pipeline_data[t->n_pipelines];
+
+       /* Request */
+       if (t->n_pipelines >= THREAD_PIPELINES_MAX) {
+               rsp->status = -1;
+               return rsp;
+       }
+
+       t->p[t->n_pipelines] = req->pipeline_enable.p;
+
+       p->p = req->pipeline_enable.p;
+       p->timer_period = (rte_get_tsc_hz() *
+               req->pipeline_enable.timer_period_ms) / 1000;
+       p->time_next = rte_get_tsc_cycles() + p->timer_period;
+
+       t->n_pipelines++;
+
+       /* Response */
+       rsp->status = 0;
+       return rsp;
+}
+
+static struct thread_msg_rsp *
+thread_msg_handle_pipeline_disable(struct thread_data *t,
+       struct thread_msg_req *req)
+{
+       struct thread_msg_rsp *rsp = (struct thread_msg_rsp *) req;
+       uint32_t n_pipelines = t->n_pipelines;
+       struct rte_swx_pipeline *pipeline = req->pipeline_disable.p;
+       uint32_t i;
+
+       /* find pipeline */
+       for (i = 0; i < n_pipelines; i++) {
+               struct pipeline_data *p = &t->pipeline_data[i];
+
+               if (p->p != pipeline)
+                       continue;
+
+               if (i < n_pipelines - 1) {
+                       struct rte_swx_pipeline *pipeline_last =
+                               t->p[n_pipelines - 1];
+                       struct pipeline_data *p_last =
+                               &t->pipeline_data[n_pipelines - 1];
+
+                       t->p[i] = pipeline_last;
+                       memcpy(p, p_last, sizeof(*p));
+               }
+
+               t->n_pipelines--;
+
+               rsp->status = 0;
+               return rsp;
+       }
+
+       /* should not get here */
+       rsp->status = 0;
+       return rsp;
+}
+
+static void
+thread_msg_handle(struct thread_data *t)
+{
+       for ( ; ; ) {
+               struct thread_msg_req *req;
+               struct thread_msg_rsp *rsp;
+
+               req = thread_msg_recv(t->msgq_req);
+               if (req == NULL)
+                       break;
+
+               switch (req->type) {
+               case THREAD_REQ_PIPELINE_ENABLE:
+                       rsp = thread_msg_handle_pipeline_enable(t, req);
+                       break;
+
+               case THREAD_REQ_PIPELINE_DISABLE:
+                       rsp = thread_msg_handle_pipeline_disable(t, req);
+                       break;
+
+               default:
+                       rsp = (struct thread_msg_rsp *) req;
+                       rsp->status = -1;
+               }
+
+               thread_msg_send(t->msgq_rsp, rsp);
+       }
+}
+
+/**
+ * Data plane threads: main
+ */
+int
+thread_main(void *arg __rte_unused)
+{
+       struct thread_data *t;
+       uint32_t thread_id, i;
+
+       thread_id = rte_lcore_id();
+       t = &thread_data[thread_id];
+
+       /* Dispatch loop */
+       for (i = 0; ; i++) {
+               uint32_t j;
+
+               /* Data Plane */
+               for (j = 0; j < t->n_pipelines; j++)
+                       rte_swx_pipeline_run(t->p[j], 1000000);
+
+               /* Control Plane */
+               if ((i & 0xF) == 0) {
+                       uint64_t time = rte_get_tsc_cycles();
+                       uint64_t time_next_min = UINT64_MAX;
+
+                       if (time < t->time_next_min)
+                               continue;
+
+                       /* Thread message queues */
+                       {
+                               uint64_t time_next = t->time_next;
+
+                               if (time_next <= time) {
+                                       thread_msg_handle(t);
+                                       time_next = time + t->timer_period;
+                                       t->time_next = time_next;
+                               }
+
+                               if (time_next < time_next_min)
+                                       time_next_min = time_next;
+                       }
+
+                       t->time_next_min = time_next_min;
+               }
+       }
+
+       return 0;
+}
diff --git a/examples/pipeline/thread.h b/examples/pipeline/thread.h
new file mode 100644 (file)
index 0000000..d9d8645
--- /dev/null
@@ -0,0 +1,28 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2020 Intel Corporation
+ */
+
+#ifndef _INCLUDE_THREAD_H_
+#define _INCLUDE_THREAD_H_
+
+#include <stdint.h>
+
+#include "obj.h"
+
+int
+thread_pipeline_enable(uint32_t thread_id,
+       struct obj *obj,
+       const char *pipeline_name);
+
+int
+thread_pipeline_disable(uint32_t thread_id,
+       struct obj *obj,
+       const char *pipeline_name);
+
+int
+thread_init(void);
+
+int
+thread_main(void *arg);
+
+#endif /* _INCLUDE_THREAD_H_ */