examples/eventdev: rename example
authorPavan Nikhilesh <pbhagavatula@caviumnetworks.com>
Wed, 10 Jan 2018 11:10:12 +0000 (16:40 +0530)
committerJerin Jacob <jerin.jacob@caviumnetworks.com>
Fri, 19 Jan 2018 15:09:56 +0000 (16:09 +0100)
Rename eventdev_pipeline_sw_pmd to eventdev_pipeline as it is no longer
specific underlying event device.

Signed-off-by: Pavan Nikhilesh <pbhagavatula@caviumnetworks.com>
Acked-by: Harry van Haaren <harry.van.haaren@intel.com>
15 files changed:
MAINTAINERS
doc/guides/sample_app_ug/eventdev_pipeline.rst [new file with mode: 0644]
doc/guides/sample_app_ug/eventdev_pipeline_sw_pmd.rst [deleted file]
doc/guides/sample_app_ug/index.rst
examples/Makefile
examples/eventdev_pipeline/Makefile [new file with mode: 0644]
examples/eventdev_pipeline/main.c [new file with mode: 0644]
examples/eventdev_pipeline/pipeline_common.h [new file with mode: 0644]
examples/eventdev_pipeline/pipeline_worker_generic.c [new file with mode: 0644]
examples/eventdev_pipeline/pipeline_worker_tx.c [new file with mode: 0644]
examples/eventdev_pipeline_sw_pmd/Makefile [deleted file]
examples/eventdev_pipeline_sw_pmd/main.c [deleted file]
examples/eventdev_pipeline_sw_pmd/pipeline_common.h [deleted file]
examples/eventdev_pipeline_sw_pmd/pipeline_worker_generic.c [deleted file]
examples/eventdev_pipeline_sw_pmd/pipeline_worker_tx.c [deleted file]

index d39d2d9..b9908e7 100644 (file)
@@ -718,8 +718,8 @@ Software Eventdev PMD
 M: Harry van Haaren <harry.van.haaren@intel.com>
 F: drivers/event/sw/
 F: doc/guides/eventdevs/sw.rst
-F: examples/eventdev_pipeline_sw_pmd/
-F: doc/guides/sample_app_ug/eventdev_pipeline_sw_pmd.rst
+F: examples/eventdev_pipeline/
+F: doc/guides/sample_app_ug/eventdev_pipeline.rst
 
 Software OPDL Eventdev PMD
 M: Liang Ma <liang.j.ma@intel.com>
diff --git a/doc/guides/sample_app_ug/eventdev_pipeline.rst b/doc/guides/sample_app_ug/eventdev_pipeline.rst
new file mode 100644 (file)
index 0000000..ff6d2f0
--- /dev/null
@@ -0,0 +1,173 @@
+
+..  BSD LICENSE
+    Copyright(c) 2017 Intel Corporation. All rights reserved.
+    All rights reserved.
+
+    Redistribution and use in source and binary forms, with or without
+    modification, are permitted provided that the following conditions
+    are met:
+
+    * Redistributions of source code must retain the above copyright
+    notice, this list of conditions and the following disclaimer.
+    * Redistributions in binary form must reproduce the above copyright
+    notice, this list of conditions and the following disclaimer in
+    the documentation and/or other materials provided with the
+    distribution.
+    * Neither the name of Intel Corporation nor the names of its
+    contributors may be used to endorse or promote products derived
+    from this software without specific prior written permission.
+
+    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+    A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+    OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+    SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+    LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+    DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+    THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+    (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+    OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+Eventdev Pipeline Sample Application
+====================================
+
+The eventdev pipeline sample application is a sample app that demonstrates
+the usage of the eventdev API using the software PMD. It shows how an
+application can configure a pipeline and assign a set of worker cores to
+perform the processing required.
+
+The application has a range of command line arguments allowing it to be
+configured for various numbers worker cores, stages,queue depths and cycles per
+stage of work. This is useful for performance testing as well as quickly testing
+a particular pipeline configuration.
+
+
+Compiling the Application
+-------------------------
+
+To compile the sample application see :doc:`compiling`.
+
+The application is located in the ``examples`` sub-directory.
+
+
+
+Running the Application
+-----------------------
+
+The application has a lot of command line options. This allows specification of
+the eventdev PMD to use, and a number of attributes of the processing pipeline
+options.
+
+An example eventdev pipeline running with the software eventdev PMD using
+these settings is shown below:
+
+ * ``-r1``: core mask 0x1 for RX
+ * ``-t1``: core mask 0x1 for TX
+ * ``-e4``: core mask 0x4 for the software scheduler
+ * ``-w FF00``: core mask for worker cores, 8 cores from 8th to 16th
+ * ``-s4``: 4 atomic stages
+ * ``-n0``: process infinite packets (run forever)
+ * ``-c32``: worker dequeue depth of 32
+ * ``-W1000``: do 1000 cycles of work per packet in each stage
+ * ``-D``: dump statistics on exit
+
+.. code-block:: console
+
+    ./build/eventdev_pipeline --vdev event_sw0 -- -r1 -t1 -e4 -w FF00 -s4 -n0 -c32 -W1000 -D
+
+The application has some sanity checking built-in, so if there is a function
+(eg; the RX core) which doesn't have a cpu core mask assigned, the application
+will print an error message:
+
+.. code-block:: console
+
+  Core part of pipeline was not assigned any cores. This will stall the
+  pipeline, please check core masks (use -h for details on setting core masks):
+          rx: 0
+          tx: 1
+
+Configuration of the eventdev is covered in detail in the programmers guide,
+see the Event Device Library section.
+
+
+Observing the Application
+-------------------------
+
+At runtime the eventdev pipeline application prints out a summary of the
+configuration, and some runtime statistics like packets per second. On exit the
+worker statistics are printed, along with a full dump of the PMD statistics if
+required. The following sections show sample output for each of the output
+types.
+
+Configuration
+~~~~~~~~~~~~~
+
+This provides an overview of the pipeline,
+scheduling type at each stage, and parameters to options such as how many
+flows to use and what eventdev PMD is in use. See the following sample output
+for details:
+
+.. code-block:: console
+
+  Config:
+        ports: 2
+        workers: 8
+        packets: 0
+        priorities: 1
+        Queue-prio: 0
+        qid0 type: atomic
+        Cores available: 44
+        Cores used: 10
+        Eventdev 0: event_sw
+  Stages:
+        Stage 0, Type Atomic    Priority = 128
+        Stage 1, Type Atomic    Priority = 128
+        Stage 2, Type Atomic    Priority = 128
+        Stage 3, Type Atomic    Priority = 128
+
+Runtime
+~~~~~~~
+
+At runtime, the statistics of the consumer are printed, stating the number of
+packets received, runtime in milliseconds, average mpps, and current mpps.
+
+.. code-block:: console
+
+  # consumer RX= xxxxxxx, time yyyy ms, avg z.zzz mpps [current w.www mpps]
+
+Shutdown
+~~~~~~~~
+
+At shutdown, the application prints the number of packets received and
+transmitted, and an overview of the distribution of work across worker cores.
+
+.. code-block:: console
+
+        Signal 2 received, preparing to exit...
+          worker 12 thread done. RX=4966581 TX=4966581
+          worker 13 thread done. RX=4963329 TX=4963329
+          worker 14 thread done. RX=4953614 TX=4953614
+          worker 0 thread done. RX=0 TX=0
+          worker 11 thread done. RX=4970549 TX=4970549
+          worker 10 thread done. RX=4986391 TX=4986391
+          worker 9 thread done. RX=4970528 TX=4970528
+          worker 15 thread done. RX=4974087 TX=4974087
+          worker 8 thread done. RX=4979908 TX=4979908
+          worker 2 thread done. RX=0 TX=0
+
+        Port Workload distribution:
+        worker 0 :      12.5 % (4979876 pkts)
+        worker 1 :      12.5 % (4970497 pkts)
+        worker 2 :      12.5 % (4986359 pkts)
+        worker 3 :      12.5 % (4970517 pkts)
+        worker 4 :      12.5 % (4966566 pkts)
+        worker 5 :      12.5 % (4963297 pkts)
+        worker 6 :      12.5 % (4953598 pkts)
+        worker 7 :      12.5 % (4974055 pkts)
+
+To get a full dump of the state of the eventdev PMD, pass the ``-D`` flag to
+this application. When the app is terminated using ``Ctrl+C``, the
+``rte_event_dev_dump()`` function is called, resulting in a dump of the
+statistics that the PMD provides. The statistics provided depend on the PMD
+used, see the Event Device Drivers section for a list of eventdev PMDs.
diff --git a/doc/guides/sample_app_ug/eventdev_pipeline_sw_pmd.rst b/doc/guides/sample_app_ug/eventdev_pipeline_sw_pmd.rst
deleted file mode 100644 (file)
index 01a5f9b..0000000
+++ /dev/null
@@ -1,173 +0,0 @@
-
-..  BSD LICENSE
-    Copyright(c) 2017 Intel Corporation. All rights reserved.
-    All rights reserved.
-
-    Redistribution and use in source and binary forms, with or without
-    modification, are permitted provided that the following conditions
-    are met:
-
-    * Redistributions of source code must retain the above copyright
-    notice, this list of conditions and the following disclaimer.
-    * Redistributions in binary form must reproduce the above copyright
-    notice, this list of conditions and the following disclaimer in
-    the documentation and/or other materials provided with the
-    distribution.
-    * Neither the name of Intel Corporation nor the names of its
-    contributors may be used to endorse or promote products derived
-    from this software without specific prior written permission.
-
-    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-    A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-    OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-    SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-    LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-    DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-    THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-    (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-    OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-Eventdev Pipeline SW PMD Sample Application
-===========================================
-
-The eventdev pipeline sample application is a sample app that demonstrates
-the usage of the eventdev API using the software PMD. It shows how an
-application can configure a pipeline and assign a set of worker cores to
-perform the processing required.
-
-The application has a range of command line arguments allowing it to be
-configured for various numbers worker cores, stages,queue depths and cycles per
-stage of work. This is useful for performance testing as well as quickly testing
-a particular pipeline configuration.
-
-
-Compiling the Application
--------------------------
-
-To compile the sample application see :doc:`compiling`.
-
-The application is located in the ``examples`` sub-directory.
-
-
-
-Running the Application
------------------------
-
-The application has a lot of command line options. This allows specification of
-the eventdev PMD to use, and a number of attributes of the processing pipeline
-options.
-
-An example eventdev pipeline running with the software eventdev PMD using
-these settings is shown below:
-
- * ``-r1``: core mask 0x1 for RX
- * ``-t1``: core mask 0x1 for TX
- * ``-e4``: core mask 0x4 for the software scheduler
- * ``-w FF00``: core mask for worker cores, 8 cores from 8th to 16th
- * ``-s4``: 4 atomic stages
- * ``-n0``: process infinite packets (run forever)
- * ``-c32``: worker dequeue depth of 32
- * ``-W1000``: do 1000 cycles of work per packet in each stage
- * ``-D``: dump statistics on exit
-
-.. code-block:: console
-
-    ./build/eventdev_pipeline_sw_pmd --vdev event_sw0 -- -r1 -t1 -e4 -w FF00 -s4 -n0 -c32 -W1000 -D
-
-The application has some sanity checking built-in, so if there is a function
-(eg; the RX core) which doesn't have a cpu core mask assigned, the application
-will print an error message:
-
-.. code-block:: console
-
-  Core part of pipeline was not assigned any cores. This will stall the
-  pipeline, please check core masks (use -h for details on setting core masks):
-          rx: 0
-          tx: 1
-
-Configuration of the eventdev is covered in detail in the programmers guide,
-see the Event Device Library section.
-
-
-Observing the Application
--------------------------
-
-At runtime the eventdev pipeline application prints out a summary of the
-configuration, and some runtime statistics like packets per second. On exit the
-worker statistics are printed, along with a full dump of the PMD statistics if
-required. The following sections show sample output for each of the output
-types.
-
-Configuration
-~~~~~~~~~~~~~
-
-This provides an overview of the pipeline,
-scheduling type at each stage, and parameters to options such as how many
-flows to use and what eventdev PMD is in use. See the following sample output
-for details:
-
-.. code-block:: console
-
-  Config:
-        ports: 2
-        workers: 8
-        packets: 0
-        priorities: 1
-        Queue-prio: 0
-        qid0 type: atomic
-        Cores available: 44
-        Cores used: 10
-        Eventdev 0: event_sw
-  Stages:
-        Stage 0, Type Atomic    Priority = 128
-        Stage 1, Type Atomic    Priority = 128
-        Stage 2, Type Atomic    Priority = 128
-        Stage 3, Type Atomic    Priority = 128
-
-Runtime
-~~~~~~~
-
-At runtime, the statistics of the consumer are printed, stating the number of
-packets received, runtime in milliseconds, average mpps, and current mpps.
-
-.. code-block:: console
-
-  # consumer RX= xxxxxxx, time yyyy ms, avg z.zzz mpps [current w.www mpps]
-
-Shutdown
-~~~~~~~~
-
-At shutdown, the application prints the number of packets received and
-transmitted, and an overview of the distribution of work across worker cores.
-
-.. code-block:: console
-
-        Signal 2 received, preparing to exit...
-          worker 12 thread done. RX=4966581 TX=4966581
-          worker 13 thread done. RX=4963329 TX=4963329
-          worker 14 thread done. RX=4953614 TX=4953614
-          worker 0 thread done. RX=0 TX=0
-          worker 11 thread done. RX=4970549 TX=4970549
-          worker 10 thread done. RX=4986391 TX=4986391
-          worker 9 thread done. RX=4970528 TX=4970528
-          worker 15 thread done. RX=4974087 TX=4974087
-          worker 8 thread done. RX=4979908 TX=4979908
-          worker 2 thread done. RX=0 TX=0
-
-        Port Workload distribution:
-        worker 0 :      12.5 % (4979876 pkts)
-        worker 1 :      12.5 % (4970497 pkts)
-        worker 2 :      12.5 % (4986359 pkts)
-        worker 3 :      12.5 % (4970517 pkts)
-        worker 4 :      12.5 % (4966566 pkts)
-        worker 5 :      12.5 % (4963297 pkts)
-        worker 6 :      12.5 % (4953598 pkts)
-        worker 7 :      12.5 % (4974055 pkts)
-
-To get a full dump of the state of the eventdev PMD, pass the ``-D`` flag to
-this application. When the app is terminated using ``Ctrl+C``, the
-``rte_event_dev_dump()`` function is called, resulting in a dump of the
-statistics that the PMD provides. The statistics provided depend on the PMD
-used, see the Event Device Drivers section for a list of eventdev PMDs.
index 3d04cf7..85dc6f5 100644 (file)
@@ -74,7 +74,7 @@ Sample Applications User Guides
     netmap_compatibility
     ip_pipeline
     test_pipeline
-    eventdev_pipeline_sw_pmd
+    eventdev_pipeline
     dist_app
     vm_power_management
     tep_termination
index 5840b21..17ecf7f 100644 (file)
@@ -103,6 +103,6 @@ $(info vm_power_manager requires libvirt >= 0.9.3)
 endif
 endif
 
-DIRS-y += eventdev_pipeline_sw_pmd
+DIRS-y += eventdev_pipeline
 
 include $(RTE_SDK)/mk/rte.extsubdir.mk
diff --git a/examples/eventdev_pipeline/Makefile b/examples/eventdev_pipeline/Makefile
new file mode 100644 (file)
index 0000000..94b4c86
--- /dev/null
@@ -0,0 +1,24 @@
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2016-2017 Intel Corporation
+
+ifeq ($(RTE_SDK),)
+$(error "Please define RTE_SDK environment variable")
+endif
+
+# Default target, can be overridden by command line or environment
+RTE_TARGET ?= x86_64-native-linuxapp-gcc
+
+include $(RTE_SDK)/mk/rte.vars.mk
+
+# binary name
+APP = eventdev_pipeline
+
+# all source are stored in SRCS-y
+SRCS-y := main.c
+SRCS-y += pipeline_worker_generic.c
+SRCS-y += pipeline_worker_tx.c
+
+CFLAGS += -O3
+CFLAGS += $(WERROR_FLAGS)
+
+include $(RTE_SDK)/mk/rte.extapp.mk
diff --git a/examples/eventdev_pipeline/main.c b/examples/eventdev_pipeline/main.c
new file mode 100644 (file)
index 0000000..2422c18
--- /dev/null
@@ -0,0 +1,574 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2016-2017 Intel Corporation
+ */
+
+#include <getopt.h>
+#include <stdint.h>
+#include <stdio.h>
+#include <signal.h>
+#include <sched.h>
+
+#include "pipeline_common.h"
+
+struct config_data cdata = {
+       .num_packets = (1L << 25), /* do ~32M packets */
+       .num_fids = 512,
+       .queue_type = RTE_SCHED_TYPE_ATOMIC,
+       .next_qid = {-1},
+       .qid = {-1},
+       .num_stages = 1,
+       .worker_cq_depth = 16
+};
+
+static bool
+core_in_use(unsigned int lcore_id) {
+       return (fdata->rx_core[lcore_id] || fdata->sched_core[lcore_id] ||
+               fdata->tx_core[lcore_id] || fdata->worker_core[lcore_id]);
+}
+
+static void
+eth_tx_buffer_retry(struct rte_mbuf **pkts, uint16_t unsent,
+                       void *userdata)
+{
+       int port_id = (uintptr_t) userdata;
+       unsigned int _sent = 0;
+
+       do {
+               /* Note: hard-coded TX queue */
+               _sent += rte_eth_tx_burst(port_id, 0, &pkts[_sent],
+                                         unsent - _sent);
+       } while (_sent != unsent);
+}
+
+/*
+ * Parse the coremask given as argument (hexadecimal string) and fill
+ * the global configuration (core role and core count) with the parsed
+ * value.
+ */
+static int xdigit2val(unsigned char c)
+{
+       int val;
+
+       if (isdigit(c))
+               val = c - '0';
+       else if (isupper(c))
+               val = c - 'A' + 10;
+       else
+               val = c - 'a' + 10;
+       return val;
+}
+
+static uint64_t
+parse_coremask(const char *coremask)
+{
+       int i, j, idx = 0;
+       unsigned int count = 0;
+       char c;
+       int val;
+       uint64_t mask = 0;
+       const int32_t BITS_HEX = 4;
+
+       if (coremask == NULL)
+               return -1;
+       /* Remove all blank characters ahead and after .
+        * Remove 0x/0X if exists.
+        */
+       while (isblank(*coremask))
+               coremask++;
+       if (coremask[0] == '0' && ((coremask[1] == 'x')
+               || (coremask[1] == 'X')))
+               coremask += 2;
+       i = strlen(coremask);
+       while ((i > 0) && isblank(coremask[i - 1]))
+               i--;
+       if (i == 0)
+               return -1;
+
+       for (i = i - 1; i >= 0 && idx < MAX_NUM_CORE; i--) {
+               c = coremask[i];
+               if (isxdigit(c) == 0) {
+                       /* invalid characters */
+                       return -1;
+               }
+               val = xdigit2val(c);
+               for (j = 0; j < BITS_HEX && idx < MAX_NUM_CORE; j++, idx++) {
+                       if ((1 << j) & val) {
+                               mask |= (1UL << idx);
+                               count++;
+                       }
+               }
+       }
+       for (; i >= 0; i--)
+               if (coremask[i] != '0')
+                       return -1;
+       if (count == 0)
+               return -1;
+       return mask;
+}
+
+static struct option long_options[] = {
+       {"workers", required_argument, 0, 'w'},
+       {"packets", required_argument, 0, 'n'},
+       {"atomic-flows", required_argument, 0, 'f'},
+       {"num_stages", required_argument, 0, 's'},
+       {"rx-mask", required_argument, 0, 'r'},
+       {"tx-mask", required_argument, 0, 't'},
+       {"sched-mask", required_argument, 0, 'e'},
+       {"cq-depth", required_argument, 0, 'c'},
+       {"work-cycles", required_argument, 0, 'W'},
+       {"mempool-size", required_argument, 0, 'm'},
+       {"queue-priority", no_argument, 0, 'P'},
+       {"parallel", no_argument, 0, 'p'},
+       {"ordered", no_argument, 0, 'o'},
+       {"quiet", no_argument, 0, 'q'},
+       {"use-atq", no_argument, 0, 'a'},
+       {"dump", no_argument, 0, 'D'},
+       {0, 0, 0, 0}
+};
+
+static void
+usage(void)
+{
+       const char *usage_str =
+               "  Usage: eventdev_demo [options]\n"
+               "  Options:\n"
+               "  -n, --packets=N              Send N packets (default ~32M), 0 implies no limit\n"
+               "  -f, --atomic-flows=N         Use N random flows from 1 to N (default 16)\n"
+               "  -s, --num_stages=N           Use N atomic stages (default 1)\n"
+               "  -r, --rx-mask=core mask      Run NIC rx on CPUs in core mask\n"
+               "  -w, --worker-mask=core mask  Run worker on CPUs in core mask\n"
+               "  -t, --tx-mask=core mask      Run NIC tx on CPUs in core mask\n"
+               "  -e  --sched-mask=core mask   Run scheduler on CPUs in core mask\n"
+               "  -c  --cq-depth=N             Worker CQ depth (default 16)\n"
+               "  -W  --work-cycles=N          Worker cycles (default 0)\n"
+               "  -P  --queue-priority         Enable scheduler queue prioritization\n"
+               "  -o, --ordered                Use ordered scheduling\n"
+               "  -p, --parallel               Use parallel scheduling\n"
+               "  -q, --quiet                  Minimize printed output\n"
+               "  -a, --use-atq                Use all type queues\n"
+               "  -m, --mempool-size=N         Dictate the mempool size\n"
+               "  -D, --dump                   Print detailed statistics before exit"
+               "\n";
+       fprintf(stderr, "%s", usage_str);
+       exit(1);
+}
+
+static void
+parse_app_args(int argc, char **argv)
+{
+       /* Parse cli options*/
+       int option_index;
+       int c;
+       opterr = 0;
+       uint64_t rx_lcore_mask = 0;
+       uint64_t tx_lcore_mask = 0;
+       uint64_t sched_lcore_mask = 0;
+       uint64_t worker_lcore_mask = 0;
+       int i;
+
+       for (;;) {
+               c = getopt_long(argc, argv, "r:t:e:c:w:n:f:s:m:paoPqDW:",
+                               long_options, &option_index);
+               if (c == -1)
+                       break;
+
+               int popcnt = 0;
+               switch (c) {
+               case 'n':
+                       cdata.num_packets = (int64_t)atol(optarg);
+                       if (cdata.num_packets == 0)
+                               cdata.num_packets = INT64_MAX;
+                       break;
+               case 'f':
+                       cdata.num_fids = (unsigned int)atoi(optarg);
+                       break;
+               case 's':
+                       cdata.num_stages = (unsigned int)atoi(optarg);
+                       break;
+               case 'c':
+                       cdata.worker_cq_depth = (unsigned int)atoi(optarg);
+                       break;
+               case 'W':
+                       cdata.worker_cycles = (unsigned int)atoi(optarg);
+                       break;
+               case 'P':
+                       cdata.enable_queue_priorities = 1;
+                       break;
+               case 'o':
+                       cdata.queue_type = RTE_SCHED_TYPE_ORDERED;
+                       break;
+               case 'p':
+                       cdata.queue_type = RTE_SCHED_TYPE_PARALLEL;
+                       break;
+               case 'a':
+                       cdata.all_type_queues = 1;
+                       break;
+               case 'q':
+                       cdata.quiet = 1;
+                       break;
+               case 'D':
+                       cdata.dump_dev = 1;
+                       break;
+               case 'w':
+                       worker_lcore_mask = parse_coremask(optarg);
+                       break;
+               case 'r':
+                       rx_lcore_mask = parse_coremask(optarg);
+                       popcnt = __builtin_popcountll(rx_lcore_mask);
+                       fdata->rx_single = (popcnt == 1);
+                       break;
+               case 't':
+                       tx_lcore_mask = parse_coremask(optarg);
+                       popcnt = __builtin_popcountll(tx_lcore_mask);
+                       fdata->tx_single = (popcnt == 1);
+                       break;
+               case 'e':
+                       sched_lcore_mask = parse_coremask(optarg);
+                       popcnt = __builtin_popcountll(sched_lcore_mask);
+                       fdata->sched_single = (popcnt == 1);
+                       break;
+               case 'm':
+                       cdata.num_mbuf = (uint64_t)atol(optarg);
+                       break;
+               default:
+                       usage();
+               }
+       }
+
+       cdata.worker_lcore_mask = worker_lcore_mask;
+       cdata.sched_lcore_mask = sched_lcore_mask;
+       cdata.rx_lcore_mask = rx_lcore_mask;
+       cdata.tx_lcore_mask = tx_lcore_mask;
+
+       if (cdata.num_stages == 0 || cdata.num_stages > MAX_NUM_STAGES)
+               usage();
+
+       for (i = 0; i < MAX_NUM_CORE; i++) {
+               fdata->rx_core[i] = !!(rx_lcore_mask & (1UL << i));
+               fdata->tx_core[i] = !!(tx_lcore_mask & (1UL << i));
+               fdata->sched_core[i] = !!(sched_lcore_mask & (1UL << i));
+               fdata->worker_core[i] = !!(worker_lcore_mask & (1UL << i));
+
+               if (fdata->worker_core[i])
+                       cdata.num_workers++;
+               if (core_in_use(i))
+                       cdata.active_cores++;
+       }
+}
+
+/*
+ * Initializes a given port using global settings and with the RX buffers
+ * coming from the mbuf_pool passed as a parameter.
+ */
+static inline int
+port_init(uint8_t port, struct rte_mempool *mbuf_pool)
+{
+       static const struct rte_eth_conf port_conf_default = {
+               .rxmode = {
+                       .mq_mode = ETH_MQ_RX_RSS,
+                       .max_rx_pkt_len = ETHER_MAX_LEN,
+                       .ignore_offload_bitfield = 1,
+               },
+               .rx_adv_conf = {
+                       .rss_conf = {
+                               .rss_hf = ETH_RSS_IP |
+                                         ETH_RSS_TCP |
+                                         ETH_RSS_UDP,
+                       }
+               }
+       };
+       const uint16_t rx_rings = 1, tx_rings = 1;
+       const uint16_t rx_ring_size = 512, tx_ring_size = 512;
+       struct rte_eth_conf port_conf = port_conf_default;
+       int retval;
+       uint16_t q;
+       struct rte_eth_dev_info dev_info;
+       struct rte_eth_txconf txconf;
+
+       if (port >= rte_eth_dev_count())
+               return -1;
+
+       rte_eth_dev_info_get(port, &dev_info);
+       if (dev_info.tx_offload_capa & DEV_TX_OFFLOAD_MBUF_FAST_FREE)
+               port_conf.txmode.offloads |=
+                       DEV_TX_OFFLOAD_MBUF_FAST_FREE;
+
+       /* Configure the Ethernet device. */
+       retval = rte_eth_dev_configure(port, rx_rings, tx_rings, &port_conf);
+       if (retval != 0)
+               return retval;
+
+       /* Allocate and set up 1 RX queue per Ethernet port. */
+       for (q = 0; q < rx_rings; q++) {
+               retval = rte_eth_rx_queue_setup(port, q, rx_ring_size,
+                               rte_eth_dev_socket_id(port), NULL, mbuf_pool);
+               if (retval < 0)
+                       return retval;
+       }
+
+       txconf = dev_info.default_txconf;
+       txconf.txq_flags = ETH_TXQ_FLAGS_IGNORE;
+       txconf.offloads = port_conf_default.txmode.offloads;
+       /* Allocate and set up 1 TX queue per Ethernet port. */
+       for (q = 0; q < tx_rings; q++) {
+               retval = rte_eth_tx_queue_setup(port, q, tx_ring_size,
+                               rte_eth_dev_socket_id(port), &txconf);
+               if (retval < 0)
+                       return retval;
+       }
+
+       /* Start the Ethernet port. */
+       retval = rte_eth_dev_start(port);
+       if (retval < 0)
+               return retval;
+
+       /* Display the port MAC address. */
+       struct ether_addr addr;
+       rte_eth_macaddr_get(port, &addr);
+       printf("Port %u MAC: %02" PRIx8 " %02" PRIx8 " %02" PRIx8
+                          " %02" PRIx8 " %02" PRIx8 " %02" PRIx8 "\n",
+                       (unsigned int)port,
+                       addr.addr_bytes[0], addr.addr_bytes[1],
+                       addr.addr_bytes[2], addr.addr_bytes[3],
+                       addr.addr_bytes[4], addr.addr_bytes[5]);
+
+       /* Enable RX in promiscuous mode for the Ethernet device. */
+       rte_eth_promiscuous_enable(port);
+
+       return 0;
+}
+
+static int
+init_ports(unsigned int num_ports)
+{
+       uint8_t portid;
+       unsigned int i;
+
+       if (!cdata.num_mbuf)
+               cdata.num_mbuf = 16384 * num_ports;
+
+       struct rte_mempool *mp = rte_pktmbuf_pool_create("packet_pool",
+                       /* mbufs */ cdata.num_mbuf,
+                       /* cache_size */ 512,
+                       /* priv_size*/ 0,
+                       /* data_room_size */ RTE_MBUF_DEFAULT_BUF_SIZE,
+                       rte_socket_id());
+
+       for (portid = 0; portid < num_ports; portid++)
+               if (port_init(portid, mp) != 0)
+                       rte_exit(EXIT_FAILURE, "Cannot init port %"PRIu8 "\n",
+                                       portid);
+
+       for (i = 0; i < num_ports; i++) {
+               void *userdata = (void *)(uintptr_t) i;
+               fdata->tx_buf[i] =
+                       rte_malloc(NULL, RTE_ETH_TX_BUFFER_SIZE(32), 0);
+               if (fdata->tx_buf[i] == NULL)
+                       rte_panic("Out of memory\n");
+               rte_eth_tx_buffer_init(fdata->tx_buf[i], 32);
+               rte_eth_tx_buffer_set_err_callback(fdata->tx_buf[i],
+                                                  eth_tx_buffer_retry,
+                                                  userdata);
+       }
+
+       return 0;
+}
+
+static void
+do_capability_setup(uint16_t nb_ethdev, uint8_t eventdev_id)
+{
+       int i;
+       uint8_t mt_unsafe = 0;
+       uint8_t burst = 0;
+
+       for (i = 0; i < nb_ethdev; i++) {
+               struct rte_eth_dev_info dev_info;
+               memset(&dev_info, 0, sizeof(struct rte_eth_dev_info));
+
+               rte_eth_dev_info_get(i, &dev_info);
+               /* Check if it is safe ask worker to tx. */
+               mt_unsafe |= !(dev_info.tx_offload_capa &
+                               DEV_TX_OFFLOAD_MT_LOCKFREE);
+       }
+
+       struct rte_event_dev_info eventdev_info;
+       memset(&eventdev_info, 0, sizeof(struct rte_event_dev_info));
+
+       rte_event_dev_info_get(eventdev_id, &eventdev_info);
+       burst = eventdev_info.event_dev_cap & RTE_EVENT_DEV_CAP_BURST_MODE ? 1 :
+               0;
+
+       if (mt_unsafe)
+               set_worker_generic_setup_data(&fdata->cap, burst);
+       else
+               set_worker_tx_setup_data(&fdata->cap, burst);
+}
+
+static void
+signal_handler(int signum)
+{
+       if (fdata->done)
+               rte_exit(1, "Exiting on signal %d\n", signum);
+       if (signum == SIGINT || signum == SIGTERM) {
+               printf("\n\nSignal %d received, preparing to exit...\n",
+                               signum);
+               fdata->done = 1;
+       }
+       if (signum == SIGTSTP)
+               rte_event_dev_dump(0, stdout);
+}
+
+static inline uint64_t
+port_stat(int dev_id, int32_t p)
+{
+       char statname[64];
+       snprintf(statname, sizeof(statname), "port_%u_rx", p);
+       return rte_event_dev_xstats_by_name_get(dev_id, statname, NULL);
+}
+
+int
+main(int argc, char **argv)
+{
+       struct worker_data *worker_data;
+       unsigned int num_ports;
+       int lcore_id;
+       int err;
+
+       signal(SIGINT, signal_handler);
+       signal(SIGTERM, signal_handler);
+       signal(SIGTSTP, signal_handler);
+
+       err = rte_eal_init(argc, argv);
+       if (err < 0)
+               rte_panic("Invalid EAL arguments\n");
+
+       argc -= err;
+       argv += err;
+
+       fdata = rte_malloc(NULL, sizeof(struct fastpath_data), 0);
+       if (fdata == NULL)
+               rte_panic("Out of memory\n");
+
+       /* Parse cli options*/
+       parse_app_args(argc, argv);
+
+       num_ports = rte_eth_dev_count();
+       if (num_ports == 0)
+               rte_panic("No ethernet ports found\n");
+
+       const unsigned int cores_needed = cdata.active_cores;
+
+       if (!cdata.quiet) {
+               printf("  Config:\n");
+               printf("\tports: %u\n", num_ports);
+               printf("\tworkers: %u\n", cdata.num_workers);
+               printf("\tpackets: %"PRIi64"\n", cdata.num_packets);
+               printf("\tQueue-prio: %u\n", cdata.enable_queue_priorities);
+               if (cdata.queue_type == RTE_SCHED_TYPE_ORDERED)
+                       printf("\tqid0 type: ordered\n");
+               if (cdata.queue_type == RTE_SCHED_TYPE_ATOMIC)
+                       printf("\tqid0 type: atomic\n");
+               printf("\tCores available: %u\n", rte_lcore_count());
+               printf("\tCores used: %u\n", cores_needed);
+       }
+
+       if (rte_lcore_count() < cores_needed)
+               rte_panic("Too few cores (%d < %d)\n", rte_lcore_count(),
+                               cores_needed);
+
+       const unsigned int ndevs = rte_event_dev_count();
+       if (ndevs == 0)
+               rte_panic("No dev_id devs found. Pasl in a --vdev eventdev.\n");
+       if (ndevs > 1)
+               fprintf(stderr, "Warning: More than one eventdev, using idx 0");
+
+
+       do_capability_setup(num_ports, 0);
+       fdata->cap.check_opt();
+
+       worker_data = rte_calloc(0, cdata.num_workers,
+                       sizeof(worker_data[0]), 0);
+       if (worker_data == NULL)
+               rte_panic("rte_calloc failed\n");
+
+       int dev_id = fdata->cap.evdev_setup(&cons_data, worker_data);
+       if (dev_id < 0)
+               rte_exit(EXIT_FAILURE, "Error setting up eventdev\n");
+
+       init_ports(num_ports);
+       fdata->cap.adptr_setup(num_ports);
+
+       int worker_idx = 0;
+       RTE_LCORE_FOREACH_SLAVE(lcore_id) {
+               if (lcore_id >= MAX_NUM_CORE)
+                       break;
+
+               if (!fdata->rx_core[lcore_id] &&
+                       !fdata->worker_core[lcore_id] &&
+                       !fdata->tx_core[lcore_id] &&
+                       !fdata->sched_core[lcore_id])
+                       continue;
+
+               if (fdata->rx_core[lcore_id])
+                       printf(
+                               "[%s()] lcore %d executing NIC Rx\n",
+                               __func__, lcore_id);
+
+               if (fdata->tx_core[lcore_id])
+                       printf(
+                               "[%s()] lcore %d executing NIC Tx, and using eventdev port %u\n",
+                               __func__, lcore_id, cons_data.port_id);
+
+               if (fdata->sched_core[lcore_id])
+                       printf("[%s()] lcore %d executing scheduler\n",
+                                       __func__, lcore_id);
+
+               if (fdata->worker_core[lcore_id])
+                       printf(
+                               "[%s()] lcore %d executing worker, using eventdev port %u\n",
+                               __func__, lcore_id,
+                               worker_data[worker_idx].port_id);
+
+               err = rte_eal_remote_launch(fdata->cap.worker,
+                               &worker_data[worker_idx], lcore_id);
+               if (err) {
+                       rte_panic("Failed to launch worker on core %d\n",
+                                       lcore_id);
+                       continue;
+               }
+               if (fdata->worker_core[lcore_id])
+                       worker_idx++;
+       }
+
+       lcore_id = rte_lcore_id();
+
+       if (core_in_use(lcore_id))
+               fdata->cap.worker(&worker_data[worker_idx++]);
+
+       rte_eal_mp_wait_lcore();
+
+       if (cdata.dump_dev)
+               rte_event_dev_dump(dev_id, stdout);
+
+       if (!cdata.quiet && (port_stat(dev_id, worker_data[0].port_id) !=
+                       (uint64_t)-ENOTSUP)) {
+               printf("\nPort Workload distribution:\n");
+               uint32_t i;
+               uint64_t tot_pkts = 0;
+               uint64_t pkts_per_wkr[RTE_MAX_LCORE] = {0};
+               for (i = 0; i < cdata.num_workers; i++) {
+                       pkts_per_wkr[i] =
+                               port_stat(dev_id, worker_data[i].port_id);
+                       tot_pkts += pkts_per_wkr[i];
+               }
+               for (i = 0; i < cdata.num_workers; i++) {
+                       float pc = pkts_per_wkr[i]  * 100 /
+                               ((float)tot_pkts);
+                       printf("worker %i :\t%.1f %% (%"PRIu64" pkts)\n",
+                                       i, pc, pkts_per_wkr[i]);
+               }
+
+       }
+
+       return 0;
+}
diff --git a/examples/eventdev_pipeline/pipeline_common.h b/examples/eventdev_pipeline/pipeline_common.h
new file mode 100644 (file)
index 0000000..9703396
--- /dev/null
@@ -0,0 +1,153 @@
+/*
+ * SPDX-License-Identifier: BSD-3-Clause
+ * Copyright 2016 Intel Corporation.
+ * Copyright 2017 Cavium, Inc.
+ */
+
+#include <stdbool.h>
+
+#include <rte_eal.h>
+#include <rte_mempool.h>
+#include <rte_mbuf.h>
+#include <rte_launch.h>
+#include <rte_malloc.h>
+#include <rte_random.h>
+#include <rte_cycles.h>
+#include <rte_ethdev.h>
+#include <rte_eventdev.h>
+#include <rte_event_eth_rx_adapter.h>
+#include <rte_service.h>
+#include <rte_service_component.h>
+
+#define MAX_NUM_STAGES 8
+#define BATCH_SIZE 16
+#define MAX_NUM_CORE 64
+
+struct cons_data {
+       uint8_t dev_id;
+       uint8_t port_id;
+       uint8_t release;
+} __rte_cache_aligned;
+
+struct worker_data {
+       uint8_t dev_id;
+       uint8_t port_id;
+} __rte_cache_aligned;
+
+typedef int (*worker_loop)(void *);
+typedef int (*consumer_loop)(void);
+typedef void (*schedule_loop)(unsigned int);
+typedef int (*eventdev_setup)(struct cons_data *, struct worker_data *);
+typedef void (*rx_adapter_setup)(uint16_t nb_ports);
+typedef void (*opt_check)(void);
+
+struct setup_data {
+       worker_loop worker;
+       consumer_loop consumer;
+       schedule_loop scheduler;
+       eventdev_setup evdev_setup;
+       rx_adapter_setup adptr_setup;
+       opt_check check_opt;
+};
+
+struct fastpath_data {
+       volatile int done;
+       uint32_t tx_lock;
+       uint32_t evdev_service_id;
+       uint32_t rxadptr_service_id;
+       bool rx_single;
+       bool tx_single;
+       bool sched_single;
+       unsigned int rx_core[MAX_NUM_CORE];
+       unsigned int tx_core[MAX_NUM_CORE];
+       unsigned int sched_core[MAX_NUM_CORE];
+       unsigned int worker_core[MAX_NUM_CORE];
+       struct rte_eth_dev_tx_buffer *tx_buf[RTE_MAX_ETHPORTS];
+       struct setup_data cap;
+} __rte_cache_aligned;
+
+struct config_data {
+       unsigned int active_cores;
+       unsigned int num_workers;
+       int64_t num_packets;
+       uint64_t num_mbuf;
+       unsigned int num_fids;
+       int queue_type;
+       int worker_cycles;
+       int enable_queue_priorities;
+       int quiet;
+       int dump_dev;
+       int dump_dev_signal;
+       int all_type_queues;
+       unsigned int num_stages;
+       unsigned int worker_cq_depth;
+       unsigned int rx_stride;
+       /* Use rx stride value to reduce congestion in entry queue when using
+        * multiple eth ports by forming multiple event queue pipelines.
+        */
+       int16_t next_qid[MAX_NUM_STAGES+2];
+       int16_t qid[MAX_NUM_STAGES];
+       uint8_t rx_adapter_id;
+       uint64_t worker_lcore_mask;
+       uint64_t rx_lcore_mask;
+       uint64_t tx_lcore_mask;
+       uint64_t sched_lcore_mask;
+};
+
+struct port_link {
+       uint8_t queue_id;
+       uint8_t priority;
+};
+
+struct cons_data cons_data;
+
+struct fastpath_data *fdata;
+struct config_data cdata;
+
+static __rte_always_inline void
+exchange_mac(struct rte_mbuf *m)
+{
+       struct ether_hdr *eth;
+       struct ether_addr addr;
+
+       /* change mac addresses on packet (to use mbuf data) */
+       eth = rte_pktmbuf_mtod(m, struct ether_hdr *);
+       ether_addr_copy(&eth->d_addr, &addr);
+       ether_addr_copy(&addr, &eth->d_addr);
+}
+
+static __rte_always_inline void
+work(void)
+{
+       /* do a number of cycles of work per packet */
+       volatile uint64_t start_tsc = rte_rdtsc();
+       while (rte_rdtsc() < start_tsc + cdata.worker_cycles)
+               rte_pause();
+}
+
+static __rte_always_inline void
+schedule_devices(unsigned int lcore_id)
+{
+       if (fdata->rx_core[lcore_id]) {
+               rte_service_run_iter_on_app_lcore(fdata->rxadptr_service_id,
+                               !fdata->rx_single);
+       }
+
+       if (fdata->sched_core[lcore_id]) {
+               rte_service_run_iter_on_app_lcore(fdata->evdev_service_id,
+                               !fdata->sched_single);
+               if (cdata.dump_dev_signal) {
+                       rte_event_dev_dump(0, stdout);
+                       cdata.dump_dev_signal = 0;
+               }
+       }
+
+       if (fdata->tx_core[lcore_id] && (fdata->tx_single ||
+                        rte_atomic32_cmpset(&(fdata->tx_lock), 0, 1))) {
+               fdata->cap.consumer();
+               rte_atomic32_clear((rte_atomic32_t *)&(fdata->tx_lock));
+       }
+}
+
+void set_worker_generic_setup_data(struct setup_data *caps, bool burst);
+void set_worker_tx_setup_data(struct setup_data *caps, bool burst);
diff --git a/examples/eventdev_pipeline/pipeline_worker_generic.c b/examples/eventdev_pipeline/pipeline_worker_generic.c
new file mode 100644 (file)
index 0000000..2c51f4a
--- /dev/null
@@ -0,0 +1,569 @@
+/*
+ * SPDX-License-Identifier: BSD-3-Clause
+ * Copyright 2016 Intel Corporation.
+ * Copyright 2017 Cavium, Inc.
+ */
+
+#include "pipeline_common.h"
+
+static __rte_always_inline int
+worker_generic(void *arg)
+{
+       struct rte_event ev;
+
+       struct worker_data *data = (struct worker_data *)arg;
+       uint8_t dev_id = data->dev_id;
+       uint8_t port_id = data->port_id;
+       size_t sent = 0, received = 0;
+       unsigned int lcore_id = rte_lcore_id();
+
+       while (!fdata->done) {
+
+               if (fdata->cap.scheduler)
+                       fdata->cap.scheduler(lcore_id);
+
+               if (!fdata->worker_core[lcore_id]) {
+                       rte_pause();
+                       continue;
+               }
+
+               const uint16_t nb_rx = rte_event_dequeue_burst(dev_id, port_id,
+                               &ev, 1, 0);
+
+               if (nb_rx == 0) {
+                       rte_pause();
+                       continue;
+               }
+               received++;
+
+               /* The first worker stage does classification */
+               if (ev.queue_id == cdata.qid[0])
+                       ev.flow_id = ev.mbuf->hash.rss
+                                               % cdata.num_fids;
+
+               ev.queue_id = cdata.next_qid[ev.queue_id];
+               ev.op = RTE_EVENT_OP_FORWARD;
+               ev.sched_type = cdata.queue_type;
+
+               work();
+
+               while (rte_event_enqueue_burst(dev_id, port_id, &ev, 1) != 1)
+                       rte_pause();
+               sent++;
+       }
+
+       if (!cdata.quiet)
+               printf("  worker %u thread done. RX=%zu TX=%zu\n",
+                               rte_lcore_id(), received, sent);
+
+       return 0;
+}
+
+static int
+worker_generic_burst(void *arg)
+{
+       struct rte_event events[BATCH_SIZE];
+
+       struct worker_data *data = (struct worker_data *)arg;
+       uint8_t dev_id = data->dev_id;
+       uint8_t port_id = data->port_id;
+       size_t sent = 0, received = 0;
+       unsigned int lcore_id = rte_lcore_id();
+
+       while (!fdata->done) {
+               uint16_t i;
+
+               if (fdata->cap.scheduler)
+                       fdata->cap.scheduler(lcore_id);
+
+               if (!fdata->worker_core[lcore_id]) {
+                       rte_pause();
+                       continue;
+               }
+
+               const uint16_t nb_rx = rte_event_dequeue_burst(dev_id, port_id,
+                               events, RTE_DIM(events), 0);
+
+               if (nb_rx == 0) {
+                       rte_pause();
+                       continue;
+               }
+               received += nb_rx;
+
+               for (i = 0; i < nb_rx; i++) {
+
+                       /* The first worker stage does classification */
+                       if (events[i].queue_id == cdata.qid[0])
+                               events[i].flow_id = events[i].mbuf->hash.rss
+                                                       % cdata.num_fids;
+
+                       events[i].queue_id = cdata.next_qid[events[i].queue_id];
+                       events[i].op = RTE_EVENT_OP_FORWARD;
+                       events[i].sched_type = cdata.queue_type;
+
+                       work();
+               }
+               uint16_t nb_tx = rte_event_enqueue_burst(dev_id, port_id,
+                               events, nb_rx);
+               while (nb_tx < nb_rx && !fdata->done)
+                       nb_tx += rte_event_enqueue_burst(dev_id, port_id,
+                                                       events + nb_tx,
+                                                       nb_rx - nb_tx);
+               sent += nb_tx;
+       }
+
+       if (!cdata.quiet)
+               printf("  worker %u thread done. RX=%zu TX=%zu\n",
+                               rte_lcore_id(), received, sent);
+
+       return 0;
+}
+
+static __rte_always_inline int
+consumer(void)
+{
+       const uint64_t freq_khz = rte_get_timer_hz() / 1000;
+       struct rte_event packet;
+
+       static uint64_t received;
+       static uint64_t last_pkts;
+       static uint64_t last_time;
+       static uint64_t start_time;
+       int i;
+       uint8_t dev_id = cons_data.dev_id;
+       uint8_t port_id = cons_data.port_id;
+
+       do {
+               uint16_t n = rte_event_dequeue_burst(dev_id, port_id,
+                               &packet, 1, 0);
+
+               if (n == 0) {
+                       for (i = 0; i < rte_eth_dev_count(); i++)
+                               rte_eth_tx_buffer_flush(i, 0, fdata->tx_buf[i]);
+                       return 0;
+               }
+               if (start_time == 0)
+                       last_time = start_time = rte_get_timer_cycles();
+
+               received++;
+               uint8_t outport = packet.mbuf->port;
+
+               exchange_mac(packet.mbuf);
+               rte_eth_tx_buffer(outport, 0, fdata->tx_buf[outport],
+                               packet.mbuf);
+
+               if (cons_data.release)
+                       rte_event_enqueue_burst(dev_id, port_id,
+                                                               &packet, n);
+
+               /* Print out mpps every 1<22 packets */
+               if (!cdata.quiet && received >= last_pkts + (1<<22)) {
+                       const uint64_t now = rte_get_timer_cycles();
+                       const uint64_t total_ms = (now - start_time) / freq_khz;
+                       const uint64_t delta_ms = (now - last_time) / freq_khz;
+                       uint64_t delta_pkts = received - last_pkts;
+
+                       printf("# %s RX=%"PRIu64", time %"PRIu64 "ms, "
+                                       "avg %.3f mpps [current %.3f mpps]\n",
+                                       __func__,
+                                       received,
+                                       total_ms,
+                                       received / (total_ms * 1000.0),
+                                       delta_pkts / (delta_ms * 1000.0));
+                       last_pkts = received;
+                       last_time = now;
+               }
+
+               cdata.num_packets--;
+               if (cdata.num_packets <= 0)
+                       fdata->done = 1;
+       /* Be stuck in this loop if single. */
+       } while (!fdata->done && fdata->tx_single);
+
+       return 0;
+}
+
+static __rte_always_inline int
+consumer_burst(void)
+{
+       const uint64_t freq_khz = rte_get_timer_hz() / 1000;
+       struct rte_event packets[BATCH_SIZE];
+
+       static uint64_t received;
+       static uint64_t last_pkts;
+       static uint64_t last_time;
+       static uint64_t start_time;
+       unsigned int i, j;
+       uint8_t dev_id = cons_data.dev_id;
+       uint8_t port_id = cons_data.port_id;
+       uint16_t nb_ports = rte_eth_dev_count();
+
+       do {
+               uint16_t n = rte_event_dequeue_burst(dev_id, port_id,
+                               packets, RTE_DIM(packets), 0);
+
+               if (n == 0) {
+                       for (j = 0; j < nb_ports; j++)
+                               rte_eth_tx_buffer_flush(j, 0, fdata->tx_buf[j]);
+                       return 0;
+               }
+               if (start_time == 0)
+                       last_time = start_time = rte_get_timer_cycles();
+
+               received += n;
+               for (i = 0; i < n; i++) {
+                       uint8_t outport = packets[i].mbuf->port;
+
+                       exchange_mac(packets[i].mbuf);
+                       rte_eth_tx_buffer(outport, 0, fdata->tx_buf[outport],
+                                       packets[i].mbuf);
+
+                       packets[i].op = RTE_EVENT_OP_RELEASE;
+               }
+
+               if (cons_data.release) {
+                       uint16_t nb_tx;
+
+                       nb_tx = rte_event_enqueue_burst(dev_id, port_id,
+                                                               packets, n);
+                       while (nb_tx < n)
+                               nb_tx += rte_event_enqueue_burst(dev_id,
+                                               port_id, packets + nb_tx,
+                                               n - nb_tx);
+               }
+
+               /* Print out mpps every 1<22 packets */
+               if (!cdata.quiet && received >= last_pkts + (1<<22)) {
+                       const uint64_t now = rte_get_timer_cycles();
+                       const uint64_t total_ms = (now - start_time) / freq_khz;
+                       const uint64_t delta_ms = (now - last_time) / freq_khz;
+                       uint64_t delta_pkts = received - last_pkts;
+
+                       printf("# consumer RX=%"PRIu64", time %"PRIu64 "ms, "
+                                       "avg %.3f mpps [current %.3f mpps]\n",
+                                       received,
+                                       total_ms,
+                                       received / (total_ms * 1000.0),
+                                       delta_pkts / (delta_ms * 1000.0));
+                       last_pkts = received;
+                       last_time = now;
+               }
+
+               cdata.num_packets -= n;
+               if (cdata.num_packets <= 0)
+                       fdata->done = 1;
+       /* Be stuck in this loop if single. */
+       } while (!fdata->done && fdata->tx_single);
+
+       return 0;
+}
+
+static int
+setup_eventdev_generic(struct cons_data *cons_data,
+               struct worker_data *worker_data)
+{
+       const uint8_t dev_id = 0;
+       /* +1 stages is for a SINGLE_LINK TX stage */
+       const uint8_t nb_queues = cdata.num_stages + 1;
+       /* + 1 is one port for consumer */
+       const uint8_t nb_ports = cdata.num_workers + 1;
+       struct rte_event_dev_config config = {
+                       .nb_event_queues = nb_queues,
+                       .nb_event_ports = nb_ports,
+                       .nb_events_limit  = 4096,
+                       .nb_event_queue_flows = 1024,
+                       .nb_event_port_dequeue_depth = 128,
+                       .nb_event_port_enqueue_depth = 128,
+       };
+       struct rte_event_port_conf wkr_p_conf = {
+                       .dequeue_depth = cdata.worker_cq_depth,
+                       .enqueue_depth = 64,
+                       .new_event_threshold = 4096,
+       };
+       struct rte_event_queue_conf wkr_q_conf = {
+                       .schedule_type = cdata.queue_type,
+                       .priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
+                       .nb_atomic_flows = 1024,
+               .nb_atomic_order_sequences = 1024,
+       };
+       struct rte_event_port_conf tx_p_conf = {
+                       .dequeue_depth = 128,
+                       .enqueue_depth = 128,
+                       .new_event_threshold = 4096,
+       };
+       struct rte_event_queue_conf tx_q_conf = {
+                       .priority = RTE_EVENT_DEV_PRIORITY_HIGHEST,
+                       .event_queue_cfg = RTE_EVENT_QUEUE_CFG_SINGLE_LINK,
+       };
+
+       struct port_link worker_queues[MAX_NUM_STAGES];
+       uint8_t disable_implicit_release;
+       struct port_link tx_queue;
+       unsigned int i;
+
+       int ret, ndev = rte_event_dev_count();
+       if (ndev < 1) {
+               printf("%d: No Eventdev Devices Found\n", __LINE__);
+               return -1;
+       }
+
+       struct rte_event_dev_info dev_info;
+       ret = rte_event_dev_info_get(dev_id, &dev_info);
+       printf("\tEventdev %d: %s\n", dev_id, dev_info.driver_name);
+
+       disable_implicit_release = (dev_info.event_dev_cap &
+                       RTE_EVENT_DEV_CAP_IMPLICIT_RELEASE_DISABLE);
+
+       wkr_p_conf.disable_implicit_release = disable_implicit_release;
+       tx_p_conf.disable_implicit_release = disable_implicit_release;
+
+       if (dev_info.max_event_port_dequeue_depth <
+                       config.nb_event_port_dequeue_depth)
+               config.nb_event_port_dequeue_depth =
+                               dev_info.max_event_port_dequeue_depth;
+       if (dev_info.max_event_port_enqueue_depth <
+                       config.nb_event_port_enqueue_depth)
+               config.nb_event_port_enqueue_depth =
+                               dev_info.max_event_port_enqueue_depth;
+
+       ret = rte_event_dev_configure(dev_id, &config);
+       if (ret < 0) {
+               printf("%d: Error configuring device\n", __LINE__);
+               return -1;
+       }
+
+       /* Q creation - one load balanced per pipeline stage*/
+       printf("  Stages:\n");
+       for (i = 0; i < cdata.num_stages; i++) {
+               if (rte_event_queue_setup(dev_id, i, &wkr_q_conf) < 0) {
+                       printf("%d: error creating qid %d\n", __LINE__, i);
+                       return -1;
+               }
+               cdata.qid[i] = i;
+               cdata.next_qid[i] = i+1;
+               worker_queues[i].queue_id = i;
+               if (cdata.enable_queue_priorities) {
+                       /* calculate priority stepping for each stage, leaving
+                        * headroom of 1 for the SINGLE_LINK TX below
+                        */
+                       const uint32_t prio_delta =
+                               (RTE_EVENT_DEV_PRIORITY_LOWEST-1) /  nb_queues;
+
+                       /* higher priority for queues closer to tx */
+                       wkr_q_conf.priority =
+                               RTE_EVENT_DEV_PRIORITY_LOWEST - prio_delta * i;
+               }
+
+               const char *type_str = "Atomic";
+               switch (wkr_q_conf.schedule_type) {
+               case RTE_SCHED_TYPE_ORDERED:
+                       type_str = "Ordered";
+                       break;
+               case RTE_SCHED_TYPE_PARALLEL:
+                       type_str = "Parallel";
+                       break;
+               }
+               printf("\tStage %d, Type %s\tPriority = %d\n", i, type_str,
+                               wkr_q_conf.priority);
+       }
+       printf("\n");
+
+       /* final queue for sending to TX core */
+       if (rte_event_queue_setup(dev_id, i, &tx_q_conf) < 0) {
+               printf("%d: error creating qid %d\n", __LINE__, i);
+               return -1;
+       }
+       tx_queue.queue_id = i;
+       tx_queue.priority = RTE_EVENT_DEV_PRIORITY_HIGHEST;
+
+       if (wkr_p_conf.dequeue_depth > config.nb_event_port_dequeue_depth)
+               wkr_p_conf.dequeue_depth = config.nb_event_port_dequeue_depth;
+       if (wkr_p_conf.enqueue_depth > config.nb_event_port_enqueue_depth)
+               wkr_p_conf.enqueue_depth = config.nb_event_port_enqueue_depth;
+
+       /* set up one port per worker, linking to all stage queues */
+       for (i = 0; i < cdata.num_workers; i++) {
+               struct worker_data *w = &worker_data[i];
+               w->dev_id = dev_id;
+               if (rte_event_port_setup(dev_id, i, &wkr_p_conf) < 0) {
+                       printf("Error setting up port %d\n", i);
+                       return -1;
+               }
+
+               uint32_t s;
+               for (s = 0; s < cdata.num_stages; s++) {
+                       if (rte_event_port_link(dev_id, i,
+                                               &worker_queues[s].queue_id,
+                                               &worker_queues[s].priority,
+                                               1) != 1) {
+                               printf("%d: error creating link for port %d\n",
+                                               __LINE__, i);
+                               return -1;
+                       }
+               }
+               w->port_id = i;
+       }
+
+       if (tx_p_conf.dequeue_depth > config.nb_event_port_dequeue_depth)
+               tx_p_conf.dequeue_depth = config.nb_event_port_dequeue_depth;
+       if (tx_p_conf.enqueue_depth > config.nb_event_port_enqueue_depth)
+               tx_p_conf.enqueue_depth = config.nb_event_port_enqueue_depth;
+
+       /* port for consumer, linked to TX queue */
+       if (rte_event_port_setup(dev_id, i, &tx_p_conf) < 0) {
+               printf("Error setting up port %d\n", i);
+               return -1;
+       }
+       if (rte_event_port_link(dev_id, i, &tx_queue.queue_id,
+                               &tx_queue.priority, 1) != 1) {
+               printf("%d: error creating link for port %d\n",
+                               __LINE__, i);
+               return -1;
+       }
+       *cons_data = (struct cons_data){.dev_id = dev_id,
+                                       .port_id = i,
+                                       .release = disable_implicit_release };
+
+       ret = rte_event_dev_service_id_get(dev_id,
+                               &fdata->evdev_service_id);
+       if (ret != -ESRCH && ret != 0) {
+               printf("Error getting the service ID for sw eventdev\n");
+               return -1;
+       }
+       rte_service_runstate_set(fdata->evdev_service_id, 1);
+       rte_service_set_runstate_mapped_check(fdata->evdev_service_id, 0);
+       if (rte_event_dev_start(dev_id) < 0) {
+               printf("Error starting eventdev\n");
+               return -1;
+       }
+
+       return dev_id;
+}
+
+static void
+init_rx_adapter(uint16_t nb_ports)
+{
+       int i;
+       int ret;
+       uint8_t evdev_id = 0;
+       struct rte_event_dev_info dev_info;
+
+       ret = rte_event_dev_info_get(evdev_id, &dev_info);
+
+       struct rte_event_port_conf rx_p_conf = {
+               .dequeue_depth = 8,
+               .enqueue_depth = 8,
+               .new_event_threshold = 1200,
+       };
+
+       if (rx_p_conf.dequeue_depth > dev_info.max_event_port_dequeue_depth)
+               rx_p_conf.dequeue_depth = dev_info.max_event_port_dequeue_depth;
+       if (rx_p_conf.enqueue_depth > dev_info.max_event_port_enqueue_depth)
+               rx_p_conf.enqueue_depth = dev_info.max_event_port_enqueue_depth;
+
+       /* Create one adapter for all the ethernet ports. */
+       ret = rte_event_eth_rx_adapter_create(cdata.rx_adapter_id, evdev_id,
+                       &rx_p_conf);
+       if (ret)
+               rte_exit(EXIT_FAILURE, "failed to create rx adapter[%d]",
+                               cdata.rx_adapter_id);
+
+       struct rte_event_eth_rx_adapter_queue_conf queue_conf = {
+               .ev.sched_type = cdata.queue_type,
+               .ev.queue_id = cdata.qid[0],
+       };
+
+       for (i = 0; i < nb_ports; i++) {
+               uint32_t cap;
+
+               ret = rte_event_eth_rx_adapter_caps_get(evdev_id, i, &cap);
+               if (ret)
+                       rte_exit(EXIT_FAILURE,
+                                       "failed to get event rx adapter "
+                                       "capabilities");
+
+               ret = rte_event_eth_rx_adapter_queue_add(cdata.rx_adapter_id, i,
+                               -1, &queue_conf);
+               if (ret)
+                       rte_exit(EXIT_FAILURE,
+                                       "Failed to add queues to Rx adapter");
+       }
+
+       ret = rte_event_eth_rx_adapter_service_id_get(cdata.rx_adapter_id,
+                               &fdata->rxadptr_service_id);
+       if (ret != -ESRCH && ret != 0) {
+               rte_exit(EXIT_FAILURE,
+                       "Error getting the service ID for sw eventdev\n");
+       }
+       rte_service_runstate_set(fdata->rxadptr_service_id, 1);
+       rte_service_set_runstate_mapped_check(fdata->rxadptr_service_id, 0);
+
+       ret = rte_event_eth_rx_adapter_start(cdata.rx_adapter_id);
+       if (ret)
+               rte_exit(EXIT_FAILURE, "Rx adapter[%d] start failed",
+                               cdata.rx_adapter_id);
+}
+
+static void
+generic_opt_check(void)
+{
+       int i;
+       int ret;
+       uint32_t cap = 0;
+       uint8_t rx_needed = 0;
+       struct rte_event_dev_info eventdev_info;
+
+       memset(&eventdev_info, 0, sizeof(struct rte_event_dev_info));
+       rte_event_dev_info_get(0, &eventdev_info);
+
+       if (cdata.all_type_queues && !(eventdev_info.event_dev_cap &
+                               RTE_EVENT_DEV_CAP_QUEUE_ALL_TYPES))
+               rte_exit(EXIT_FAILURE,
+                               "Event dev doesn't support all type queues\n");
+
+       for (i = 0; i < rte_eth_dev_count(); i++) {
+               ret = rte_event_eth_rx_adapter_caps_get(0, i, &cap);
+               if (ret)
+                       rte_exit(EXIT_FAILURE,
+                               "failed to get event rx adapter capabilities");
+               rx_needed |=
+                       !(cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT);
+       }
+
+       if (cdata.worker_lcore_mask == 0 ||
+                       (rx_needed && cdata.rx_lcore_mask == 0) ||
+                       cdata.tx_lcore_mask == 0 || (cdata.sched_lcore_mask == 0
+                               && !(eventdev_info.event_dev_cap &
+                                       RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED))) {
+               printf("Core part of pipeline was not assigned any cores. "
+                       "This will stall the pipeline, please check core masks "
+                       "(use -h for details on setting core masks):\n"
+                       "\trx: %"PRIu64"\n\ttx: %"PRIu64"\n\tsched: %"PRIu64
+                       "\n\tworkers: %"PRIu64"\n",
+                       cdata.rx_lcore_mask, cdata.tx_lcore_mask,
+                       cdata.sched_lcore_mask,
+                       cdata.worker_lcore_mask);
+               rte_exit(-1, "Fix core masks\n");
+       }
+
+       if (eventdev_info.event_dev_cap & RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED)
+               memset(fdata->sched_core, 0,
+                               sizeof(unsigned int) * MAX_NUM_CORE);
+}
+
+void
+set_worker_generic_setup_data(struct setup_data *caps, bool burst)
+{
+       if (burst) {
+               caps->consumer = consumer_burst;
+               caps->worker = worker_generic_burst;
+       } else {
+               caps->consumer = consumer;
+               caps->worker = worker_generic;
+       }
+
+       caps->adptr_setup = init_rx_adapter;
+       caps->scheduler = schedule_devices;
+       caps->evdev_setup = setup_eventdev_generic;
+       caps->check_opt = generic_opt_check;
+}
diff --git a/examples/eventdev_pipeline/pipeline_worker_tx.c b/examples/eventdev_pipeline/pipeline_worker_tx.c
new file mode 100644 (file)
index 0000000..c0d1bd9
--- /dev/null
@@ -0,0 +1,838 @@
+/*
+ * SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2010-2014 Intel Corporation
+ * Copyright 2017 Cavium, Inc.
+ */
+
+#include "pipeline_common.h"
+
+static __rte_always_inline void
+worker_fwd_event(struct rte_event *ev, uint8_t sched)
+{
+       ev->event_type = RTE_EVENT_TYPE_CPU;
+       ev->op = RTE_EVENT_OP_FORWARD;
+       ev->sched_type = sched;
+}
+
+static __rte_always_inline void
+worker_event_enqueue(const uint8_t dev, const uint8_t port,
+               struct rte_event *ev)
+{
+       while (rte_event_enqueue_burst(dev, port, ev, 1) != 1)
+               rte_pause();
+}
+
+static __rte_always_inline void
+worker_event_enqueue_burst(const uint8_t dev, const uint8_t port,
+               struct rte_event *ev, const uint16_t nb_rx)
+{
+       uint16_t enq;
+
+       enq = rte_event_enqueue_burst(dev, port, ev, nb_rx);
+       while (enq < nb_rx) {
+               enq += rte_event_enqueue_burst(dev, port,
+                                               ev + enq, nb_rx - enq);
+       }
+}
+
+static __rte_always_inline void
+worker_tx_pkt(struct rte_mbuf *mbuf)
+{
+       exchange_mac(mbuf);
+       while (rte_eth_tx_burst(mbuf->port, 0, &mbuf, 1) != 1)
+               rte_pause();
+}
+
+/* Single stage pipeline workers */
+
+static int
+worker_do_tx_single(void *arg)
+{
+       struct worker_data *data = (struct worker_data *)arg;
+       const uint8_t dev = data->dev_id;
+       const uint8_t port = data->port_id;
+       size_t fwd = 0, received = 0, tx = 0;
+       struct rte_event ev;
+
+       while (!fdata->done) {
+
+               if (!rte_event_dequeue_burst(dev, port, &ev, 1, 0)) {
+                       rte_pause();
+                       continue;
+               }
+
+               received++;
+
+               if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
+                       worker_tx_pkt(ev.mbuf);
+                       tx++;
+                       continue;
+               }
+               work();
+               ev.queue_id++;
+               worker_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
+               worker_event_enqueue(dev, port, &ev);
+               fwd++;
+       }
+
+       if (!cdata.quiet)
+               printf("  worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
+                               rte_lcore_id(), received, fwd, tx);
+       return 0;
+}
+
+static int
+worker_do_tx_single_atq(void *arg)
+{
+       struct worker_data *data = (struct worker_data *)arg;
+       const uint8_t dev = data->dev_id;
+       const uint8_t port = data->port_id;
+       size_t fwd = 0, received = 0, tx = 0;
+       struct rte_event ev;
+
+       while (!fdata->done) {
+
+               if (!rte_event_dequeue_burst(dev, port, &ev, 1, 0)) {
+                       rte_pause();
+                       continue;
+               }
+
+               received++;
+
+               if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
+                       worker_tx_pkt(ev.mbuf);
+                       tx++;
+                       continue;
+               }
+               work();
+               worker_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
+               worker_event_enqueue(dev, port, &ev);
+               fwd++;
+       }
+
+       if (!cdata.quiet)
+               printf("  worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
+                               rte_lcore_id(), received, fwd, tx);
+       return 0;
+}
+
+static int
+worker_do_tx_single_burst(void *arg)
+{
+       struct rte_event ev[BATCH_SIZE + 1];
+
+       struct worker_data *data = (struct worker_data *)arg;
+       const uint8_t dev = data->dev_id;
+       const uint8_t port = data->port_id;
+       size_t fwd = 0, received = 0, tx = 0;
+
+       while (!fdata->done) {
+               uint16_t i;
+               uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
+                               BATCH_SIZE, 0);
+
+               if (!nb_rx) {
+                       rte_pause();
+                       continue;
+               }
+               received += nb_rx;
+
+               for (i = 0; i < nb_rx; i++) {
+                       rte_prefetch0(ev[i + 1].mbuf);
+                       if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {
+
+                               worker_tx_pkt(ev[i].mbuf);
+                               ev[i].op = RTE_EVENT_OP_RELEASE;
+                               tx++;
+
+                       } else {
+                               ev[i].queue_id++;
+                               worker_fwd_event(&ev[i], RTE_SCHED_TYPE_ATOMIC);
+                       }
+                       work();
+               }
+
+               worker_event_enqueue_burst(dev, port, ev, nb_rx);
+               fwd += nb_rx;
+       }
+
+       if (!cdata.quiet)
+               printf("  worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
+                               rte_lcore_id(), received, fwd, tx);
+       return 0;
+}
+
+static int
+worker_do_tx_single_burst_atq(void *arg)
+{
+       struct rte_event ev[BATCH_SIZE + 1];
+
+       struct worker_data *data = (struct worker_data *)arg;
+       const uint8_t dev = data->dev_id;
+       const uint8_t port = data->port_id;
+       size_t fwd = 0, received = 0, tx = 0;
+
+       while (!fdata->done) {
+               uint16_t i;
+               uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
+                               BATCH_SIZE, 0);
+
+               if (!nb_rx) {
+                       rte_pause();
+                       continue;
+               }
+
+               received += nb_rx;
+
+               for (i = 0; i < nb_rx; i++) {
+                       rte_prefetch0(ev[i + 1].mbuf);
+                       if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {
+
+                               worker_tx_pkt(ev[i].mbuf);
+                               ev[i].op = RTE_EVENT_OP_RELEASE;
+                               tx++;
+                       } else
+                               worker_fwd_event(&ev[i], RTE_SCHED_TYPE_ATOMIC);
+                       work();
+               }
+
+               worker_event_enqueue_burst(dev, port, ev, nb_rx);
+               fwd += nb_rx;
+       }
+
+       if (!cdata.quiet)
+               printf("  worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
+                               rte_lcore_id(), received, fwd, tx);
+       return 0;
+}
+
+/* Multi stage Pipeline Workers */
+
+static int
+worker_do_tx(void *arg)
+{
+       struct rte_event ev;
+
+       struct worker_data *data = (struct worker_data *)arg;
+       const uint8_t dev = data->dev_id;
+       const uint8_t port = data->port_id;
+       const uint8_t lst_qid = cdata.num_stages - 1;
+       size_t fwd = 0, received = 0, tx = 0;
+
+
+       while (!fdata->done) {
+
+               if (!rte_event_dequeue_burst(dev, port, &ev, 1, 0)) {
+                       rte_pause();
+                       continue;
+               }
+
+               received++;
+               const uint8_t cq_id = ev.queue_id % cdata.num_stages;
+
+               if (cq_id >= lst_qid) {
+                       if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
+                               worker_tx_pkt(ev.mbuf);
+                               tx++;
+                               continue;
+                       }
+
+                       worker_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
+                       ev.queue_id = (cq_id == lst_qid) ?
+                               cdata.next_qid[ev.queue_id] : ev.queue_id;
+               } else {
+                       ev.queue_id = cdata.next_qid[ev.queue_id];
+                       worker_fwd_event(&ev, cdata.queue_type);
+               }
+               work();
+
+               worker_event_enqueue(dev, port, &ev);
+               fwd++;
+       }
+
+       if (!cdata.quiet)
+               printf("  worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
+                               rte_lcore_id(), received, fwd, tx);
+
+       return 0;
+}
+
+static int
+worker_do_tx_atq(void *arg)
+{
+       struct rte_event ev;
+
+       struct worker_data *data = (struct worker_data *)arg;
+       const uint8_t dev = data->dev_id;
+       const uint8_t port = data->port_id;
+       const uint8_t lst_qid = cdata.num_stages - 1;
+       size_t fwd = 0, received = 0, tx = 0;
+
+       while (!fdata->done) {
+
+               if (!rte_event_dequeue_burst(dev, port, &ev, 1, 0)) {
+                       rte_pause();
+                       continue;
+               }
+
+               received++;
+               const uint8_t cq_id = ev.sub_event_type % cdata.num_stages;
+
+               if (cq_id == lst_qid) {
+                       if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
+                               worker_tx_pkt(ev.mbuf);
+                               tx++;
+                               continue;
+                       }
+
+                       worker_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
+               } else {
+                       ev.sub_event_type++;
+                       worker_fwd_event(&ev, cdata.queue_type);
+               }
+               work();
+
+               worker_event_enqueue(dev, port, &ev);
+               fwd++;
+       }
+
+       if (!cdata.quiet)
+               printf("  worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
+                               rte_lcore_id(), received, fwd, tx);
+
+       return 0;
+}
+
+static int
+worker_do_tx_burst(void *arg)
+{
+       struct rte_event ev[BATCH_SIZE];
+
+       struct worker_data *data = (struct worker_data *)arg;
+       uint8_t dev = data->dev_id;
+       uint8_t port = data->port_id;
+       uint8_t lst_qid = cdata.num_stages - 1;
+       size_t fwd = 0, received = 0, tx = 0;
+
+       while (!fdata->done) {
+               uint16_t i;
+               const uint16_t nb_rx = rte_event_dequeue_burst(dev, port,
+                               ev, BATCH_SIZE, 0);
+
+               if (nb_rx == 0) {
+                       rte_pause();
+                       continue;
+               }
+               received += nb_rx;
+
+               for (i = 0; i < nb_rx; i++) {
+                       const uint8_t cq_id = ev[i].queue_id % cdata.num_stages;
+
+                       if (cq_id >= lst_qid) {
+                               if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {
+                                       worker_tx_pkt(ev[i].mbuf);
+                                       tx++;
+                                       ev[i].op = RTE_EVENT_OP_RELEASE;
+                                       continue;
+                               }
+                               ev[i].queue_id = (cq_id == lst_qid) ?
+                                       cdata.next_qid[ev[i].queue_id] :
+                                       ev[i].queue_id;
+
+                               worker_fwd_event(&ev[i], RTE_SCHED_TYPE_ATOMIC);
+                       } else {
+                               ev[i].queue_id = cdata.next_qid[ev[i].queue_id];
+                               worker_fwd_event(&ev[i], cdata.queue_type);
+                       }
+                       work();
+               }
+               worker_event_enqueue_burst(dev, port, ev, nb_rx);
+
+               fwd += nb_rx;
+       }
+
+       if (!cdata.quiet)
+               printf("  worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
+                               rte_lcore_id(), received, fwd, tx);
+
+       return 0;
+}
+
+static int
+worker_do_tx_burst_atq(void *arg)
+{
+       struct rte_event ev[BATCH_SIZE];
+
+       struct worker_data *data = (struct worker_data *)arg;
+       uint8_t dev = data->dev_id;
+       uint8_t port = data->port_id;
+       uint8_t lst_qid = cdata.num_stages - 1;
+       size_t fwd = 0, received = 0, tx = 0;
+
+       while (!fdata->done) {
+               uint16_t i;
+
+               const uint16_t nb_rx = rte_event_dequeue_burst(dev, port,
+                               ev, BATCH_SIZE, 0);
+
+               if (nb_rx == 0) {
+                       rte_pause();
+                       continue;
+               }
+               received += nb_rx;
+
+               for (i = 0; i < nb_rx; i++) {
+                       const uint8_t cq_id = ev[i].sub_event_type %
+                               cdata.num_stages;
+
+                       if (cq_id == lst_qid) {
+                               if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {
+                                       worker_tx_pkt(ev[i].mbuf);
+                                       tx++;
+                                       ev[i].op = RTE_EVENT_OP_RELEASE;
+                                       continue;
+                               }
+
+                               worker_fwd_event(&ev[i], RTE_SCHED_TYPE_ATOMIC);
+                       } else {
+                               ev[i].sub_event_type++;
+                               worker_fwd_event(&ev[i], cdata.queue_type);
+                       }
+                       work();
+               }
+
+               worker_event_enqueue_burst(dev, port, ev, nb_rx);
+               fwd += nb_rx;
+       }
+
+       if (!cdata.quiet)
+               printf("  worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
+                               rte_lcore_id(), received, fwd, tx);
+
+       return 0;
+}
+
+static int
+setup_eventdev_worker_tx(struct cons_data *cons_data,
+               struct worker_data *worker_data)
+{
+       RTE_SET_USED(cons_data);
+       uint8_t i;
+       const uint8_t atq = cdata.all_type_queues ? 1 : 0;
+       const uint8_t dev_id = 0;
+       const uint8_t nb_ports = cdata.num_workers;
+       uint8_t nb_slots = 0;
+       uint8_t nb_queues = rte_eth_dev_count();
+
+       /*
+        * In case where all type queues are not enabled, use queues equal to
+        * number of stages * eth_dev_count and one extra queue per pipeline
+        * for Tx.
+        */
+       if (!atq) {
+               nb_queues *= cdata.num_stages;
+               nb_queues += rte_eth_dev_count();
+       }
+
+       struct rte_event_dev_config config = {
+                       .nb_event_queues = nb_queues,
+                       .nb_event_ports = nb_ports,
+                       .nb_events_limit  = 4096,
+                       .nb_event_queue_flows = 1024,
+                       .nb_event_port_dequeue_depth = 128,
+                       .nb_event_port_enqueue_depth = 128,
+       };
+       struct rte_event_port_conf wkr_p_conf = {
+                       .dequeue_depth = cdata.worker_cq_depth,
+                       .enqueue_depth = 64,
+                       .new_event_threshold = 4096,
+       };
+       struct rte_event_queue_conf wkr_q_conf = {
+                       .schedule_type = cdata.queue_type,
+                       .priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
+                       .nb_atomic_flows = 1024,
+                       .nb_atomic_order_sequences = 1024,
+       };
+
+       int ret, ndev = rte_event_dev_count();
+
+       if (ndev < 1) {
+               printf("%d: No Eventdev Devices Found\n", __LINE__);
+               return -1;
+       }
+
+
+       struct rte_event_dev_info dev_info;
+       ret = rte_event_dev_info_get(dev_id, &dev_info);
+       printf("\tEventdev %d: %s\n", dev_id, dev_info.driver_name);
+
+       if (dev_info.max_event_port_dequeue_depth <
+                       config.nb_event_port_dequeue_depth)
+               config.nb_event_port_dequeue_depth =
+                               dev_info.max_event_port_dequeue_depth;
+       if (dev_info.max_event_port_enqueue_depth <
+                       config.nb_event_port_enqueue_depth)
+               config.nb_event_port_enqueue_depth =
+                               dev_info.max_event_port_enqueue_depth;
+
+       ret = rte_event_dev_configure(dev_id, &config);
+       if (ret < 0) {
+               printf("%d: Error configuring device\n", __LINE__);
+               return -1;
+       }
+
+       printf("  Stages:\n");
+       for (i = 0; i < nb_queues; i++) {
+
+               if (atq) {
+
+                       nb_slots = cdata.num_stages;
+                       wkr_q_conf.event_queue_cfg =
+                               RTE_EVENT_QUEUE_CFG_ALL_TYPES;
+               } else {
+                       uint8_t slot;
+
+                       nb_slots = cdata.num_stages + 1;
+                       slot = i % nb_slots;
+                       wkr_q_conf.schedule_type = slot == cdata.num_stages ?
+                               RTE_SCHED_TYPE_ATOMIC : cdata.queue_type;
+               }
+
+               if (rte_event_queue_setup(dev_id, i, &wkr_q_conf) < 0) {
+                       printf("%d: error creating qid %d\n", __LINE__, i);
+                       return -1;
+               }
+               cdata.qid[i] = i;
+               cdata.next_qid[i] = i+1;
+               if (cdata.enable_queue_priorities) {
+                       const uint32_t prio_delta =
+                               (RTE_EVENT_DEV_PRIORITY_LOWEST) /
+                               nb_slots;
+
+                       /* higher priority for queues closer to tx */
+                       wkr_q_conf.priority =
+                               RTE_EVENT_DEV_PRIORITY_LOWEST - prio_delta *
+                               (i % nb_slots);
+               }
+
+               const char *type_str = "Atomic";
+               switch (wkr_q_conf.schedule_type) {
+               case RTE_SCHED_TYPE_ORDERED:
+                       type_str = "Ordered";
+                       break;
+               case RTE_SCHED_TYPE_PARALLEL:
+                       type_str = "Parallel";
+                       break;
+               }
+               printf("\tStage %d, Type %s\tPriority = %d\n", i, type_str,
+                               wkr_q_conf.priority);
+       }
+
+       printf("\n");
+       if (wkr_p_conf.dequeue_depth > config.nb_event_port_dequeue_depth)
+               wkr_p_conf.dequeue_depth = config.nb_event_port_dequeue_depth;
+       if (wkr_p_conf.enqueue_depth > config.nb_event_port_enqueue_depth)
+               wkr_p_conf.enqueue_depth = config.nb_event_port_enqueue_depth;
+
+       /* set up one port per worker, linking to all stage queues */
+       for (i = 0; i < cdata.num_workers; i++) {
+               struct worker_data *w = &worker_data[i];
+               w->dev_id = dev_id;
+               if (rte_event_port_setup(dev_id, i, &wkr_p_conf) < 0) {
+                       printf("Error setting up port %d\n", i);
+                       return -1;
+               }
+
+               if (rte_event_port_link(dev_id, i, NULL, NULL, 0)
+                               != nb_queues) {
+                       printf("%d: error creating link for port %d\n",
+                                       __LINE__, i);
+                       return -1;
+               }
+               w->port_id = i;
+       }
+       /*
+        * Reduce the load on ingress event queue by splitting the traffic
+        * across multiple event queues.
+        * for example, nb_stages =  2 and nb_ethdev = 2 then
+        *
+        *      nb_queues = (2 * 2) + 2 = 6 (non atq)
+        *      rx_stride = 3
+        *
+        * So, traffic is split across queue 0 and queue 3 since queue id for
+        * rx adapter is chosen <ethport_id> * <rx_stride> i.e in the above
+        * case eth port 0, 1 will inject packets into event queue 0, 3
+        * respectively.
+        *
+        * This forms two set of queue pipelines 0->1->2->tx and 3->4->5->tx.
+        */
+       cdata.rx_stride = atq ? 1 : nb_slots;
+       ret = rte_event_dev_service_id_get(dev_id,
+                               &fdata->evdev_service_id);
+       if (ret != -ESRCH && ret != 0) {
+               printf("Error getting the service ID\n");
+               return -1;
+       }
+       rte_service_runstate_set(fdata->evdev_service_id, 1);
+       rte_service_set_runstate_mapped_check(fdata->evdev_service_id, 0);
+       if (rte_event_dev_start(dev_id) < 0) {
+               printf("Error starting eventdev\n");
+               return -1;
+       }
+
+       return dev_id;
+}
+
+
+struct rx_adptr_services {
+       uint16_t nb_rx_adptrs;
+       uint32_t *rx_adpt_arr;
+};
+
+static int32_t
+service_rx_adapter(void *arg)
+{
+       int i;
+       struct rx_adptr_services *adptr_services = arg;
+
+       for (i = 0; i < adptr_services->nb_rx_adptrs; i++)
+               rte_service_run_iter_on_app_lcore(
+                               adptr_services->rx_adpt_arr[i], 1);
+       return 0;
+}
+
+static void
+init_rx_adapter(uint16_t nb_ports)
+{
+       int i;
+       int ret;
+       uint8_t evdev_id = 0;
+       struct rx_adptr_services *adptr_services = NULL;
+       struct rte_event_dev_info dev_info;
+
+       ret = rte_event_dev_info_get(evdev_id, &dev_info);
+       adptr_services = rte_zmalloc(NULL, sizeof(struct rx_adptr_services), 0);
+
+       struct rte_event_port_conf rx_p_conf = {
+               .dequeue_depth = 8,
+               .enqueue_depth = 8,
+               .new_event_threshold = 1200,
+       };
+
+       if (rx_p_conf.dequeue_depth > dev_info.max_event_port_dequeue_depth)
+               rx_p_conf.dequeue_depth = dev_info.max_event_port_dequeue_depth;
+       if (rx_p_conf.enqueue_depth > dev_info.max_event_port_enqueue_depth)
+               rx_p_conf.enqueue_depth = dev_info.max_event_port_enqueue_depth;
+
+
+       struct rte_event_eth_rx_adapter_queue_conf queue_conf = {
+               .ev.sched_type = cdata.queue_type,
+       };
+
+       for (i = 0; i < nb_ports; i++) {
+               uint32_t cap;
+               uint32_t service_id;
+
+               ret = rte_event_eth_rx_adapter_create(i, evdev_id, &rx_p_conf);
+               if (ret)
+                       rte_exit(EXIT_FAILURE,
+                                       "failed to create rx adapter[%d]",
+                                       cdata.rx_adapter_id);
+
+               ret = rte_event_eth_rx_adapter_caps_get(evdev_id, i, &cap);
+               if (ret)
+                       rte_exit(EXIT_FAILURE,
+                                       "failed to get event rx adapter "
+                                       "capabilities");
+
+               queue_conf.ev.queue_id = cdata.rx_stride ?
+                       (i * cdata.rx_stride)
+                       : (uint8_t)cdata.qid[0];
+
+               ret = rte_event_eth_rx_adapter_queue_add(i, i, -1, &queue_conf);
+               if (ret)
+                       rte_exit(EXIT_FAILURE,
+                                       "Failed to add queues to Rx adapter");
+
+
+               /* Producer needs to be scheduled. */
+               if (!(cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT)) {
+                       ret = rte_event_eth_rx_adapter_service_id_get(i,
+                                       &service_id);
+                       if (ret != -ESRCH && ret != 0) {
+                               rte_exit(EXIT_FAILURE,
+                               "Error getting the service ID for rx adptr\n");
+                       }
+
+                       rte_service_runstate_set(service_id, 1);
+                       rte_service_set_runstate_mapped_check(service_id, 0);
+
+                       adptr_services->nb_rx_adptrs++;
+                       adptr_services->rx_adpt_arr = rte_realloc(
+                                       adptr_services->rx_adpt_arr,
+                                       adptr_services->nb_rx_adptrs *
+                                       sizeof(uint32_t), 0);
+                       adptr_services->rx_adpt_arr[
+                               adptr_services->nb_rx_adptrs - 1] =
+                               service_id;
+               }
+
+               ret = rte_event_eth_rx_adapter_start(i);
+               if (ret)
+                       rte_exit(EXIT_FAILURE, "Rx adapter[%d] start failed",
+                                       cdata.rx_adapter_id);
+       }
+
+       if (adptr_services->nb_rx_adptrs) {
+               struct rte_service_spec service;
+
+               memset(&service, 0, sizeof(struct rte_service_spec));
+               snprintf(service.name, sizeof(service.name), "rx_service");
+               service.callback = service_rx_adapter;
+               service.callback_userdata = (void *)adptr_services;
+
+               int32_t ret = rte_service_component_register(&service,
+                               &fdata->rxadptr_service_id);
+               if (ret)
+                       rte_exit(EXIT_FAILURE,
+                               "Rx adapter[%d] service register failed",
+                               cdata.rx_adapter_id);
+
+               rte_service_runstate_set(fdata->rxadptr_service_id, 1);
+               rte_service_component_runstate_set(fdata->rxadptr_service_id,
+                               1);
+               rte_service_set_runstate_mapped_check(fdata->rxadptr_service_id,
+                               0);
+       } else {
+               memset(fdata->rx_core, 0, sizeof(unsigned int) * MAX_NUM_CORE);
+               rte_free(adptr_services);
+       }
+
+       if (!adptr_services->nb_rx_adptrs && fdata->cap.consumer == NULL &&
+                       (dev_info.event_dev_cap &
+                        RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED))
+               fdata->cap.scheduler = NULL;
+
+       if (dev_info.event_dev_cap & RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED)
+               memset(fdata->sched_core, 0,
+                               sizeof(unsigned int) * MAX_NUM_CORE);
+}
+
+static void
+worker_tx_opt_check(void)
+{
+       int i;
+       int ret;
+       uint32_t cap = 0;
+       uint8_t rx_needed = 0;
+       struct rte_event_dev_info eventdev_info;
+
+       memset(&eventdev_info, 0, sizeof(struct rte_event_dev_info));
+       rte_event_dev_info_get(0, &eventdev_info);
+
+       if (cdata.all_type_queues && !(eventdev_info.event_dev_cap &
+                               RTE_EVENT_DEV_CAP_QUEUE_ALL_TYPES))
+               rte_exit(EXIT_FAILURE,
+                               "Event dev doesn't support all type queues\n");
+
+       for (i = 0; i < rte_eth_dev_count(); i++) {
+               ret = rte_event_eth_rx_adapter_caps_get(0, i, &cap);
+               if (ret)
+                       rte_exit(EXIT_FAILURE,
+                                       "failed to get event rx adapter "
+                                       "capabilities");
+               rx_needed |=
+                       !(cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT);
+       }
+
+       if (cdata.worker_lcore_mask == 0 ||
+                       (rx_needed && cdata.rx_lcore_mask == 0) ||
+                       (cdata.sched_lcore_mask == 0 &&
+                        !(eventdev_info.event_dev_cap &
+                                RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED))) {
+               printf("Core part of pipeline was not assigned any cores. "
+                       "This will stall the pipeline, please check core masks "
+                       "(use -h for details on setting core masks):\n"
+                       "\trx: %"PRIu64"\n\ttx: %"PRIu64"\n\tsched: %"PRIu64
+                       "\n\tworkers: %"PRIu64"\n",
+                       cdata.rx_lcore_mask, cdata.tx_lcore_mask,
+                       cdata.sched_lcore_mask,
+                       cdata.worker_lcore_mask);
+               rte_exit(-1, "Fix core masks\n");
+       }
+}
+
+static worker_loop
+get_worker_loop_single_burst(uint8_t atq)
+{
+       if (atq)
+               return worker_do_tx_single_burst_atq;
+
+       return worker_do_tx_single_burst;
+}
+
+static worker_loop
+get_worker_loop_single_non_burst(uint8_t atq)
+{
+       if (atq)
+               return worker_do_tx_single_atq;
+
+       return worker_do_tx_single;
+}
+
+static worker_loop
+get_worker_loop_burst(uint8_t atq)
+{
+       if (atq)
+               return worker_do_tx_burst_atq;
+
+       return worker_do_tx_burst;
+}
+
+static worker_loop
+get_worker_loop_non_burst(uint8_t atq)
+{
+       if (atq)
+               return worker_do_tx_atq;
+
+       return worker_do_tx;
+}
+
+static worker_loop
+get_worker_single_stage(bool burst)
+{
+       uint8_t atq = cdata.all_type_queues ? 1 : 0;
+
+       if (burst)
+               return get_worker_loop_single_burst(atq);
+
+       return get_worker_loop_single_non_burst(atq);
+}
+
+static worker_loop
+get_worker_multi_stage(bool burst)
+{
+       uint8_t atq = cdata.all_type_queues ? 1 : 0;
+
+       if (burst)
+               return get_worker_loop_burst(atq);
+
+       return get_worker_loop_non_burst(atq);
+}
+
+void
+set_worker_tx_setup_data(struct setup_data *caps, bool burst)
+{
+       if (cdata.num_stages == 1)
+               caps->worker = get_worker_single_stage(burst);
+       else
+               caps->worker = get_worker_multi_stage(burst);
+
+       memset(fdata->tx_core, 0, sizeof(unsigned int) * MAX_NUM_CORE);
+
+       caps->check_opt = worker_tx_opt_check;
+       caps->consumer = NULL;
+       caps->scheduler = schedule_devices;
+       caps->evdev_setup = setup_eventdev_worker_tx;
+       caps->adptr_setup = init_rx_adapter;
+}
diff --git a/examples/eventdev_pipeline_sw_pmd/Makefile b/examples/eventdev_pipeline_sw_pmd/Makefile
deleted file mode 100644 (file)
index c099adf..0000000
+++ /dev/null
@@ -1,24 +0,0 @@
-# SPDX-License-Identifier: BSD-3-Clause
-# Copyright(c) 2016-2017 Intel Corporation
-
-ifeq ($(RTE_SDK),)
-$(error "Please define RTE_SDK environment variable")
-endif
-
-# Default target, can be overridden by command line or environment
-RTE_TARGET ?= x86_64-native-linuxapp-gcc
-
-include $(RTE_SDK)/mk/rte.vars.mk
-
-# binary name
-APP = eventdev_pipeline_sw_pmd
-
-# all source are stored in SRCS-y
-SRCS-y := main.c
-SRCS-y += pipeline_worker_generic.c
-SRCS-y += pipeline_worker_tx.c
-
-CFLAGS += -O3
-CFLAGS += $(WERROR_FLAGS)
-
-include $(RTE_SDK)/mk/rte.extapp.mk
diff --git a/examples/eventdev_pipeline_sw_pmd/main.c b/examples/eventdev_pipeline_sw_pmd/main.c
deleted file mode 100644 (file)
index 2422c18..0000000
+++ /dev/null
@@ -1,574 +0,0 @@
-/* SPDX-License-Identifier: BSD-3-Clause
- * Copyright(c) 2016-2017 Intel Corporation
- */
-
-#include <getopt.h>
-#include <stdint.h>
-#include <stdio.h>
-#include <signal.h>
-#include <sched.h>
-
-#include "pipeline_common.h"
-
-struct config_data cdata = {
-       .num_packets = (1L << 25), /* do ~32M packets */
-       .num_fids = 512,
-       .queue_type = RTE_SCHED_TYPE_ATOMIC,
-       .next_qid = {-1},
-       .qid = {-1},
-       .num_stages = 1,
-       .worker_cq_depth = 16
-};
-
-static bool
-core_in_use(unsigned int lcore_id) {
-       return (fdata->rx_core[lcore_id] || fdata->sched_core[lcore_id] ||
-               fdata->tx_core[lcore_id] || fdata->worker_core[lcore_id]);
-}
-
-static void
-eth_tx_buffer_retry(struct rte_mbuf **pkts, uint16_t unsent,
-                       void *userdata)
-{
-       int port_id = (uintptr_t) userdata;
-       unsigned int _sent = 0;
-
-       do {
-               /* Note: hard-coded TX queue */
-               _sent += rte_eth_tx_burst(port_id, 0, &pkts[_sent],
-                                         unsent - _sent);
-       } while (_sent != unsent);
-}
-
-/*
- * Parse the coremask given as argument (hexadecimal string) and fill
- * the global configuration (core role and core count) with the parsed
- * value.
- */
-static int xdigit2val(unsigned char c)
-{
-       int val;
-
-       if (isdigit(c))
-               val = c - '0';
-       else if (isupper(c))
-               val = c - 'A' + 10;
-       else
-               val = c - 'a' + 10;
-       return val;
-}
-
-static uint64_t
-parse_coremask(const char *coremask)
-{
-       int i, j, idx = 0;
-       unsigned int count = 0;
-       char c;
-       int val;
-       uint64_t mask = 0;
-       const int32_t BITS_HEX = 4;
-
-       if (coremask == NULL)
-               return -1;
-       /* Remove all blank characters ahead and after .
-        * Remove 0x/0X if exists.
-        */
-       while (isblank(*coremask))
-               coremask++;
-       if (coremask[0] == '0' && ((coremask[1] == 'x')
-               || (coremask[1] == 'X')))
-               coremask += 2;
-       i = strlen(coremask);
-       while ((i > 0) && isblank(coremask[i - 1]))
-               i--;
-       if (i == 0)
-               return -1;
-
-       for (i = i - 1; i >= 0 && idx < MAX_NUM_CORE; i--) {
-               c = coremask[i];
-               if (isxdigit(c) == 0) {
-                       /* invalid characters */
-                       return -1;
-               }
-               val = xdigit2val(c);
-               for (j = 0; j < BITS_HEX && idx < MAX_NUM_CORE; j++, idx++) {
-                       if ((1 << j) & val) {
-                               mask |= (1UL << idx);
-                               count++;
-                       }
-               }
-       }
-       for (; i >= 0; i--)
-               if (coremask[i] != '0')
-                       return -1;
-       if (count == 0)
-               return -1;
-       return mask;
-}
-
-static struct option long_options[] = {
-       {"workers", required_argument, 0, 'w'},
-       {"packets", required_argument, 0, 'n'},
-       {"atomic-flows", required_argument, 0, 'f'},
-       {"num_stages", required_argument, 0, 's'},
-       {"rx-mask", required_argument, 0, 'r'},
-       {"tx-mask", required_argument, 0, 't'},
-       {"sched-mask", required_argument, 0, 'e'},
-       {"cq-depth", required_argument, 0, 'c'},
-       {"work-cycles", required_argument, 0, 'W'},
-       {"mempool-size", required_argument, 0, 'm'},
-       {"queue-priority", no_argument, 0, 'P'},
-       {"parallel", no_argument, 0, 'p'},
-       {"ordered", no_argument, 0, 'o'},
-       {"quiet", no_argument, 0, 'q'},
-       {"use-atq", no_argument, 0, 'a'},
-       {"dump", no_argument, 0, 'D'},
-       {0, 0, 0, 0}
-};
-
-static void
-usage(void)
-{
-       const char *usage_str =
-               "  Usage: eventdev_demo [options]\n"
-               "  Options:\n"
-               "  -n, --packets=N              Send N packets (default ~32M), 0 implies no limit\n"
-               "  -f, --atomic-flows=N         Use N random flows from 1 to N (default 16)\n"
-               "  -s, --num_stages=N           Use N atomic stages (default 1)\n"
-               "  -r, --rx-mask=core mask      Run NIC rx on CPUs in core mask\n"
-               "  -w, --worker-mask=core mask  Run worker on CPUs in core mask\n"
-               "  -t, --tx-mask=core mask      Run NIC tx on CPUs in core mask\n"
-               "  -e  --sched-mask=core mask   Run scheduler on CPUs in core mask\n"
-               "  -c  --cq-depth=N             Worker CQ depth (default 16)\n"
-               "  -W  --work-cycles=N          Worker cycles (default 0)\n"
-               "  -P  --queue-priority         Enable scheduler queue prioritization\n"
-               "  -o, --ordered                Use ordered scheduling\n"
-               "  -p, --parallel               Use parallel scheduling\n"
-               "  -q, --quiet                  Minimize printed output\n"
-               "  -a, --use-atq                Use all type queues\n"
-               "  -m, --mempool-size=N         Dictate the mempool size\n"
-               "  -D, --dump                   Print detailed statistics before exit"
-               "\n";
-       fprintf(stderr, "%s", usage_str);
-       exit(1);
-}
-
-static void
-parse_app_args(int argc, char **argv)
-{
-       /* Parse cli options*/
-       int option_index;
-       int c;
-       opterr = 0;
-       uint64_t rx_lcore_mask = 0;
-       uint64_t tx_lcore_mask = 0;
-       uint64_t sched_lcore_mask = 0;
-       uint64_t worker_lcore_mask = 0;
-       int i;
-
-       for (;;) {
-               c = getopt_long(argc, argv, "r:t:e:c:w:n:f:s:m:paoPqDW:",
-                               long_options, &option_index);
-               if (c == -1)
-                       break;
-
-               int popcnt = 0;
-               switch (c) {
-               case 'n':
-                       cdata.num_packets = (int64_t)atol(optarg);
-                       if (cdata.num_packets == 0)
-                               cdata.num_packets = INT64_MAX;
-                       break;
-               case 'f':
-                       cdata.num_fids = (unsigned int)atoi(optarg);
-                       break;
-               case 's':
-                       cdata.num_stages = (unsigned int)atoi(optarg);
-                       break;
-               case 'c':
-                       cdata.worker_cq_depth = (unsigned int)atoi(optarg);
-                       break;
-               case 'W':
-                       cdata.worker_cycles = (unsigned int)atoi(optarg);
-                       break;
-               case 'P':
-                       cdata.enable_queue_priorities = 1;
-                       break;
-               case 'o':
-                       cdata.queue_type = RTE_SCHED_TYPE_ORDERED;
-                       break;
-               case 'p':
-                       cdata.queue_type = RTE_SCHED_TYPE_PARALLEL;
-                       break;
-               case 'a':
-                       cdata.all_type_queues = 1;
-                       break;
-               case 'q':
-                       cdata.quiet = 1;
-                       break;
-               case 'D':
-                       cdata.dump_dev = 1;
-                       break;
-               case 'w':
-                       worker_lcore_mask = parse_coremask(optarg);
-                       break;
-               case 'r':
-                       rx_lcore_mask = parse_coremask(optarg);
-                       popcnt = __builtin_popcountll(rx_lcore_mask);
-                       fdata->rx_single = (popcnt == 1);
-                       break;
-               case 't':
-                       tx_lcore_mask = parse_coremask(optarg);
-                       popcnt = __builtin_popcountll(tx_lcore_mask);
-                       fdata->tx_single = (popcnt == 1);
-                       break;
-               case 'e':
-                       sched_lcore_mask = parse_coremask(optarg);
-                       popcnt = __builtin_popcountll(sched_lcore_mask);
-                       fdata->sched_single = (popcnt == 1);
-                       break;
-               case 'm':
-                       cdata.num_mbuf = (uint64_t)atol(optarg);
-                       break;
-               default:
-                       usage();
-               }
-       }
-
-       cdata.worker_lcore_mask = worker_lcore_mask;
-       cdata.sched_lcore_mask = sched_lcore_mask;
-       cdata.rx_lcore_mask = rx_lcore_mask;
-       cdata.tx_lcore_mask = tx_lcore_mask;
-
-       if (cdata.num_stages == 0 || cdata.num_stages > MAX_NUM_STAGES)
-               usage();
-
-       for (i = 0; i < MAX_NUM_CORE; i++) {
-               fdata->rx_core[i] = !!(rx_lcore_mask & (1UL << i));
-               fdata->tx_core[i] = !!(tx_lcore_mask & (1UL << i));
-               fdata->sched_core[i] = !!(sched_lcore_mask & (1UL << i));
-               fdata->worker_core[i] = !!(worker_lcore_mask & (1UL << i));
-
-               if (fdata->worker_core[i])
-                       cdata.num_workers++;
-               if (core_in_use(i))
-                       cdata.active_cores++;
-       }
-}
-
-/*
- * Initializes a given port using global settings and with the RX buffers
- * coming from the mbuf_pool passed as a parameter.
- */
-static inline int
-port_init(uint8_t port, struct rte_mempool *mbuf_pool)
-{
-       static const struct rte_eth_conf port_conf_default = {
-               .rxmode = {
-                       .mq_mode = ETH_MQ_RX_RSS,
-                       .max_rx_pkt_len = ETHER_MAX_LEN,
-                       .ignore_offload_bitfield = 1,
-               },
-               .rx_adv_conf = {
-                       .rss_conf = {
-                               .rss_hf = ETH_RSS_IP |
-                                         ETH_RSS_TCP |
-                                         ETH_RSS_UDP,
-                       }
-               }
-       };
-       const uint16_t rx_rings = 1, tx_rings = 1;
-       const uint16_t rx_ring_size = 512, tx_ring_size = 512;
-       struct rte_eth_conf port_conf = port_conf_default;
-       int retval;
-       uint16_t q;
-       struct rte_eth_dev_info dev_info;
-       struct rte_eth_txconf txconf;
-
-       if (port >= rte_eth_dev_count())
-               return -1;
-
-       rte_eth_dev_info_get(port, &dev_info);
-       if (dev_info.tx_offload_capa & DEV_TX_OFFLOAD_MBUF_FAST_FREE)
-               port_conf.txmode.offloads |=
-                       DEV_TX_OFFLOAD_MBUF_FAST_FREE;
-
-       /* Configure the Ethernet device. */
-       retval = rte_eth_dev_configure(port, rx_rings, tx_rings, &port_conf);
-       if (retval != 0)
-               return retval;
-
-       /* Allocate and set up 1 RX queue per Ethernet port. */
-       for (q = 0; q < rx_rings; q++) {
-               retval = rte_eth_rx_queue_setup(port, q, rx_ring_size,
-                               rte_eth_dev_socket_id(port), NULL, mbuf_pool);
-               if (retval < 0)
-                       return retval;
-       }
-
-       txconf = dev_info.default_txconf;
-       txconf.txq_flags = ETH_TXQ_FLAGS_IGNORE;
-       txconf.offloads = port_conf_default.txmode.offloads;
-       /* Allocate and set up 1 TX queue per Ethernet port. */
-       for (q = 0; q < tx_rings; q++) {
-               retval = rte_eth_tx_queue_setup(port, q, tx_ring_size,
-                               rte_eth_dev_socket_id(port), &txconf);
-               if (retval < 0)
-                       return retval;
-       }
-
-       /* Start the Ethernet port. */
-       retval = rte_eth_dev_start(port);
-       if (retval < 0)
-               return retval;
-
-       /* Display the port MAC address. */
-       struct ether_addr addr;
-       rte_eth_macaddr_get(port, &addr);
-       printf("Port %u MAC: %02" PRIx8 " %02" PRIx8 " %02" PRIx8
-                          " %02" PRIx8 " %02" PRIx8 " %02" PRIx8 "\n",
-                       (unsigned int)port,
-                       addr.addr_bytes[0], addr.addr_bytes[1],
-                       addr.addr_bytes[2], addr.addr_bytes[3],
-                       addr.addr_bytes[4], addr.addr_bytes[5]);
-
-       /* Enable RX in promiscuous mode for the Ethernet device. */
-       rte_eth_promiscuous_enable(port);
-
-       return 0;
-}
-
-static int
-init_ports(unsigned int num_ports)
-{
-       uint8_t portid;
-       unsigned int i;
-
-       if (!cdata.num_mbuf)
-               cdata.num_mbuf = 16384 * num_ports;
-
-       struct rte_mempool *mp = rte_pktmbuf_pool_create("packet_pool",
-                       /* mbufs */ cdata.num_mbuf,
-                       /* cache_size */ 512,
-                       /* priv_size*/ 0,
-                       /* data_room_size */ RTE_MBUF_DEFAULT_BUF_SIZE,
-                       rte_socket_id());
-
-       for (portid = 0; portid < num_ports; portid++)
-               if (port_init(portid, mp) != 0)
-                       rte_exit(EXIT_FAILURE, "Cannot init port %"PRIu8 "\n",
-                                       portid);
-
-       for (i = 0; i < num_ports; i++) {
-               void *userdata = (void *)(uintptr_t) i;
-               fdata->tx_buf[i] =
-                       rte_malloc(NULL, RTE_ETH_TX_BUFFER_SIZE(32), 0);
-               if (fdata->tx_buf[i] == NULL)
-                       rte_panic("Out of memory\n");
-               rte_eth_tx_buffer_init(fdata->tx_buf[i], 32);
-               rte_eth_tx_buffer_set_err_callback(fdata->tx_buf[i],
-                                                  eth_tx_buffer_retry,
-                                                  userdata);
-       }
-
-       return 0;
-}
-
-static void
-do_capability_setup(uint16_t nb_ethdev, uint8_t eventdev_id)
-{
-       int i;
-       uint8_t mt_unsafe = 0;
-       uint8_t burst = 0;
-
-       for (i = 0; i < nb_ethdev; i++) {
-               struct rte_eth_dev_info dev_info;
-               memset(&dev_info, 0, sizeof(struct rte_eth_dev_info));
-
-               rte_eth_dev_info_get(i, &dev_info);
-               /* Check if it is safe ask worker to tx. */
-               mt_unsafe |= !(dev_info.tx_offload_capa &
-                               DEV_TX_OFFLOAD_MT_LOCKFREE);
-       }
-
-       struct rte_event_dev_info eventdev_info;
-       memset(&eventdev_info, 0, sizeof(struct rte_event_dev_info));
-
-       rte_event_dev_info_get(eventdev_id, &eventdev_info);
-       burst = eventdev_info.event_dev_cap & RTE_EVENT_DEV_CAP_BURST_MODE ? 1 :
-               0;
-
-       if (mt_unsafe)
-               set_worker_generic_setup_data(&fdata->cap, burst);
-       else
-               set_worker_tx_setup_data(&fdata->cap, burst);
-}
-
-static void
-signal_handler(int signum)
-{
-       if (fdata->done)
-               rte_exit(1, "Exiting on signal %d\n", signum);
-       if (signum == SIGINT || signum == SIGTERM) {
-               printf("\n\nSignal %d received, preparing to exit...\n",
-                               signum);
-               fdata->done = 1;
-       }
-       if (signum == SIGTSTP)
-               rte_event_dev_dump(0, stdout);
-}
-
-static inline uint64_t
-port_stat(int dev_id, int32_t p)
-{
-       char statname[64];
-       snprintf(statname, sizeof(statname), "port_%u_rx", p);
-       return rte_event_dev_xstats_by_name_get(dev_id, statname, NULL);
-}
-
-int
-main(int argc, char **argv)
-{
-       struct worker_data *worker_data;
-       unsigned int num_ports;
-       int lcore_id;
-       int err;
-
-       signal(SIGINT, signal_handler);
-       signal(SIGTERM, signal_handler);
-       signal(SIGTSTP, signal_handler);
-
-       err = rte_eal_init(argc, argv);
-       if (err < 0)
-               rte_panic("Invalid EAL arguments\n");
-
-       argc -= err;
-       argv += err;
-
-       fdata = rte_malloc(NULL, sizeof(struct fastpath_data), 0);
-       if (fdata == NULL)
-               rte_panic("Out of memory\n");
-
-       /* Parse cli options*/
-       parse_app_args(argc, argv);
-
-       num_ports = rte_eth_dev_count();
-       if (num_ports == 0)
-               rte_panic("No ethernet ports found\n");
-
-       const unsigned int cores_needed = cdata.active_cores;
-
-       if (!cdata.quiet) {
-               printf("  Config:\n");
-               printf("\tports: %u\n", num_ports);
-               printf("\tworkers: %u\n", cdata.num_workers);
-               printf("\tpackets: %"PRIi64"\n", cdata.num_packets);
-               printf("\tQueue-prio: %u\n", cdata.enable_queue_priorities);
-               if (cdata.queue_type == RTE_SCHED_TYPE_ORDERED)
-                       printf("\tqid0 type: ordered\n");
-               if (cdata.queue_type == RTE_SCHED_TYPE_ATOMIC)
-                       printf("\tqid0 type: atomic\n");
-               printf("\tCores available: %u\n", rte_lcore_count());
-               printf("\tCores used: %u\n", cores_needed);
-       }
-
-       if (rte_lcore_count() < cores_needed)
-               rte_panic("Too few cores (%d < %d)\n", rte_lcore_count(),
-                               cores_needed);
-
-       const unsigned int ndevs = rte_event_dev_count();
-       if (ndevs == 0)
-               rte_panic("No dev_id devs found. Pasl in a --vdev eventdev.\n");
-       if (ndevs > 1)
-               fprintf(stderr, "Warning: More than one eventdev, using idx 0");
-
-
-       do_capability_setup(num_ports, 0);
-       fdata->cap.check_opt();
-
-       worker_data = rte_calloc(0, cdata.num_workers,
-                       sizeof(worker_data[0]), 0);
-       if (worker_data == NULL)
-               rte_panic("rte_calloc failed\n");
-
-       int dev_id = fdata->cap.evdev_setup(&cons_data, worker_data);
-       if (dev_id < 0)
-               rte_exit(EXIT_FAILURE, "Error setting up eventdev\n");
-
-       init_ports(num_ports);
-       fdata->cap.adptr_setup(num_ports);
-
-       int worker_idx = 0;
-       RTE_LCORE_FOREACH_SLAVE(lcore_id) {
-               if (lcore_id >= MAX_NUM_CORE)
-                       break;
-
-               if (!fdata->rx_core[lcore_id] &&
-                       !fdata->worker_core[lcore_id] &&
-                       !fdata->tx_core[lcore_id] &&
-                       !fdata->sched_core[lcore_id])
-                       continue;
-
-               if (fdata->rx_core[lcore_id])
-                       printf(
-                               "[%s()] lcore %d executing NIC Rx\n",
-                               __func__, lcore_id);
-
-               if (fdata->tx_core[lcore_id])
-                       printf(
-                               "[%s()] lcore %d executing NIC Tx, and using eventdev port %u\n",
-                               __func__, lcore_id, cons_data.port_id);
-
-               if (fdata->sched_core[lcore_id])
-                       printf("[%s()] lcore %d executing scheduler\n",
-                                       __func__, lcore_id);
-
-               if (fdata->worker_core[lcore_id])
-                       printf(
-                               "[%s()] lcore %d executing worker, using eventdev port %u\n",
-                               __func__, lcore_id,
-                               worker_data[worker_idx].port_id);
-
-               err = rte_eal_remote_launch(fdata->cap.worker,
-                               &worker_data[worker_idx], lcore_id);
-               if (err) {
-                       rte_panic("Failed to launch worker on core %d\n",
-                                       lcore_id);
-                       continue;
-               }
-               if (fdata->worker_core[lcore_id])
-                       worker_idx++;
-       }
-
-       lcore_id = rte_lcore_id();
-
-       if (core_in_use(lcore_id))
-               fdata->cap.worker(&worker_data[worker_idx++]);
-
-       rte_eal_mp_wait_lcore();
-
-       if (cdata.dump_dev)
-               rte_event_dev_dump(dev_id, stdout);
-
-       if (!cdata.quiet && (port_stat(dev_id, worker_data[0].port_id) !=
-                       (uint64_t)-ENOTSUP)) {
-               printf("\nPort Workload distribution:\n");
-               uint32_t i;
-               uint64_t tot_pkts = 0;
-               uint64_t pkts_per_wkr[RTE_MAX_LCORE] = {0};
-               for (i = 0; i < cdata.num_workers; i++) {
-                       pkts_per_wkr[i] =
-                               port_stat(dev_id, worker_data[i].port_id);
-                       tot_pkts += pkts_per_wkr[i];
-               }
-               for (i = 0; i < cdata.num_workers; i++) {
-                       float pc = pkts_per_wkr[i]  * 100 /
-                               ((float)tot_pkts);
-                       printf("worker %i :\t%.1f %% (%"PRIu64" pkts)\n",
-                                       i, pc, pkts_per_wkr[i]);
-               }
-
-       }
-
-       return 0;
-}
diff --git a/examples/eventdev_pipeline_sw_pmd/pipeline_common.h b/examples/eventdev_pipeline_sw_pmd/pipeline_common.h
deleted file mode 100644 (file)
index 9703396..0000000
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * SPDX-License-Identifier: BSD-3-Clause
- * Copyright 2016 Intel Corporation.
- * Copyright 2017 Cavium, Inc.
- */
-
-#include <stdbool.h>
-
-#include <rte_eal.h>
-#include <rte_mempool.h>
-#include <rte_mbuf.h>
-#include <rte_launch.h>
-#include <rte_malloc.h>
-#include <rte_random.h>
-#include <rte_cycles.h>
-#include <rte_ethdev.h>
-#include <rte_eventdev.h>
-#include <rte_event_eth_rx_adapter.h>
-#include <rte_service.h>
-#include <rte_service_component.h>
-
-#define MAX_NUM_STAGES 8
-#define BATCH_SIZE 16
-#define MAX_NUM_CORE 64
-
-struct cons_data {
-       uint8_t dev_id;
-       uint8_t port_id;
-       uint8_t release;
-} __rte_cache_aligned;
-
-struct worker_data {
-       uint8_t dev_id;
-       uint8_t port_id;
-} __rte_cache_aligned;
-
-typedef int (*worker_loop)(void *);
-typedef int (*consumer_loop)(void);
-typedef void (*schedule_loop)(unsigned int);
-typedef int (*eventdev_setup)(struct cons_data *, struct worker_data *);
-typedef void (*rx_adapter_setup)(uint16_t nb_ports);
-typedef void (*opt_check)(void);
-
-struct setup_data {
-       worker_loop worker;
-       consumer_loop consumer;
-       schedule_loop scheduler;
-       eventdev_setup evdev_setup;
-       rx_adapter_setup adptr_setup;
-       opt_check check_opt;
-};
-
-struct fastpath_data {
-       volatile int done;
-       uint32_t tx_lock;
-       uint32_t evdev_service_id;
-       uint32_t rxadptr_service_id;
-       bool rx_single;
-       bool tx_single;
-       bool sched_single;
-       unsigned int rx_core[MAX_NUM_CORE];
-       unsigned int tx_core[MAX_NUM_CORE];
-       unsigned int sched_core[MAX_NUM_CORE];
-       unsigned int worker_core[MAX_NUM_CORE];
-       struct rte_eth_dev_tx_buffer *tx_buf[RTE_MAX_ETHPORTS];
-       struct setup_data cap;
-} __rte_cache_aligned;
-
-struct config_data {
-       unsigned int active_cores;
-       unsigned int num_workers;
-       int64_t num_packets;
-       uint64_t num_mbuf;
-       unsigned int num_fids;
-       int queue_type;
-       int worker_cycles;
-       int enable_queue_priorities;
-       int quiet;
-       int dump_dev;
-       int dump_dev_signal;
-       int all_type_queues;
-       unsigned int num_stages;
-       unsigned int worker_cq_depth;
-       unsigned int rx_stride;
-       /* Use rx stride value to reduce congestion in entry queue when using
-        * multiple eth ports by forming multiple event queue pipelines.
-        */
-       int16_t next_qid[MAX_NUM_STAGES+2];
-       int16_t qid[MAX_NUM_STAGES];
-       uint8_t rx_adapter_id;
-       uint64_t worker_lcore_mask;
-       uint64_t rx_lcore_mask;
-       uint64_t tx_lcore_mask;
-       uint64_t sched_lcore_mask;
-};
-
-struct port_link {
-       uint8_t queue_id;
-       uint8_t priority;
-};
-
-struct cons_data cons_data;
-
-struct fastpath_data *fdata;
-struct config_data cdata;
-
-static __rte_always_inline void
-exchange_mac(struct rte_mbuf *m)
-{
-       struct ether_hdr *eth;
-       struct ether_addr addr;
-
-       /* change mac addresses on packet (to use mbuf data) */
-       eth = rte_pktmbuf_mtod(m, struct ether_hdr *);
-       ether_addr_copy(&eth->d_addr, &addr);
-       ether_addr_copy(&addr, &eth->d_addr);
-}
-
-static __rte_always_inline void
-work(void)
-{
-       /* do a number of cycles of work per packet */
-       volatile uint64_t start_tsc = rte_rdtsc();
-       while (rte_rdtsc() < start_tsc + cdata.worker_cycles)
-               rte_pause();
-}
-
-static __rte_always_inline void
-schedule_devices(unsigned int lcore_id)
-{
-       if (fdata->rx_core[lcore_id]) {
-               rte_service_run_iter_on_app_lcore(fdata->rxadptr_service_id,
-                               !fdata->rx_single);
-       }
-
-       if (fdata->sched_core[lcore_id]) {
-               rte_service_run_iter_on_app_lcore(fdata->evdev_service_id,
-                               !fdata->sched_single);
-               if (cdata.dump_dev_signal) {
-                       rte_event_dev_dump(0, stdout);
-                       cdata.dump_dev_signal = 0;
-               }
-       }
-
-       if (fdata->tx_core[lcore_id] && (fdata->tx_single ||
-                        rte_atomic32_cmpset(&(fdata->tx_lock), 0, 1))) {
-               fdata->cap.consumer();
-               rte_atomic32_clear((rte_atomic32_t *)&(fdata->tx_lock));
-       }
-}
-
-void set_worker_generic_setup_data(struct setup_data *caps, bool burst);
-void set_worker_tx_setup_data(struct setup_data *caps, bool burst);
diff --git a/examples/eventdev_pipeline_sw_pmd/pipeline_worker_generic.c b/examples/eventdev_pipeline_sw_pmd/pipeline_worker_generic.c
deleted file mode 100644 (file)
index 2c51f4a..0000000
+++ /dev/null
@@ -1,569 +0,0 @@
-/*
- * SPDX-License-Identifier: BSD-3-Clause
- * Copyright 2016 Intel Corporation.
- * Copyright 2017 Cavium, Inc.
- */
-
-#include "pipeline_common.h"
-
-static __rte_always_inline int
-worker_generic(void *arg)
-{
-       struct rte_event ev;
-
-       struct worker_data *data = (struct worker_data *)arg;
-       uint8_t dev_id = data->dev_id;
-       uint8_t port_id = data->port_id;
-       size_t sent = 0, received = 0;
-       unsigned int lcore_id = rte_lcore_id();
-
-       while (!fdata->done) {
-
-               if (fdata->cap.scheduler)
-                       fdata->cap.scheduler(lcore_id);
-
-               if (!fdata->worker_core[lcore_id]) {
-                       rte_pause();
-                       continue;
-               }
-
-               const uint16_t nb_rx = rte_event_dequeue_burst(dev_id, port_id,
-                               &ev, 1, 0);
-
-               if (nb_rx == 0) {
-                       rte_pause();
-                       continue;
-               }
-               received++;
-
-               /* The first worker stage does classification */
-               if (ev.queue_id == cdata.qid[0])
-                       ev.flow_id = ev.mbuf->hash.rss
-                                               % cdata.num_fids;
-
-               ev.queue_id = cdata.next_qid[ev.queue_id];
-               ev.op = RTE_EVENT_OP_FORWARD;
-               ev.sched_type = cdata.queue_type;
-
-               work();
-
-               while (rte_event_enqueue_burst(dev_id, port_id, &ev, 1) != 1)
-                       rte_pause();
-               sent++;
-       }
-
-       if (!cdata.quiet)
-               printf("  worker %u thread done. RX=%zu TX=%zu\n",
-                               rte_lcore_id(), received, sent);
-
-       return 0;
-}
-
-static int
-worker_generic_burst(void *arg)
-{
-       struct rte_event events[BATCH_SIZE];
-
-       struct worker_data *data = (struct worker_data *)arg;
-       uint8_t dev_id = data->dev_id;
-       uint8_t port_id = data->port_id;
-       size_t sent = 0, received = 0;
-       unsigned int lcore_id = rte_lcore_id();
-
-       while (!fdata->done) {
-               uint16_t i;
-
-               if (fdata->cap.scheduler)
-                       fdata->cap.scheduler(lcore_id);
-
-               if (!fdata->worker_core[lcore_id]) {
-                       rte_pause();
-                       continue;
-               }
-
-               const uint16_t nb_rx = rte_event_dequeue_burst(dev_id, port_id,
-                               events, RTE_DIM(events), 0);
-
-               if (nb_rx == 0) {
-                       rte_pause();
-                       continue;
-               }
-               received += nb_rx;
-
-               for (i = 0; i < nb_rx; i++) {
-
-                       /* The first worker stage does classification */
-                       if (events[i].queue_id == cdata.qid[0])
-                               events[i].flow_id = events[i].mbuf->hash.rss
-                                                       % cdata.num_fids;
-
-                       events[i].queue_id = cdata.next_qid[events[i].queue_id];
-                       events[i].op = RTE_EVENT_OP_FORWARD;
-                       events[i].sched_type = cdata.queue_type;
-
-                       work();
-               }
-               uint16_t nb_tx = rte_event_enqueue_burst(dev_id, port_id,
-                               events, nb_rx);
-               while (nb_tx < nb_rx && !fdata->done)
-                       nb_tx += rte_event_enqueue_burst(dev_id, port_id,
-                                                       events + nb_tx,
-                                                       nb_rx - nb_tx);
-               sent += nb_tx;
-       }
-
-       if (!cdata.quiet)
-               printf("  worker %u thread done. RX=%zu TX=%zu\n",
-                               rte_lcore_id(), received, sent);
-
-       return 0;
-}
-
-static __rte_always_inline int
-consumer(void)
-{
-       const uint64_t freq_khz = rte_get_timer_hz() / 1000;
-       struct rte_event packet;
-
-       static uint64_t received;
-       static uint64_t last_pkts;
-       static uint64_t last_time;
-       static uint64_t start_time;
-       int i;
-       uint8_t dev_id = cons_data.dev_id;
-       uint8_t port_id = cons_data.port_id;
-
-       do {
-               uint16_t n = rte_event_dequeue_burst(dev_id, port_id,
-                               &packet, 1, 0);
-
-               if (n == 0) {
-                       for (i = 0; i < rte_eth_dev_count(); i++)
-                               rte_eth_tx_buffer_flush(i, 0, fdata->tx_buf[i]);
-                       return 0;
-               }
-               if (start_time == 0)
-                       last_time = start_time = rte_get_timer_cycles();
-
-               received++;
-               uint8_t outport = packet.mbuf->port;
-
-               exchange_mac(packet.mbuf);
-               rte_eth_tx_buffer(outport, 0, fdata->tx_buf[outport],
-                               packet.mbuf);
-
-               if (cons_data.release)
-                       rte_event_enqueue_burst(dev_id, port_id,
-                                                               &packet, n);
-
-               /* Print out mpps every 1<22 packets */
-               if (!cdata.quiet && received >= last_pkts + (1<<22)) {
-                       const uint64_t now = rte_get_timer_cycles();
-                       const uint64_t total_ms = (now - start_time) / freq_khz;
-                       const uint64_t delta_ms = (now - last_time) / freq_khz;
-                       uint64_t delta_pkts = received - last_pkts;
-
-                       printf("# %s RX=%"PRIu64", time %"PRIu64 "ms, "
-                                       "avg %.3f mpps [current %.3f mpps]\n",
-                                       __func__,
-                                       received,
-                                       total_ms,
-                                       received / (total_ms * 1000.0),
-                                       delta_pkts / (delta_ms * 1000.0));
-                       last_pkts = received;
-                       last_time = now;
-               }
-
-               cdata.num_packets--;
-               if (cdata.num_packets <= 0)
-                       fdata->done = 1;
-       /* Be stuck in this loop if single. */
-       } while (!fdata->done && fdata->tx_single);
-
-       return 0;
-}
-
-static __rte_always_inline int
-consumer_burst(void)
-{
-       const uint64_t freq_khz = rte_get_timer_hz() / 1000;
-       struct rte_event packets[BATCH_SIZE];
-
-       static uint64_t received;
-       static uint64_t last_pkts;
-       static uint64_t last_time;
-       static uint64_t start_time;
-       unsigned int i, j;
-       uint8_t dev_id = cons_data.dev_id;
-       uint8_t port_id = cons_data.port_id;
-       uint16_t nb_ports = rte_eth_dev_count();
-
-       do {
-               uint16_t n = rte_event_dequeue_burst(dev_id, port_id,
-                               packets, RTE_DIM(packets), 0);
-
-               if (n == 0) {
-                       for (j = 0; j < nb_ports; j++)
-                               rte_eth_tx_buffer_flush(j, 0, fdata->tx_buf[j]);
-                       return 0;
-               }
-               if (start_time == 0)
-                       last_time = start_time = rte_get_timer_cycles();
-
-               received += n;
-               for (i = 0; i < n; i++) {
-                       uint8_t outport = packets[i].mbuf->port;
-
-                       exchange_mac(packets[i].mbuf);
-                       rte_eth_tx_buffer(outport, 0, fdata->tx_buf[outport],
-                                       packets[i].mbuf);
-
-                       packets[i].op = RTE_EVENT_OP_RELEASE;
-               }
-
-               if (cons_data.release) {
-                       uint16_t nb_tx;
-
-                       nb_tx = rte_event_enqueue_burst(dev_id, port_id,
-                                                               packets, n);
-                       while (nb_tx < n)
-                               nb_tx += rte_event_enqueue_burst(dev_id,
-                                               port_id, packets + nb_tx,
-                                               n - nb_tx);
-               }
-
-               /* Print out mpps every 1<22 packets */
-               if (!cdata.quiet && received >= last_pkts + (1<<22)) {
-                       const uint64_t now = rte_get_timer_cycles();
-                       const uint64_t total_ms = (now - start_time) / freq_khz;
-                       const uint64_t delta_ms = (now - last_time) / freq_khz;
-                       uint64_t delta_pkts = received - last_pkts;
-
-                       printf("# consumer RX=%"PRIu64", time %"PRIu64 "ms, "
-                                       "avg %.3f mpps [current %.3f mpps]\n",
-                                       received,
-                                       total_ms,
-                                       received / (total_ms * 1000.0),
-                                       delta_pkts / (delta_ms * 1000.0));
-                       last_pkts = received;
-                       last_time = now;
-               }
-
-               cdata.num_packets -= n;
-               if (cdata.num_packets <= 0)
-                       fdata->done = 1;
-       /* Be stuck in this loop if single. */
-       } while (!fdata->done && fdata->tx_single);
-
-       return 0;
-}
-
-static int
-setup_eventdev_generic(struct cons_data *cons_data,
-               struct worker_data *worker_data)
-{
-       const uint8_t dev_id = 0;
-       /* +1 stages is for a SINGLE_LINK TX stage */
-       const uint8_t nb_queues = cdata.num_stages + 1;
-       /* + 1 is one port for consumer */
-       const uint8_t nb_ports = cdata.num_workers + 1;
-       struct rte_event_dev_config config = {
-                       .nb_event_queues = nb_queues,
-                       .nb_event_ports = nb_ports,
-                       .nb_events_limit  = 4096,
-                       .nb_event_queue_flows = 1024,
-                       .nb_event_port_dequeue_depth = 128,
-                       .nb_event_port_enqueue_depth = 128,
-       };
-       struct rte_event_port_conf wkr_p_conf = {
-                       .dequeue_depth = cdata.worker_cq_depth,
-                       .enqueue_depth = 64,
-                       .new_event_threshold = 4096,
-       };
-       struct rte_event_queue_conf wkr_q_conf = {
-                       .schedule_type = cdata.queue_type,
-                       .priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
-                       .nb_atomic_flows = 1024,
-               .nb_atomic_order_sequences = 1024,
-       };
-       struct rte_event_port_conf tx_p_conf = {
-                       .dequeue_depth = 128,
-                       .enqueue_depth = 128,
-                       .new_event_threshold = 4096,
-       };
-       struct rte_event_queue_conf tx_q_conf = {
-                       .priority = RTE_EVENT_DEV_PRIORITY_HIGHEST,
-                       .event_queue_cfg = RTE_EVENT_QUEUE_CFG_SINGLE_LINK,
-       };
-
-       struct port_link worker_queues[MAX_NUM_STAGES];
-       uint8_t disable_implicit_release;
-       struct port_link tx_queue;
-       unsigned int i;
-
-       int ret, ndev = rte_event_dev_count();
-       if (ndev < 1) {
-               printf("%d: No Eventdev Devices Found\n", __LINE__);
-               return -1;
-       }
-
-       struct rte_event_dev_info dev_info;
-       ret = rte_event_dev_info_get(dev_id, &dev_info);
-       printf("\tEventdev %d: %s\n", dev_id, dev_info.driver_name);
-
-       disable_implicit_release = (dev_info.event_dev_cap &
-                       RTE_EVENT_DEV_CAP_IMPLICIT_RELEASE_DISABLE);
-
-       wkr_p_conf.disable_implicit_release = disable_implicit_release;
-       tx_p_conf.disable_implicit_release = disable_implicit_release;
-
-       if (dev_info.max_event_port_dequeue_depth <
-                       config.nb_event_port_dequeue_depth)
-               config.nb_event_port_dequeue_depth =
-                               dev_info.max_event_port_dequeue_depth;
-       if (dev_info.max_event_port_enqueue_depth <
-                       config.nb_event_port_enqueue_depth)
-               config.nb_event_port_enqueue_depth =
-                               dev_info.max_event_port_enqueue_depth;
-
-       ret = rte_event_dev_configure(dev_id, &config);
-       if (ret < 0) {
-               printf("%d: Error configuring device\n", __LINE__);
-               return -1;
-       }
-
-       /* Q creation - one load balanced per pipeline stage*/
-       printf("  Stages:\n");
-       for (i = 0; i < cdata.num_stages; i++) {
-               if (rte_event_queue_setup(dev_id, i, &wkr_q_conf) < 0) {
-                       printf("%d: error creating qid %d\n", __LINE__, i);
-                       return -1;
-               }
-               cdata.qid[i] = i;
-               cdata.next_qid[i] = i+1;
-               worker_queues[i].queue_id = i;
-               if (cdata.enable_queue_priorities) {
-                       /* calculate priority stepping for each stage, leaving
-                        * headroom of 1 for the SINGLE_LINK TX below
-                        */
-                       const uint32_t prio_delta =
-                               (RTE_EVENT_DEV_PRIORITY_LOWEST-1) /  nb_queues;
-
-                       /* higher priority for queues closer to tx */
-                       wkr_q_conf.priority =
-                               RTE_EVENT_DEV_PRIORITY_LOWEST - prio_delta * i;
-               }
-
-               const char *type_str = "Atomic";
-               switch (wkr_q_conf.schedule_type) {
-               case RTE_SCHED_TYPE_ORDERED:
-                       type_str = "Ordered";
-                       break;
-               case RTE_SCHED_TYPE_PARALLEL:
-                       type_str = "Parallel";
-                       break;
-               }
-               printf("\tStage %d, Type %s\tPriority = %d\n", i, type_str,
-                               wkr_q_conf.priority);
-       }
-       printf("\n");
-
-       /* final queue for sending to TX core */
-       if (rte_event_queue_setup(dev_id, i, &tx_q_conf) < 0) {
-               printf("%d: error creating qid %d\n", __LINE__, i);
-               return -1;
-       }
-       tx_queue.queue_id = i;
-       tx_queue.priority = RTE_EVENT_DEV_PRIORITY_HIGHEST;
-
-       if (wkr_p_conf.dequeue_depth > config.nb_event_port_dequeue_depth)
-               wkr_p_conf.dequeue_depth = config.nb_event_port_dequeue_depth;
-       if (wkr_p_conf.enqueue_depth > config.nb_event_port_enqueue_depth)
-               wkr_p_conf.enqueue_depth = config.nb_event_port_enqueue_depth;
-
-       /* set up one port per worker, linking to all stage queues */
-       for (i = 0; i < cdata.num_workers; i++) {
-               struct worker_data *w = &worker_data[i];
-               w->dev_id = dev_id;
-               if (rte_event_port_setup(dev_id, i, &wkr_p_conf) < 0) {
-                       printf("Error setting up port %d\n", i);
-                       return -1;
-               }
-
-               uint32_t s;
-               for (s = 0; s < cdata.num_stages; s++) {
-                       if (rte_event_port_link(dev_id, i,
-                                               &worker_queues[s].queue_id,
-                                               &worker_queues[s].priority,
-                                               1) != 1) {
-                               printf("%d: error creating link for port %d\n",
-                                               __LINE__, i);
-                               return -1;
-                       }
-               }
-               w->port_id = i;
-       }
-
-       if (tx_p_conf.dequeue_depth > config.nb_event_port_dequeue_depth)
-               tx_p_conf.dequeue_depth = config.nb_event_port_dequeue_depth;
-       if (tx_p_conf.enqueue_depth > config.nb_event_port_enqueue_depth)
-               tx_p_conf.enqueue_depth = config.nb_event_port_enqueue_depth;
-
-       /* port for consumer, linked to TX queue */
-       if (rte_event_port_setup(dev_id, i, &tx_p_conf) < 0) {
-               printf("Error setting up port %d\n", i);
-               return -1;
-       }
-       if (rte_event_port_link(dev_id, i, &tx_queue.queue_id,
-                               &tx_queue.priority, 1) != 1) {
-               printf("%d: error creating link for port %d\n",
-                               __LINE__, i);
-               return -1;
-       }
-       *cons_data = (struct cons_data){.dev_id = dev_id,
-                                       .port_id = i,
-                                       .release = disable_implicit_release };
-
-       ret = rte_event_dev_service_id_get(dev_id,
-                               &fdata->evdev_service_id);
-       if (ret != -ESRCH && ret != 0) {
-               printf("Error getting the service ID for sw eventdev\n");
-               return -1;
-       }
-       rte_service_runstate_set(fdata->evdev_service_id, 1);
-       rte_service_set_runstate_mapped_check(fdata->evdev_service_id, 0);
-       if (rte_event_dev_start(dev_id) < 0) {
-               printf("Error starting eventdev\n");
-               return -1;
-       }
-
-       return dev_id;
-}
-
-static void
-init_rx_adapter(uint16_t nb_ports)
-{
-       int i;
-       int ret;
-       uint8_t evdev_id = 0;
-       struct rte_event_dev_info dev_info;
-
-       ret = rte_event_dev_info_get(evdev_id, &dev_info);
-
-       struct rte_event_port_conf rx_p_conf = {
-               .dequeue_depth = 8,
-               .enqueue_depth = 8,
-               .new_event_threshold = 1200,
-       };
-
-       if (rx_p_conf.dequeue_depth > dev_info.max_event_port_dequeue_depth)
-               rx_p_conf.dequeue_depth = dev_info.max_event_port_dequeue_depth;
-       if (rx_p_conf.enqueue_depth > dev_info.max_event_port_enqueue_depth)
-               rx_p_conf.enqueue_depth = dev_info.max_event_port_enqueue_depth;
-
-       /* Create one adapter for all the ethernet ports. */
-       ret = rte_event_eth_rx_adapter_create(cdata.rx_adapter_id, evdev_id,
-                       &rx_p_conf);
-       if (ret)
-               rte_exit(EXIT_FAILURE, "failed to create rx adapter[%d]",
-                               cdata.rx_adapter_id);
-
-       struct rte_event_eth_rx_adapter_queue_conf queue_conf = {
-               .ev.sched_type = cdata.queue_type,
-               .ev.queue_id = cdata.qid[0],
-       };
-
-       for (i = 0; i < nb_ports; i++) {
-               uint32_t cap;
-
-               ret = rte_event_eth_rx_adapter_caps_get(evdev_id, i, &cap);
-               if (ret)
-                       rte_exit(EXIT_FAILURE,
-                                       "failed to get event rx adapter "
-                                       "capabilities");
-
-               ret = rte_event_eth_rx_adapter_queue_add(cdata.rx_adapter_id, i,
-                               -1, &queue_conf);
-               if (ret)
-                       rte_exit(EXIT_FAILURE,
-                                       "Failed to add queues to Rx adapter");
-       }
-
-       ret = rte_event_eth_rx_adapter_service_id_get(cdata.rx_adapter_id,
-                               &fdata->rxadptr_service_id);
-       if (ret != -ESRCH && ret != 0) {
-               rte_exit(EXIT_FAILURE,
-                       "Error getting the service ID for sw eventdev\n");
-       }
-       rte_service_runstate_set(fdata->rxadptr_service_id, 1);
-       rte_service_set_runstate_mapped_check(fdata->rxadptr_service_id, 0);
-
-       ret = rte_event_eth_rx_adapter_start(cdata.rx_adapter_id);
-       if (ret)
-               rte_exit(EXIT_FAILURE, "Rx adapter[%d] start failed",
-                               cdata.rx_adapter_id);
-}
-
-static void
-generic_opt_check(void)
-{
-       int i;
-       int ret;
-       uint32_t cap = 0;
-       uint8_t rx_needed = 0;
-       struct rte_event_dev_info eventdev_info;
-
-       memset(&eventdev_info, 0, sizeof(struct rte_event_dev_info));
-       rte_event_dev_info_get(0, &eventdev_info);
-
-       if (cdata.all_type_queues && !(eventdev_info.event_dev_cap &
-                               RTE_EVENT_DEV_CAP_QUEUE_ALL_TYPES))
-               rte_exit(EXIT_FAILURE,
-                               "Event dev doesn't support all type queues\n");
-
-       for (i = 0; i < rte_eth_dev_count(); i++) {
-               ret = rte_event_eth_rx_adapter_caps_get(0, i, &cap);
-               if (ret)
-                       rte_exit(EXIT_FAILURE,
-                               "failed to get event rx adapter capabilities");
-               rx_needed |=
-                       !(cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT);
-       }
-
-       if (cdata.worker_lcore_mask == 0 ||
-                       (rx_needed && cdata.rx_lcore_mask == 0) ||
-                       cdata.tx_lcore_mask == 0 || (cdata.sched_lcore_mask == 0
-                               && !(eventdev_info.event_dev_cap &
-                                       RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED))) {
-               printf("Core part of pipeline was not assigned any cores. "
-                       "This will stall the pipeline, please check core masks "
-                       "(use -h for details on setting core masks):\n"
-                       "\trx: %"PRIu64"\n\ttx: %"PRIu64"\n\tsched: %"PRIu64
-                       "\n\tworkers: %"PRIu64"\n",
-                       cdata.rx_lcore_mask, cdata.tx_lcore_mask,
-                       cdata.sched_lcore_mask,
-                       cdata.worker_lcore_mask);
-               rte_exit(-1, "Fix core masks\n");
-       }
-
-       if (eventdev_info.event_dev_cap & RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED)
-               memset(fdata->sched_core, 0,
-                               sizeof(unsigned int) * MAX_NUM_CORE);
-}
-
-void
-set_worker_generic_setup_data(struct setup_data *caps, bool burst)
-{
-       if (burst) {
-               caps->consumer = consumer_burst;
-               caps->worker = worker_generic_burst;
-       } else {
-               caps->consumer = consumer;
-               caps->worker = worker_generic;
-       }
-
-       caps->adptr_setup = init_rx_adapter;
-       caps->scheduler = schedule_devices;
-       caps->evdev_setup = setup_eventdev_generic;
-       caps->check_opt = generic_opt_check;
-}
diff --git a/examples/eventdev_pipeline_sw_pmd/pipeline_worker_tx.c b/examples/eventdev_pipeline_sw_pmd/pipeline_worker_tx.c
deleted file mode 100644 (file)
index c0d1bd9..0000000
+++ /dev/null
@@ -1,838 +0,0 @@
-/*
- * SPDX-License-Identifier: BSD-3-Clause
- * Copyright(c) 2010-2014 Intel Corporation
- * Copyright 2017 Cavium, Inc.
- */
-
-#include "pipeline_common.h"
-
-static __rte_always_inline void
-worker_fwd_event(struct rte_event *ev, uint8_t sched)
-{
-       ev->event_type = RTE_EVENT_TYPE_CPU;
-       ev->op = RTE_EVENT_OP_FORWARD;
-       ev->sched_type = sched;
-}
-
-static __rte_always_inline void
-worker_event_enqueue(const uint8_t dev, const uint8_t port,
-               struct rte_event *ev)
-{
-       while (rte_event_enqueue_burst(dev, port, ev, 1) != 1)
-               rte_pause();
-}
-
-static __rte_always_inline void
-worker_event_enqueue_burst(const uint8_t dev, const uint8_t port,
-               struct rte_event *ev, const uint16_t nb_rx)
-{
-       uint16_t enq;
-
-       enq = rte_event_enqueue_burst(dev, port, ev, nb_rx);
-       while (enq < nb_rx) {
-               enq += rte_event_enqueue_burst(dev, port,
-                                               ev + enq, nb_rx - enq);
-       }
-}
-
-static __rte_always_inline void
-worker_tx_pkt(struct rte_mbuf *mbuf)
-{
-       exchange_mac(mbuf);
-       while (rte_eth_tx_burst(mbuf->port, 0, &mbuf, 1) != 1)
-               rte_pause();
-}
-
-/* Single stage pipeline workers */
-
-static int
-worker_do_tx_single(void *arg)
-{
-       struct worker_data *data = (struct worker_data *)arg;
-       const uint8_t dev = data->dev_id;
-       const uint8_t port = data->port_id;
-       size_t fwd = 0, received = 0, tx = 0;
-       struct rte_event ev;
-
-       while (!fdata->done) {
-
-               if (!rte_event_dequeue_burst(dev, port, &ev, 1, 0)) {
-                       rte_pause();
-                       continue;
-               }
-
-               received++;
-
-               if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
-                       worker_tx_pkt(ev.mbuf);
-                       tx++;
-                       continue;
-               }
-               work();
-               ev.queue_id++;
-               worker_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
-               worker_event_enqueue(dev, port, &ev);
-               fwd++;
-       }
-
-       if (!cdata.quiet)
-               printf("  worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
-                               rte_lcore_id(), received, fwd, tx);
-       return 0;
-}
-
-static int
-worker_do_tx_single_atq(void *arg)
-{
-       struct worker_data *data = (struct worker_data *)arg;
-       const uint8_t dev = data->dev_id;
-       const uint8_t port = data->port_id;
-       size_t fwd = 0, received = 0, tx = 0;
-       struct rte_event ev;
-
-       while (!fdata->done) {
-
-               if (!rte_event_dequeue_burst(dev, port, &ev, 1, 0)) {
-                       rte_pause();
-                       continue;
-               }
-
-               received++;
-
-               if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
-                       worker_tx_pkt(ev.mbuf);
-                       tx++;
-                       continue;
-               }
-               work();
-               worker_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
-               worker_event_enqueue(dev, port, &ev);
-               fwd++;
-       }
-
-       if (!cdata.quiet)
-               printf("  worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
-                               rte_lcore_id(), received, fwd, tx);
-       return 0;
-}
-
-static int
-worker_do_tx_single_burst(void *arg)
-{
-       struct rte_event ev[BATCH_SIZE + 1];
-
-       struct worker_data *data = (struct worker_data *)arg;
-       const uint8_t dev = data->dev_id;
-       const uint8_t port = data->port_id;
-       size_t fwd = 0, received = 0, tx = 0;
-
-       while (!fdata->done) {
-               uint16_t i;
-               uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
-                               BATCH_SIZE, 0);
-
-               if (!nb_rx) {
-                       rte_pause();
-                       continue;
-               }
-               received += nb_rx;
-
-               for (i = 0; i < nb_rx; i++) {
-                       rte_prefetch0(ev[i + 1].mbuf);
-                       if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {
-
-                               worker_tx_pkt(ev[i].mbuf);
-                               ev[i].op = RTE_EVENT_OP_RELEASE;
-                               tx++;
-
-                       } else {
-                               ev[i].queue_id++;
-                               worker_fwd_event(&ev[i], RTE_SCHED_TYPE_ATOMIC);
-                       }
-                       work();
-               }
-
-               worker_event_enqueue_burst(dev, port, ev, nb_rx);
-               fwd += nb_rx;
-       }
-
-       if (!cdata.quiet)
-               printf("  worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
-                               rte_lcore_id(), received, fwd, tx);
-       return 0;
-}
-
-static int
-worker_do_tx_single_burst_atq(void *arg)
-{
-       struct rte_event ev[BATCH_SIZE + 1];
-
-       struct worker_data *data = (struct worker_data *)arg;
-       const uint8_t dev = data->dev_id;
-       const uint8_t port = data->port_id;
-       size_t fwd = 0, received = 0, tx = 0;
-
-       while (!fdata->done) {
-               uint16_t i;
-               uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
-                               BATCH_SIZE, 0);
-
-               if (!nb_rx) {
-                       rte_pause();
-                       continue;
-               }
-
-               received += nb_rx;
-
-               for (i = 0; i < nb_rx; i++) {
-                       rte_prefetch0(ev[i + 1].mbuf);
-                       if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {
-
-                               worker_tx_pkt(ev[i].mbuf);
-                               ev[i].op = RTE_EVENT_OP_RELEASE;
-                               tx++;
-                       } else
-                               worker_fwd_event(&ev[i], RTE_SCHED_TYPE_ATOMIC);
-                       work();
-               }
-
-               worker_event_enqueue_burst(dev, port, ev, nb_rx);
-               fwd += nb_rx;
-       }
-
-       if (!cdata.quiet)
-               printf("  worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
-                               rte_lcore_id(), received, fwd, tx);
-       return 0;
-}
-
-/* Multi stage Pipeline Workers */
-
-static int
-worker_do_tx(void *arg)
-{
-       struct rte_event ev;
-
-       struct worker_data *data = (struct worker_data *)arg;
-       const uint8_t dev = data->dev_id;
-       const uint8_t port = data->port_id;
-       const uint8_t lst_qid = cdata.num_stages - 1;
-       size_t fwd = 0, received = 0, tx = 0;
-
-
-       while (!fdata->done) {
-
-               if (!rte_event_dequeue_burst(dev, port, &ev, 1, 0)) {
-                       rte_pause();
-                       continue;
-               }
-
-               received++;
-               const uint8_t cq_id = ev.queue_id % cdata.num_stages;
-
-               if (cq_id >= lst_qid) {
-                       if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
-                               worker_tx_pkt(ev.mbuf);
-                               tx++;
-                               continue;
-                       }
-
-                       worker_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
-                       ev.queue_id = (cq_id == lst_qid) ?
-                               cdata.next_qid[ev.queue_id] : ev.queue_id;
-               } else {
-                       ev.queue_id = cdata.next_qid[ev.queue_id];
-                       worker_fwd_event(&ev, cdata.queue_type);
-               }
-               work();
-
-               worker_event_enqueue(dev, port, &ev);
-               fwd++;
-       }
-
-       if (!cdata.quiet)
-               printf("  worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
-                               rte_lcore_id(), received, fwd, tx);
-
-       return 0;
-}
-
-static int
-worker_do_tx_atq(void *arg)
-{
-       struct rte_event ev;
-
-       struct worker_data *data = (struct worker_data *)arg;
-       const uint8_t dev = data->dev_id;
-       const uint8_t port = data->port_id;
-       const uint8_t lst_qid = cdata.num_stages - 1;
-       size_t fwd = 0, received = 0, tx = 0;
-
-       while (!fdata->done) {
-
-               if (!rte_event_dequeue_burst(dev, port, &ev, 1, 0)) {
-                       rte_pause();
-                       continue;
-               }
-
-               received++;
-               const uint8_t cq_id = ev.sub_event_type % cdata.num_stages;
-
-               if (cq_id == lst_qid) {
-                       if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
-                               worker_tx_pkt(ev.mbuf);
-                               tx++;
-                               continue;
-                       }
-
-                       worker_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
-               } else {
-                       ev.sub_event_type++;
-                       worker_fwd_event(&ev, cdata.queue_type);
-               }
-               work();
-
-               worker_event_enqueue(dev, port, &ev);
-               fwd++;
-       }
-
-       if (!cdata.quiet)
-               printf("  worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
-                               rte_lcore_id(), received, fwd, tx);
-
-       return 0;
-}
-
-static int
-worker_do_tx_burst(void *arg)
-{
-       struct rte_event ev[BATCH_SIZE];
-
-       struct worker_data *data = (struct worker_data *)arg;
-       uint8_t dev = data->dev_id;
-       uint8_t port = data->port_id;
-       uint8_t lst_qid = cdata.num_stages - 1;
-       size_t fwd = 0, received = 0, tx = 0;
-
-       while (!fdata->done) {
-               uint16_t i;
-               const uint16_t nb_rx = rte_event_dequeue_burst(dev, port,
-                               ev, BATCH_SIZE, 0);
-
-               if (nb_rx == 0) {
-                       rte_pause();
-                       continue;
-               }
-               received += nb_rx;
-
-               for (i = 0; i < nb_rx; i++) {
-                       const uint8_t cq_id = ev[i].queue_id % cdata.num_stages;
-
-                       if (cq_id >= lst_qid) {
-                               if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {
-                                       worker_tx_pkt(ev[i].mbuf);
-                                       tx++;
-                                       ev[i].op = RTE_EVENT_OP_RELEASE;
-                                       continue;
-                               }
-                               ev[i].queue_id = (cq_id == lst_qid) ?
-                                       cdata.next_qid[ev[i].queue_id] :
-                                       ev[i].queue_id;
-
-                               worker_fwd_event(&ev[i], RTE_SCHED_TYPE_ATOMIC);
-                       } else {
-                               ev[i].queue_id = cdata.next_qid[ev[i].queue_id];
-                               worker_fwd_event(&ev[i], cdata.queue_type);
-                       }
-                       work();
-               }
-               worker_event_enqueue_burst(dev, port, ev, nb_rx);
-
-               fwd += nb_rx;
-       }
-
-       if (!cdata.quiet)
-               printf("  worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
-                               rte_lcore_id(), received, fwd, tx);
-
-       return 0;
-}
-
-static int
-worker_do_tx_burst_atq(void *arg)
-{
-       struct rte_event ev[BATCH_SIZE];
-
-       struct worker_data *data = (struct worker_data *)arg;
-       uint8_t dev = data->dev_id;
-       uint8_t port = data->port_id;
-       uint8_t lst_qid = cdata.num_stages - 1;
-       size_t fwd = 0, received = 0, tx = 0;
-
-       while (!fdata->done) {
-               uint16_t i;
-
-               const uint16_t nb_rx = rte_event_dequeue_burst(dev, port,
-                               ev, BATCH_SIZE, 0);
-
-               if (nb_rx == 0) {
-                       rte_pause();
-                       continue;
-               }
-               received += nb_rx;
-
-               for (i = 0; i < nb_rx; i++) {
-                       const uint8_t cq_id = ev[i].sub_event_type %
-                               cdata.num_stages;
-
-                       if (cq_id == lst_qid) {
-                               if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {
-                                       worker_tx_pkt(ev[i].mbuf);
-                                       tx++;
-                                       ev[i].op = RTE_EVENT_OP_RELEASE;
-                                       continue;
-                               }
-
-                               worker_fwd_event(&ev[i], RTE_SCHED_TYPE_ATOMIC);
-                       } else {
-                               ev[i].sub_event_type++;
-                               worker_fwd_event(&ev[i], cdata.queue_type);
-                       }
-                       work();
-               }
-
-               worker_event_enqueue_burst(dev, port, ev, nb_rx);
-               fwd += nb_rx;
-       }
-
-       if (!cdata.quiet)
-               printf("  worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
-                               rte_lcore_id(), received, fwd, tx);
-
-       return 0;
-}
-
-static int
-setup_eventdev_worker_tx(struct cons_data *cons_data,
-               struct worker_data *worker_data)
-{
-       RTE_SET_USED(cons_data);
-       uint8_t i;
-       const uint8_t atq = cdata.all_type_queues ? 1 : 0;
-       const uint8_t dev_id = 0;
-       const uint8_t nb_ports = cdata.num_workers;
-       uint8_t nb_slots = 0;
-       uint8_t nb_queues = rte_eth_dev_count();
-
-       /*
-        * In case where all type queues are not enabled, use queues equal to
-        * number of stages * eth_dev_count and one extra queue per pipeline
-        * for Tx.
-        */
-       if (!atq) {
-               nb_queues *= cdata.num_stages;
-               nb_queues += rte_eth_dev_count();
-       }
-
-       struct rte_event_dev_config config = {
-                       .nb_event_queues = nb_queues,
-                       .nb_event_ports = nb_ports,
-                       .nb_events_limit  = 4096,
-                       .nb_event_queue_flows = 1024,
-                       .nb_event_port_dequeue_depth = 128,
-                       .nb_event_port_enqueue_depth = 128,
-       };
-       struct rte_event_port_conf wkr_p_conf = {
-                       .dequeue_depth = cdata.worker_cq_depth,
-                       .enqueue_depth = 64,
-                       .new_event_threshold = 4096,
-       };
-       struct rte_event_queue_conf wkr_q_conf = {
-                       .schedule_type = cdata.queue_type,
-                       .priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
-                       .nb_atomic_flows = 1024,
-                       .nb_atomic_order_sequences = 1024,
-       };
-
-       int ret, ndev = rte_event_dev_count();
-
-       if (ndev < 1) {
-               printf("%d: No Eventdev Devices Found\n", __LINE__);
-               return -1;
-       }
-
-
-       struct rte_event_dev_info dev_info;
-       ret = rte_event_dev_info_get(dev_id, &dev_info);
-       printf("\tEventdev %d: %s\n", dev_id, dev_info.driver_name);
-
-       if (dev_info.max_event_port_dequeue_depth <
-                       config.nb_event_port_dequeue_depth)
-               config.nb_event_port_dequeue_depth =
-                               dev_info.max_event_port_dequeue_depth;
-       if (dev_info.max_event_port_enqueue_depth <
-                       config.nb_event_port_enqueue_depth)
-               config.nb_event_port_enqueue_depth =
-                               dev_info.max_event_port_enqueue_depth;
-
-       ret = rte_event_dev_configure(dev_id, &config);
-       if (ret < 0) {
-               printf("%d: Error configuring device\n", __LINE__);
-               return -1;
-       }
-
-       printf("  Stages:\n");
-       for (i = 0; i < nb_queues; i++) {
-
-               if (atq) {
-
-                       nb_slots = cdata.num_stages;
-                       wkr_q_conf.event_queue_cfg =
-                               RTE_EVENT_QUEUE_CFG_ALL_TYPES;
-               } else {
-                       uint8_t slot;
-
-                       nb_slots = cdata.num_stages + 1;
-                       slot = i % nb_slots;
-                       wkr_q_conf.schedule_type = slot == cdata.num_stages ?
-                               RTE_SCHED_TYPE_ATOMIC : cdata.queue_type;
-               }
-
-               if (rte_event_queue_setup(dev_id, i, &wkr_q_conf) < 0) {
-                       printf("%d: error creating qid %d\n", __LINE__, i);
-                       return -1;
-               }
-               cdata.qid[i] = i;
-               cdata.next_qid[i] = i+1;
-               if (cdata.enable_queue_priorities) {
-                       const uint32_t prio_delta =
-                               (RTE_EVENT_DEV_PRIORITY_LOWEST) /
-                               nb_slots;
-
-                       /* higher priority for queues closer to tx */
-                       wkr_q_conf.priority =
-                               RTE_EVENT_DEV_PRIORITY_LOWEST - prio_delta *
-                               (i % nb_slots);
-               }
-
-               const char *type_str = "Atomic";
-               switch (wkr_q_conf.schedule_type) {
-               case RTE_SCHED_TYPE_ORDERED:
-                       type_str = "Ordered";
-                       break;
-               case RTE_SCHED_TYPE_PARALLEL:
-                       type_str = "Parallel";
-                       break;
-               }
-               printf("\tStage %d, Type %s\tPriority = %d\n", i, type_str,
-                               wkr_q_conf.priority);
-       }
-
-       printf("\n");
-       if (wkr_p_conf.dequeue_depth > config.nb_event_port_dequeue_depth)
-               wkr_p_conf.dequeue_depth = config.nb_event_port_dequeue_depth;
-       if (wkr_p_conf.enqueue_depth > config.nb_event_port_enqueue_depth)
-               wkr_p_conf.enqueue_depth = config.nb_event_port_enqueue_depth;
-
-       /* set up one port per worker, linking to all stage queues */
-       for (i = 0; i < cdata.num_workers; i++) {
-               struct worker_data *w = &worker_data[i];
-               w->dev_id = dev_id;
-               if (rte_event_port_setup(dev_id, i, &wkr_p_conf) < 0) {
-                       printf("Error setting up port %d\n", i);
-                       return -1;
-               }
-
-               if (rte_event_port_link(dev_id, i, NULL, NULL, 0)
-                               != nb_queues) {
-                       printf("%d: error creating link for port %d\n",
-                                       __LINE__, i);
-                       return -1;
-               }
-               w->port_id = i;
-       }
-       /*
-        * Reduce the load on ingress event queue by splitting the traffic
-        * across multiple event queues.
-        * for example, nb_stages =  2 and nb_ethdev = 2 then
-        *
-        *      nb_queues = (2 * 2) + 2 = 6 (non atq)
-        *      rx_stride = 3
-        *
-        * So, traffic is split across queue 0 and queue 3 since queue id for
-        * rx adapter is chosen <ethport_id> * <rx_stride> i.e in the above
-        * case eth port 0, 1 will inject packets into event queue 0, 3
-        * respectively.
-        *
-        * This forms two set of queue pipelines 0->1->2->tx and 3->4->5->tx.
-        */
-       cdata.rx_stride = atq ? 1 : nb_slots;
-       ret = rte_event_dev_service_id_get(dev_id,
-                               &fdata->evdev_service_id);
-       if (ret != -ESRCH && ret != 0) {
-               printf("Error getting the service ID\n");
-               return -1;
-       }
-       rte_service_runstate_set(fdata->evdev_service_id, 1);
-       rte_service_set_runstate_mapped_check(fdata->evdev_service_id, 0);
-       if (rte_event_dev_start(dev_id) < 0) {
-               printf("Error starting eventdev\n");
-               return -1;
-       }
-
-       return dev_id;
-}
-
-
-struct rx_adptr_services {
-       uint16_t nb_rx_adptrs;
-       uint32_t *rx_adpt_arr;
-};
-
-static int32_t
-service_rx_adapter(void *arg)
-{
-       int i;
-       struct rx_adptr_services *adptr_services = arg;
-
-       for (i = 0; i < adptr_services->nb_rx_adptrs; i++)
-               rte_service_run_iter_on_app_lcore(
-                               adptr_services->rx_adpt_arr[i], 1);
-       return 0;
-}
-
-static void
-init_rx_adapter(uint16_t nb_ports)
-{
-       int i;
-       int ret;
-       uint8_t evdev_id = 0;
-       struct rx_adptr_services *adptr_services = NULL;
-       struct rte_event_dev_info dev_info;
-
-       ret = rte_event_dev_info_get(evdev_id, &dev_info);
-       adptr_services = rte_zmalloc(NULL, sizeof(struct rx_adptr_services), 0);
-
-       struct rte_event_port_conf rx_p_conf = {
-               .dequeue_depth = 8,
-               .enqueue_depth = 8,
-               .new_event_threshold = 1200,
-       };
-
-       if (rx_p_conf.dequeue_depth > dev_info.max_event_port_dequeue_depth)
-               rx_p_conf.dequeue_depth = dev_info.max_event_port_dequeue_depth;
-       if (rx_p_conf.enqueue_depth > dev_info.max_event_port_enqueue_depth)
-               rx_p_conf.enqueue_depth = dev_info.max_event_port_enqueue_depth;
-
-
-       struct rte_event_eth_rx_adapter_queue_conf queue_conf = {
-               .ev.sched_type = cdata.queue_type,
-       };
-
-       for (i = 0; i < nb_ports; i++) {
-               uint32_t cap;
-               uint32_t service_id;
-
-               ret = rte_event_eth_rx_adapter_create(i, evdev_id, &rx_p_conf);
-               if (ret)
-                       rte_exit(EXIT_FAILURE,
-                                       "failed to create rx adapter[%d]",
-                                       cdata.rx_adapter_id);
-
-               ret = rte_event_eth_rx_adapter_caps_get(evdev_id, i, &cap);
-               if (ret)
-                       rte_exit(EXIT_FAILURE,
-                                       "failed to get event rx adapter "
-                                       "capabilities");
-
-               queue_conf.ev.queue_id = cdata.rx_stride ?
-                       (i * cdata.rx_stride)
-                       : (uint8_t)cdata.qid[0];
-
-               ret = rte_event_eth_rx_adapter_queue_add(i, i, -1, &queue_conf);
-               if (ret)
-                       rte_exit(EXIT_FAILURE,
-                                       "Failed to add queues to Rx adapter");
-
-
-               /* Producer needs to be scheduled. */
-               if (!(cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT)) {
-                       ret = rte_event_eth_rx_adapter_service_id_get(i,
-                                       &service_id);
-                       if (ret != -ESRCH && ret != 0) {
-                               rte_exit(EXIT_FAILURE,
-                               "Error getting the service ID for rx adptr\n");
-                       }
-
-                       rte_service_runstate_set(service_id, 1);
-                       rte_service_set_runstate_mapped_check(service_id, 0);
-
-                       adptr_services->nb_rx_adptrs++;
-                       adptr_services->rx_adpt_arr = rte_realloc(
-                                       adptr_services->rx_adpt_arr,
-                                       adptr_services->nb_rx_adptrs *
-                                       sizeof(uint32_t), 0);
-                       adptr_services->rx_adpt_arr[
-                               adptr_services->nb_rx_adptrs - 1] =
-                               service_id;
-               }
-
-               ret = rte_event_eth_rx_adapter_start(i);
-               if (ret)
-                       rte_exit(EXIT_FAILURE, "Rx adapter[%d] start failed",
-                                       cdata.rx_adapter_id);
-       }
-
-       if (adptr_services->nb_rx_adptrs) {
-               struct rte_service_spec service;
-
-               memset(&service, 0, sizeof(struct rte_service_spec));
-               snprintf(service.name, sizeof(service.name), "rx_service");
-               service.callback = service_rx_adapter;
-               service.callback_userdata = (void *)adptr_services;
-
-               int32_t ret = rte_service_component_register(&service,
-                               &fdata->rxadptr_service_id);
-               if (ret)
-                       rte_exit(EXIT_FAILURE,
-                               "Rx adapter[%d] service register failed",
-                               cdata.rx_adapter_id);
-
-               rte_service_runstate_set(fdata->rxadptr_service_id, 1);
-               rte_service_component_runstate_set(fdata->rxadptr_service_id,
-                               1);
-               rte_service_set_runstate_mapped_check(fdata->rxadptr_service_id,
-                               0);
-       } else {
-               memset(fdata->rx_core, 0, sizeof(unsigned int) * MAX_NUM_CORE);
-               rte_free(adptr_services);
-       }
-
-       if (!adptr_services->nb_rx_adptrs && fdata->cap.consumer == NULL &&
-                       (dev_info.event_dev_cap &
-                        RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED))
-               fdata->cap.scheduler = NULL;
-
-       if (dev_info.event_dev_cap & RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED)
-               memset(fdata->sched_core, 0,
-                               sizeof(unsigned int) * MAX_NUM_CORE);
-}
-
-static void
-worker_tx_opt_check(void)
-{
-       int i;
-       int ret;
-       uint32_t cap = 0;
-       uint8_t rx_needed = 0;
-       struct rte_event_dev_info eventdev_info;
-
-       memset(&eventdev_info, 0, sizeof(struct rte_event_dev_info));
-       rte_event_dev_info_get(0, &eventdev_info);
-
-       if (cdata.all_type_queues && !(eventdev_info.event_dev_cap &
-                               RTE_EVENT_DEV_CAP_QUEUE_ALL_TYPES))
-               rte_exit(EXIT_FAILURE,
-                               "Event dev doesn't support all type queues\n");
-
-       for (i = 0; i < rte_eth_dev_count(); i++) {
-               ret = rte_event_eth_rx_adapter_caps_get(0, i, &cap);
-               if (ret)
-                       rte_exit(EXIT_FAILURE,
-                                       "failed to get event rx adapter "
-                                       "capabilities");
-               rx_needed |=
-                       !(cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT);
-       }
-
-       if (cdata.worker_lcore_mask == 0 ||
-                       (rx_needed && cdata.rx_lcore_mask == 0) ||
-                       (cdata.sched_lcore_mask == 0 &&
-                        !(eventdev_info.event_dev_cap &
-                                RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED))) {
-               printf("Core part of pipeline was not assigned any cores. "
-                       "This will stall the pipeline, please check core masks "
-                       "(use -h for details on setting core masks):\n"
-                       "\trx: %"PRIu64"\n\ttx: %"PRIu64"\n\tsched: %"PRIu64
-                       "\n\tworkers: %"PRIu64"\n",
-                       cdata.rx_lcore_mask, cdata.tx_lcore_mask,
-                       cdata.sched_lcore_mask,
-                       cdata.worker_lcore_mask);
-               rte_exit(-1, "Fix core masks\n");
-       }
-}
-
-static worker_loop
-get_worker_loop_single_burst(uint8_t atq)
-{
-       if (atq)
-               return worker_do_tx_single_burst_atq;
-
-       return worker_do_tx_single_burst;
-}
-
-static worker_loop
-get_worker_loop_single_non_burst(uint8_t atq)
-{
-       if (atq)
-               return worker_do_tx_single_atq;
-
-       return worker_do_tx_single;
-}
-
-static worker_loop
-get_worker_loop_burst(uint8_t atq)
-{
-       if (atq)
-               return worker_do_tx_burst_atq;
-
-       return worker_do_tx_burst;
-}
-
-static worker_loop
-get_worker_loop_non_burst(uint8_t atq)
-{
-       if (atq)
-               return worker_do_tx_atq;
-
-       return worker_do_tx;
-}
-
-static worker_loop
-get_worker_single_stage(bool burst)
-{
-       uint8_t atq = cdata.all_type_queues ? 1 : 0;
-
-       if (burst)
-               return get_worker_loop_single_burst(atq);
-
-       return get_worker_loop_single_non_burst(atq);
-}
-
-static worker_loop
-get_worker_multi_stage(bool burst)
-{
-       uint8_t atq = cdata.all_type_queues ? 1 : 0;
-
-       if (burst)
-               return get_worker_loop_burst(atq);
-
-       return get_worker_loop_non_burst(atq);
-}
-
-void
-set_worker_tx_setup_data(struct setup_data *caps, bool burst)
-{
-       if (cdata.num_stages == 1)
-               caps->worker = get_worker_single_stage(burst);
-       else
-               caps->worker = get_worker_multi_stage(burst);
-
-       memset(fdata->tx_core, 0, sizeof(unsigned int) * MAX_NUM_CORE);
-
-       caps->check_opt = worker_tx_opt_check;
-       caps->consumer = NULL;
-       caps->scheduler = schedule_devices;
-       caps->evdev_setup = setup_eventdev_worker_tx;
-       caps->adptr_setup = init_rx_adapter;
-}