From: David Hunt Date: Mon, 20 Mar 2017 10:08:30 +0000 (+0000) Subject: distributor: switch over to new API X-Git-Tag: spdx-start~4386 X-Git-Url: http://git.droids-corp.org/?a=commitdiff_plain;h=c0de0eb82e40;p=dpdk.git distributor: switch over to new API This is the main switch over between the legacy API and the new burst API. We rename all the functions in rte_distributor.c to remove the _v1705, and we add in _v20 in the rte_distributor_v20.c We also rename the rte_distributor_next.h as rte_distributor.h, as this is now the public header. At the same time, we need the autotests and sample app to compile properly, hence those changes are in this patch also. Signed-off-by: David Hunt Acked-by: Bruce Richardson --- diff --git a/examples/distributor/main.c b/examples/distributor/main.c index 7b8a7595ef..a748985449 100644 --- a/examples/distributor/main.c +++ b/examples/distributor/main.c @@ -405,17 +405,30 @@ lcore_worker(struct lcore_params *p) { struct rte_distributor *d = p->d; const unsigned id = p->worker_id; + unsigned int num = 0; + unsigned int i; + /* * for single port, xor_val will be zero so we won't modify the output * port, otherwise we send traffic from 0 to 1, 2 to 3, and vice versa */ const unsigned xor_val = (rte_eth_dev_count() > 1); - struct rte_mbuf *buf = NULL; + struct rte_mbuf *buf[8] __rte_cache_aligned; + + for (i = 0; i < 8; i++) + buf[i] = NULL; printf("\nCore %u acting as worker core.\n", rte_lcore_id()); while (!quit_signal) { - buf = rte_distributor_get_pkt(d, id, buf); - buf->port ^= xor_val; + num = rte_distributor_get_pkt(d, id, buf, buf, num); + /* Do a little bit of work for each packet */ + for (i = 0; i < num; i++) { + uint64_t t = rte_rdtsc()+100; + + while (rte_rdtsc() < t) + rte_pause(); + buf[i]->port ^= xor_val; + } } return 0; } @@ -561,7 +574,8 @@ main(int argc, char *argv[]) } d = rte_distributor_create("PKT_DIST", rte_socket_id(), - rte_lcore_count() - 2); + rte_lcore_count() - 2, + RTE_DIST_ALG_BURST); if (d == NULL) rte_exit(EXIT_FAILURE, "Cannot create distributor\n"); diff --git a/lib/librte_distributor/Makefile b/lib/librte_distributor/Makefile index fb79c683a0..3ffb911ce8 100644 --- a/lib/librte_distributor/Makefile +++ b/lib/librte_distributor/Makefile @@ -57,6 +57,5 @@ endif # install this header file SYMLINK-$(CONFIG_RTE_LIBRTE_DISTRIBUTOR)-include := rte_distributor.h -SYMLINK-$(CONFIG_RTE_LIBRTE_DISTRIBUTOR)-include += rte_distributor_v20.h include $(RTE_SDK)/mk/rte.lib.mk diff --git a/lib/librte_distributor/rte_distributor.c b/lib/librte_distributor/rte_distributor.c index 6158fa688e..6e1debf576 100644 --- a/lib/librte_distributor/rte_distributor.c +++ b/lib/librte_distributor/rte_distributor.c @@ -42,10 +42,10 @@ #include #include #include "rte_distributor_private.h" -#include "rte_distributor_next.h" +#include "rte_distributor.h" #include "rte_distributor_v20.h" -TAILQ_HEAD(rte_dist_burst_list, rte_distributor_v1705); +TAILQ_HEAD(rte_dist_burst_list, rte_distributor); static struct rte_tailq_elem rte_dist_burst_tailq = { .name = "RTE_DIST_BURST", @@ -57,17 +57,17 @@ EAL_REGISTER_TAILQ(rte_dist_burst_tailq) /**** Burst Packet APIs called by workers ****/ void -rte_distributor_request_pkt_v1705(struct rte_distributor_v1705 *d, +rte_distributor_request_pkt(struct rte_distributor *d, unsigned int worker_id, struct rte_mbuf **oldpkt, unsigned int count) { - struct rte_distributor_buffer_v1705 *buf = &(d->bufs[worker_id]); + struct rte_distributor_buffer *buf = &(d->bufs[worker_id]); unsigned int i; volatile int64_t *retptr64; if (unlikely(d->alg_type == RTE_DIST_ALG_SINGLE)) { - rte_distributor_request_pkt(d->d_v20, + rte_distributor_request_pkt_v20(d->d_v20, worker_id, oldpkt[0]); return; } @@ -104,16 +104,16 @@ rte_distributor_request_pkt_v1705(struct rte_distributor_v1705 *d, } int -rte_distributor_poll_pkt_v1705(struct rte_distributor_v1705 *d, +rte_distributor_poll_pkt(struct rte_distributor *d, unsigned int worker_id, struct rte_mbuf **pkts) { - struct rte_distributor_buffer_v1705 *buf = &d->bufs[worker_id]; + struct rte_distributor_buffer *buf = &d->bufs[worker_id]; uint64_t ret; int count = 0; unsigned int i; if (unlikely(d->alg_type == RTE_DIST_ALG_SINGLE)) { - pkts[0] = rte_distributor_poll_pkt(d->d_v20, worker_id); + pkts[0] = rte_distributor_poll_pkt_v20(d->d_v20, worker_id); return (pkts[0]) ? 1 : 0; } @@ -140,7 +140,7 @@ rte_distributor_poll_pkt_v1705(struct rte_distributor_v1705 *d, } int -rte_distributor_get_pkt_v1705(struct rte_distributor_v1705 *d, +rte_distributor_get_pkt(struct rte_distributor *d, unsigned int worker_id, struct rte_mbuf **pkts, struct rte_mbuf **oldpkt, unsigned int return_count) { @@ -148,37 +148,37 @@ rte_distributor_get_pkt_v1705(struct rte_distributor_v1705 *d, if (unlikely(d->alg_type == RTE_DIST_ALG_SINGLE)) { if (return_count <= 1) { - pkts[0] = rte_distributor_get_pkt(d->d_v20, + pkts[0] = rte_distributor_get_pkt_v20(d->d_v20, worker_id, oldpkt[0]); return (pkts[0]) ? 1 : 0; } else return -EINVAL; } - rte_distributor_request_pkt_v1705(d, worker_id, oldpkt, return_count); + rte_distributor_request_pkt(d, worker_id, oldpkt, return_count); - count = rte_distributor_poll_pkt_v1705(d, worker_id, pkts); + count = rte_distributor_poll_pkt(d, worker_id, pkts); while (count == -1) { uint64_t t = rte_rdtsc() + 100; while (rte_rdtsc() < t) rte_pause(); - count = rte_distributor_poll_pkt_v1705(d, worker_id, pkts); + count = rte_distributor_poll_pkt(d, worker_id, pkts); } return count; } int -rte_distributor_return_pkt_v1705(struct rte_distributor_v1705 *d, +rte_distributor_return_pkt(struct rte_distributor *d, unsigned int worker_id, struct rte_mbuf **oldpkt, int num) { - struct rte_distributor_buffer_v1705 *buf = &d->bufs[worker_id]; + struct rte_distributor_buffer *buf = &d->bufs[worker_id]; unsigned int i; if (unlikely(d->alg_type == RTE_DIST_ALG_SINGLE)) { if (num == 1) - return rte_distributor_return_pkt(d->d_v20, + return rte_distributor_return_pkt_v20(d->d_v20, worker_id, oldpkt[0]); else return -EINVAL; @@ -202,7 +202,7 @@ rte_distributor_return_pkt_v1705(struct rte_distributor_v1705 *d, /* stores a packet returned from a worker inside the returns array */ static inline void -store_return(uintptr_t oldbuf, struct rte_distributor_v1705 *d, +store_return(uintptr_t oldbuf, struct rte_distributor *d, unsigned int *ret_start, unsigned int *ret_count) { if (!oldbuf) @@ -221,7 +221,7 @@ store_return(uintptr_t oldbuf, struct rte_distributor_v1705 *d, * workers to give us our atomic flow pinning. */ void -find_match_scalar(struct rte_distributor_v1705 *d, +find_match_scalar(struct rte_distributor *d, uint16_t *data_ptr, uint16_t *output_ptr) { @@ -270,9 +270,9 @@ find_match_scalar(struct rte_distributor_v1705 *d, * the valid returned pointers (store_return). */ static unsigned int -handle_returns(struct rte_distributor_v1705 *d, unsigned int wkr) +handle_returns(struct rte_distributor *d, unsigned int wkr) { - struct rte_distributor_buffer_v1705 *buf = &(d->bufs[wkr]); + struct rte_distributor_buffer *buf = &(d->bufs[wkr]); uintptr_t oldbuf; unsigned int ret_start = d->returns.start, ret_count = d->returns.count; @@ -308,9 +308,9 @@ handle_returns(struct rte_distributor_v1705 *d, unsigned int wkr) * before sending out new packets. */ static unsigned int -release(struct rte_distributor_v1705 *d, unsigned int wkr) +release(struct rte_distributor *d, unsigned int wkr) { - struct rte_distributor_buffer_v1705 *buf = &(d->bufs[wkr]); + struct rte_distributor_buffer *buf = &(d->bufs[wkr]); unsigned int i; while (!(d->bufs[wkr].bufptr64[0] & RTE_DISTRIB_GET_BUF)) @@ -342,7 +342,7 @@ release(struct rte_distributor_v1705 *d, unsigned int wkr) /* process a set of packets to distribute them to workers */ int -rte_distributor_process_v1705(struct rte_distributor_v1705 *d, +rte_distributor_process(struct rte_distributor *d, struct rte_mbuf **mbufs, unsigned int num_mbufs) { unsigned int next_idx = 0; @@ -355,7 +355,7 @@ rte_distributor_process_v1705(struct rte_distributor_v1705 *d, if (d->alg_type == RTE_DIST_ALG_SINGLE) { /* Call the old API */ - return rte_distributor_process(d->d_v20, mbufs, num_mbufs); + return rte_distributor_process_v20(d->d_v20, mbufs, num_mbufs); } if (unlikely(num_mbufs == 0)) { @@ -479,7 +479,7 @@ rte_distributor_process_v1705(struct rte_distributor_v1705 *d, /* return to the caller, packets returned from workers */ int -rte_distributor_returned_pkts_v1705(struct rte_distributor_v1705 *d, +rte_distributor_returned_pkts(struct rte_distributor *d, struct rte_mbuf **mbufs, unsigned int max_mbufs) { struct rte_distributor_returned_pkts *returns = &d->returns; @@ -489,7 +489,7 @@ rte_distributor_returned_pkts_v1705(struct rte_distributor_v1705 *d, if (d->alg_type == RTE_DIST_ALG_SINGLE) { /* Call the old API */ - return rte_distributor_returned_pkts(d->d_v20, + return rte_distributor_returned_pkts_v20(d->d_v20, mbufs, max_mbufs); } @@ -510,7 +510,7 @@ rte_distributor_returned_pkts_v1705(struct rte_distributor_v1705 *d, * being workered on or queued up in a backlog. */ static inline unsigned int -total_outstanding(const struct rte_distributor_v1705 *d) +total_outstanding(const struct rte_distributor *d) { unsigned int wkr, total_outstanding = 0; @@ -525,24 +525,24 @@ total_outstanding(const struct rte_distributor_v1705 *d) * queued up. */ int -rte_distributor_flush_v1705(struct rte_distributor_v1705 *d) +rte_distributor_flush(struct rte_distributor *d) { const unsigned int flushed = total_outstanding(d); unsigned int wkr; if (d->alg_type == RTE_DIST_ALG_SINGLE) { /* Call the old API */ - return rte_distributor_flush(d->d_v20); + return rte_distributor_flush_v20(d->d_v20); } while (total_outstanding(d) > 0) - rte_distributor_process_v1705(d, NULL, 0); + rte_distributor_process(d, NULL, 0); /* * Send empty burst to all workers to allow them to exit * gracefully, should they need to. */ - rte_distributor_process_v1705(d, NULL, 0); + rte_distributor_process(d, NULL, 0); for (wkr = 0; wkr < d->num_workers; wkr++) handle_returns(d, wkr); @@ -552,13 +552,13 @@ rte_distributor_flush_v1705(struct rte_distributor_v1705 *d) /* clears the internal returns array in the distributor */ void -rte_distributor_clear_returns_v1705(struct rte_distributor_v1705 *d) +rte_distributor_clear_returns(struct rte_distributor *d) { unsigned int wkr; if (d->alg_type == RTE_DIST_ALG_SINGLE) { /* Call the old API */ - rte_distributor_clear_returns(d->d_v20); + rte_distributor_clear_returns_v20(d->d_v20); } /* throw away returns, so workers can exit */ @@ -567,13 +567,13 @@ rte_distributor_clear_returns_v1705(struct rte_distributor_v1705 *d) } /* creates a distributor instance */ -struct rte_distributor_v1705 * -rte_distributor_create_v1705(const char *name, +struct rte_distributor * +rte_distributor_create(const char *name, unsigned int socket_id, unsigned int num_workers, unsigned int alg_type) { - struct rte_distributor_v1705 *d; + struct rte_distributor *d; struct rte_dist_burst_list *dist_burst_list; char mz_name[RTE_MEMZONE_NAMESIZE]; const struct rte_memzone *mz; @@ -586,8 +586,8 @@ rte_distributor_create_v1705(const char *name, RTE_BUILD_BUG_ON((RTE_DISTRIB_MAX_WORKERS & 7) != 0); if (alg_type == RTE_DIST_ALG_SINGLE) { - d = malloc(sizeof(struct rte_distributor_v1705)); - d->d_v20 = rte_distributor_create(name, + d = malloc(sizeof(struct rte_distributor)); + d->d_v20 = rte_distributor_create_v20(name, socket_id, num_workers); if (d->d_v20 == NULL) { /* rte_errno will have been set */ diff --git a/lib/librte_distributor/rte_distributor.h b/lib/librte_distributor/rte_distributor.h index e41d522e4c..9b9efdbe02 100644 --- a/lib/librte_distributor/rte_distributor.h +++ b/lib/librte_distributor/rte_distributor.h @@ -1,8 +1,7 @@ /*- * BSD LICENSE * - * Copyright(c) 2010-2014 Intel Corporation. All rights reserved. - * All rights reserved. + * Copyright(c) 2017 Intel Corporation. All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions @@ -31,9 +30,240 @@ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ -#ifndef _RTE_DISTRIBUTE_H_ -#define _RTE_DISTRIBUTE_H_ +#ifndef _RTE_DISTRIBUTOR_H_ +#define _RTE_DISTRIBUTOR_H_ -#include +/** + * @file + * RTE distributor + * + * The distributor is a component which is designed to pass packets + * one-at-a-time to workers, with dynamic load balancing. + */ + +#ifdef __cplusplus +extern "C" { +#endif + +/* Type of distribution (burst/single) */ +enum rte_distributor_alg_type { + RTE_DIST_ALG_BURST = 0, + RTE_DIST_ALG_SINGLE, + RTE_DIST_NUM_ALG_TYPES +}; + +struct rte_distributor; +struct rte_mbuf; + +/** + * Function to create a new distributor instance + * + * Reserves the memory needed for the distributor operation and + * initializes the distributor to work with the configured number of workers. + * + * @param name + * The name to be given to the distributor instance. + * @param socket_id + * The NUMA node on which the memory is to be allocated + * @param num_workers + * The maximum number of workers that will request packets from this + * distributor + * @param alg_type + * Call the legacy API, or use the new burst API. legacy uses 32-bit + * flow ID, and works on a single packet at a time. Latest uses 15- + * bit flow ID and works on up to 8 packets at a time to worers. + * @return + * The newly created distributor instance + */ +struct rte_distributor * +rte_distributor_create(const char *name, unsigned int socket_id, + unsigned int num_workers, + unsigned int alg_type); + +/* *** APIS to be called on the distributor lcore *** */ +/* + * The following APIs are the public APIs which are designed for use on a + * single lcore which acts as the distributor lcore for a given distributor + * instance. These functions cannot be called on multiple cores simultaneously + * without using locking to protect access to the internals of the distributor. + * + * NOTE: a given lcore cannot act as both a distributor lcore and a worker lcore + * for the same distributor instance, otherwise deadlock will result. + */ + +/** + * Process a set of packets by distributing them among workers that request + * packets. The distributor will ensure that no two packets that have the + * same flow id, or tag, in the mbuf will be processed on different cores at + * the same time. + * + * The user is advocated to set tag for each mbuf before calling this function. + * If user doesn't set the tag, the tag value can be various values depending on + * driver implementation and configuration. + * + * This is not multi-thread safe and should only be called on a single lcore. + * + * @param d + * The distributor instance to be used + * @param mbufs + * The mbufs to be distributed + * @param num_mbufs + * The number of mbufs in the mbufs array + * @return + * The number of mbufs processed. + */ +int +rte_distributor_process(struct rte_distributor *d, + struct rte_mbuf **mbufs, unsigned int num_mbufs); + +/** + * Get a set of mbufs that have been returned to the distributor by workers + * + * This should only be called on the same lcore as rte_distributor_process() + * + * @param d + * The distributor instance to be used + * @param mbufs + * The mbufs pointer array to be filled in + * @param max_mbufs + * The size of the mbufs array + * @return + * The number of mbufs returned in the mbufs array. + */ +int +rte_distributor_returned_pkts(struct rte_distributor *d, + struct rte_mbuf **mbufs, unsigned int max_mbufs); + +/** + * Flush the distributor component, so that there are no in-flight or + * backlogged packets awaiting processing + * + * This should only be called on the same lcore as rte_distributor_process() + * + * @param d + * The distributor instance to be used + * @return + * The number of queued/in-flight packets that were completed by this call. + */ +int +rte_distributor_flush(struct rte_distributor *d); + +/** + * Clears the array of returned packets used as the source for the + * rte_distributor_returned_pkts() API call. + * + * This should only be called on the same lcore as rte_distributor_process() + * + * @param d + * The distributor instance to be used + */ +void +rte_distributor_clear_returns(struct rte_distributor *d); + +/* *** APIS to be called on the worker lcores *** */ +/* + * The following APIs are the public APIs which are designed for use on + * multiple lcores which act as workers for a distributor. Each lcore should use + * a unique worker id when requesting packets. + * + * NOTE: a given lcore cannot act as both a distributor lcore and a worker lcore + * for the same distributor instance, otherwise deadlock will result. + */ + +/** + * API called by a worker to get new packets to process. Any previous packets + * given to the worker is assumed to have completed processing, and may be + * optionally returned to the distributor via the oldpkt parameter. + * + * @param d + * The distributor instance to be used + * @param worker_id + * The worker instance number to use - must be less that num_workers passed + * at distributor creation time. + * @param pkts + * The mbufs pointer array to be filled in (up to 8 packets) + * @param oldpkt + * The previous packet, if any, being processed by the worker + * @param retcount + * The number of packets being returned + * + * @return + * The number of packets in the pkts array + */ +int +rte_distributor_get_pkt(struct rte_distributor *d, + unsigned int worker_id, struct rte_mbuf **pkts, + struct rte_mbuf **oldpkt, unsigned int retcount); + +/** + * API called by a worker to return a completed packet without requesting a + * new packet, for example, because a worker thread is shutting down + * + * @param d + * The distributor instance to be used + * @param worker_id + * The worker instance number to use - must be less that num_workers passed + * at distributor creation time. + * @param oldpkt + * The previous packets being processed by the worker + * @param num + * The number of packets in the oldpkt array + */ +int +rte_distributor_return_pkt(struct rte_distributor *d, + unsigned int worker_id, struct rte_mbuf **oldpkt, int num); + +/** + * API called by a worker to request a new packet to process. + * Any previous packet given to the worker is assumed to have completed + * processing, and may be optionally returned to the distributor via + * the oldpkt parameter. + * Unlike rte_distributor_get_pkt_burst(), this function does not wait for a + * new packet to be provided by the distributor. + * + * NOTE: after calling this function, rte_distributor_poll_pkt_burst() should + * be used to poll for the packet requested. The rte_distributor_get_pkt_burst() + * API should *not* be used to try and retrieve the new packet. + * + * @param d + * The distributor instance to be used + * @param worker_id + * The worker instance number to use - must be less that num_workers passed + * at distributor creation time. + * @param oldpkt + * The returning packets, if any, processed by the worker + * @param count + * The number of returning packets + */ +void +rte_distributor_request_pkt(struct rte_distributor *d, + unsigned int worker_id, struct rte_mbuf **oldpkt, + unsigned int count); + +/** + * API called by a worker to check for a new packet that was previously + * requested by a call to rte_distributor_request_pkt(). It does not wait + * for the new packet to be available, but returns NULL if the request has + * not yet been fulfilled by the distributor. + * + * @param d + * The distributor instance to be used + * @param worker_id + * The worker instance number to use - must be less that num_workers passed + * at distributor creation time. + * @param mbufs + * The array of mbufs being given to the worker + * + * @return + * The number of packets being given to the worker thread, zero if no + * packet is yet available. + */ +int +rte_distributor_poll_pkt(struct rte_distributor *d, + unsigned int worker_id, struct rte_mbuf **mbufs); + +#ifdef __cplusplus +} +#endif #endif diff --git a/lib/librte_distributor/rte_distributor_match_generic.c b/lib/librte_distributor/rte_distributor_match_generic.c index 7c2f9f5411..4925a7887c 100644 --- a/lib/librte_distributor/rte_distributor_match_generic.c +++ b/lib/librte_distributor/rte_distributor_match_generic.c @@ -35,7 +35,7 @@ #include "rte_distributor.h" void -find_match_vec(struct rte_distributor_v1705 *d, +find_match_vec(struct rte_distributor *d, uint16_t *data_ptr, uint16_t *output_ptr) { diff --git a/lib/librte_distributor/rte_distributor_match_sse.c b/lib/librte_distributor/rte_distributor_match_sse.c index b9f9bb0a23..44935a69f4 100644 --- a/lib/librte_distributor/rte_distributor_match_sse.c +++ b/lib/librte_distributor/rte_distributor_match_sse.c @@ -38,7 +38,7 @@ void -find_match_vec(struct rte_distributor_v1705 *d, +find_match_vec(struct rte_distributor *d, uint16_t *data_ptr, uint16_t *output_ptr) { diff --git a/lib/librte_distributor/rte_distributor_next.h b/lib/librte_distributor/rte_distributor_next.h deleted file mode 100644 index 003402096b..0000000000 --- a/lib/librte_distributor/rte_distributor_next.h +++ /dev/null @@ -1,269 +0,0 @@ -/*- - * BSD LICENSE - * - * Copyright(c) 2017 Intel Corporation. All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions - * are met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in - * the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Intel Corporation nor the names of its - * contributors may be used to endorse or promote products derived - * from this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - */ - -#ifndef _RTE_DISTRIBUTOR_H_ -#define _RTE_DISTRIBUTOR_H_ - -/** - * @file - * RTE distributor - * - * The distributor is a component which is designed to pass packets - * one-at-a-time to workers, with dynamic load balancing. - */ - -#ifdef __cplusplus -extern "C" { -#endif - -/* Type of distribution (burst/single) */ -enum rte_distributor_alg_type { - RTE_DIST_ALG_BURST = 0, - RTE_DIST_ALG_SINGLE, - RTE_DIST_NUM_ALG_TYPES -}; - -struct rte_distributor_v1705; -struct rte_mbuf; - -/** - * Function to create a new distributor instance - * - * Reserves the memory needed for the distributor operation and - * initializes the distributor to work with the configured number of workers. - * - * @param name - * The name to be given to the distributor instance. - * @param socket_id - * The NUMA node on which the memory is to be allocated - * @param num_workers - * The maximum number of workers that will request packets from this - * distributor - * @param alg_type - * Call the legacy API, or use the new burst API. legacy uses 32-bit - * flow ID, and works on a single packet at a time. Latest uses 15- - * bit flow ID and works on up to 8 packets at a time to worers. - * @return - * The newly created distributor instance - */ -struct rte_distributor_v1705 * -rte_distributor_create_v1705(const char *name, unsigned int socket_id, - unsigned int num_workers, - unsigned int alg_type); - -/* *** APIS to be called on the distributor lcore *** */ -/* - * The following APIs are the public APIs which are designed for use on a - * single lcore which acts as the distributor lcore for a given distributor - * instance. These functions cannot be called on multiple cores simultaneously - * without using locking to protect access to the internals of the distributor. - * - * NOTE: a given lcore cannot act as both a distributor lcore and a worker lcore - * for the same distributor instance, otherwise deadlock will result. - */ - -/** - * Process a set of packets by distributing them among workers that request - * packets. The distributor will ensure that no two packets that have the - * same flow id, or tag, in the mbuf will be processed on different cores at - * the same time. - * - * The user is advocated to set tag for each mbuf before calling this function. - * If user doesn't set the tag, the tag value can be various values depending on - * driver implementation and configuration. - * - * This is not multi-thread safe and should only be called on a single lcore. - * - * @param d - * The distributor instance to be used - * @param mbufs - * The mbufs to be distributed - * @param num_mbufs - * The number of mbufs in the mbufs array - * @return - * The number of mbufs processed. - */ -int -rte_distributor_process_v1705(struct rte_distributor_v1705 *d, - struct rte_mbuf **mbufs, unsigned int num_mbufs); - -/** - * Get a set of mbufs that have been returned to the distributor by workers - * - * This should only be called on the same lcore as rte_distributor_process() - * - * @param d - * The distributor instance to be used - * @param mbufs - * The mbufs pointer array to be filled in - * @param max_mbufs - * The size of the mbufs array - * @return - * The number of mbufs returned in the mbufs array. - */ -int -rte_distributor_returned_pkts_v1705(struct rte_distributor_v1705 *d, - struct rte_mbuf **mbufs, unsigned int max_mbufs); - -/** - * Flush the distributor component, so that there are no in-flight or - * backlogged packets awaiting processing - * - * This should only be called on the same lcore as rte_distributor_process() - * - * @param d - * The distributor instance to be used - * @return - * The number of queued/in-flight packets that were completed by this call. - */ -int -rte_distributor_flush_v1705(struct rte_distributor_v1705 *d); - -/** - * Clears the array of returned packets used as the source for the - * rte_distributor_returned_pkts() API call. - * - * This should only be called on the same lcore as rte_distributor_process() - * - * @param d - * The distributor instance to be used - */ -void -rte_distributor_clear_returns_v1705(struct rte_distributor_v1705 *d); - -/* *** APIS to be called on the worker lcores *** */ -/* - * The following APIs are the public APIs which are designed for use on - * multiple lcores which act as workers for a distributor. Each lcore should use - * a unique worker id when requesting packets. - * - * NOTE: a given lcore cannot act as both a distributor lcore and a worker lcore - * for the same distributor instance, otherwise deadlock will result. - */ - -/** - * API called by a worker to get new packets to process. Any previous packets - * given to the worker is assumed to have completed processing, and may be - * optionally returned to the distributor via the oldpkt parameter. - * - * @param d - * The distributor instance to be used - * @param worker_id - * The worker instance number to use - must be less that num_workers passed - * at distributor creation time. - * @param pkts - * The mbufs pointer array to be filled in (up to 8 packets) - * @param oldpkt - * The previous packet, if any, being processed by the worker - * @param retcount - * The number of packets being returned - * - * @return - * The number of packets in the pkts array - */ -int -rte_distributor_get_pkt_v1705(struct rte_distributor_v1705 *d, - unsigned int worker_id, struct rte_mbuf **pkts, - struct rte_mbuf **oldpkt, unsigned int retcount); - -/** - * API called by a worker to return a completed packet without requesting a - * new packet, for example, because a worker thread is shutting down - * - * @param d - * The distributor instance to be used - * @param worker_id - * The worker instance number to use - must be less that num_workers passed - * at distributor creation time. - * @param oldpkt - * The previous packets being processed by the worker - * @param num - * The number of packets in the oldpkt array - */ -int -rte_distributor_return_pkt_v1705(struct rte_distributor_v1705 *d, - unsigned int worker_id, struct rte_mbuf **oldpkt, int num); - -/** - * API called by a worker to request a new packet to process. - * Any previous packet given to the worker is assumed to have completed - * processing, and may be optionally returned to the distributor via - * the oldpkt parameter. - * Unlike rte_distributor_get_pkt_burst(), this function does not wait for a - * new packet to be provided by the distributor. - * - * NOTE: after calling this function, rte_distributor_poll_pkt_burst() should - * be used to poll for the packet requested. The rte_distributor_get_pkt_burst() - * API should *not* be used to try and retrieve the new packet. - * - * @param d - * The distributor instance to be used - * @param worker_id - * The worker instance number to use - must be less that num_workers passed - * at distributor creation time. - * @param oldpkt - * The returning packets, if any, processed by the worker - * @param count - * The number of returning packets - */ -void -rte_distributor_request_pkt_v1705(struct rte_distributor_v1705 *d, - unsigned int worker_id, struct rte_mbuf **oldpkt, - unsigned int count); - -/** - * API called by a worker to check for a new packet that was previously - * requested by a call to rte_distributor_request_pkt(). It does not wait - * for the new packet to be available, but returns NULL if the request has - * not yet been fulfilled by the distributor. - * - * @param d - * The distributor instance to be used - * @param worker_id - * The worker instance number to use - must be less that num_workers passed - * at distributor creation time. - * @param mbufs - * The array of mbufs being given to the worker - * - * @return - * The number of packets being given to the worker thread, zero if no - * packet is yet available. - */ -int -rte_distributor_poll_pkt_v1705(struct rte_distributor_v1705 *d, - unsigned int worker_id, struct rte_mbuf **mbufs); - -#ifdef __cplusplus -} -#endif - -#endif diff --git a/lib/librte_distributor/rte_distributor_private.h b/lib/librte_distributor/rte_distributor_private.h index 04c9cac12b..250b23e120 100644 --- a/lib/librte_distributor/rte_distributor_private.h +++ b/lib/librte_distributor/rte_distributor_private.h @@ -83,7 +83,7 @@ extern "C" { * the next cache line to worker 0, we pad this out to three cache lines. * Only 64-bits of the memory is actually used though. */ -union rte_distributor_buffer { +union rte_distributor_buffer_v20 { volatile int64_t bufptr64; char pad[RTE_CACHE_LINE_SIZE*3]; } __rte_cache_aligned; @@ -108,8 +108,8 @@ struct rte_distributor_returned_pkts { struct rte_mbuf *mbufs[RTE_DISTRIB_MAX_RETURNS]; }; -struct rte_distributor { - TAILQ_ENTRY(rte_distributor) next; /**< Next in list. */ +struct rte_distributor_v20 { + TAILQ_ENTRY(rte_distributor_v20) next; /**< Next in list. */ char name[RTE_DISTRIBUTOR_NAMESIZE]; /**< Name of the ring. */ unsigned int num_workers; /**< Number of workers polling */ @@ -124,7 +124,7 @@ struct rte_distributor { struct rte_distributor_backlog backlog[RTE_DISTRIB_MAX_WORKERS]; - union rte_distributor_buffer bufs[RTE_DISTRIB_MAX_WORKERS]; + union rte_distributor_buffer_v20 bufs[RTE_DISTRIB_MAX_WORKERS]; struct rte_distributor_returned_pkts returns; }; @@ -144,7 +144,7 @@ enum rte_distributor_match_function { * We can pass up to 8 mbufs at a time in one cacheline. * There is a separate cacheline for returns in the burst API. */ -struct rte_distributor_buffer_v1705 { +struct rte_distributor_buffer { volatile int64_t bufptr64[RTE_DIST_BURST_SIZE] __rte_cache_aligned; /* <= outgoing to worker */ @@ -158,8 +158,8 @@ struct rte_distributor_buffer_v1705 { int count __rte_cache_aligned; /* <= number of current mbufs */ }; -struct rte_distributor_v1705 { - TAILQ_ENTRY(rte_distributor_v1705) next; /**< Next in list. */ +struct rte_distributor { + TAILQ_ENTRY(rte_distributor) next; /**< Next in list. */ char name[RTE_DISTRIBUTOR_NAMESIZE]; /**< Name of the ring. */ unsigned int num_workers; /**< Number of workers polling */ @@ -176,22 +176,22 @@ struct rte_distributor_v1705 { struct rte_distributor_backlog backlog[RTE_DISTRIB_MAX_WORKERS] __rte_cache_aligned; - struct rte_distributor_buffer_v1705 bufs[RTE_DISTRIB_MAX_WORKERS]; + struct rte_distributor_buffer bufs[RTE_DISTRIB_MAX_WORKERS]; struct rte_distributor_returned_pkts returns; enum rte_distributor_match_function dist_match_fn; - struct rte_distributor *d_v20; + struct rte_distributor_v20 *d_v20; }; void -find_match_scalar(struct rte_distributor_v1705 *d, +find_match_scalar(struct rte_distributor *d, uint16_t *data_ptr, uint16_t *output_ptr); void -find_match_vec(struct rte_distributor_v1705 *d, +find_match_vec(struct rte_distributor *d, uint16_t *data_ptr, uint16_t *output_ptr); diff --git a/lib/librte_distributor/rte_distributor_v20.c b/lib/librte_distributor/rte_distributor_v20.c index be297ec8e1..1f406c56d5 100644 --- a/lib/librte_distributor/rte_distributor_v20.c +++ b/lib/librte_distributor/rte_distributor_v20.c @@ -43,7 +43,7 @@ #include "rte_distributor_v20.h" #include "rte_distributor_private.h" -TAILQ_HEAD(rte_distributor_list, rte_distributor); +TAILQ_HEAD(rte_distributor_list, rte_distributor_v20); static struct rte_tailq_elem rte_distributor_tailq = { .name = "RTE_DISTRIBUTOR", @@ -53,10 +53,10 @@ EAL_REGISTER_TAILQ(rte_distributor_tailq) /**** APIs called by workers ****/ void -rte_distributor_request_pkt(struct rte_distributor *d, +rte_distributor_request_pkt_v20(struct rte_distributor_v20 *d, unsigned worker_id, struct rte_mbuf *oldpkt) { - union rte_distributor_buffer *buf = &d->bufs[worker_id]; + union rte_distributor_buffer_v20 *buf = &d->bufs[worker_id]; int64_t req = (((int64_t)(uintptr_t)oldpkt) << RTE_DISTRIB_FLAG_BITS) | RTE_DISTRIB_GET_BUF; while (unlikely(buf->bufptr64 & RTE_DISTRIB_FLAGS_MASK)) @@ -65,10 +65,10 @@ rte_distributor_request_pkt(struct rte_distributor *d, } struct rte_mbuf * -rte_distributor_poll_pkt(struct rte_distributor *d, +rte_distributor_poll_pkt_v20(struct rte_distributor_v20 *d, unsigned worker_id) { - union rte_distributor_buffer *buf = &d->bufs[worker_id]; + union rte_distributor_buffer_v20 *buf = &d->bufs[worker_id]; if (buf->bufptr64 & RTE_DISTRIB_GET_BUF) return NULL; @@ -78,21 +78,21 @@ rte_distributor_poll_pkt(struct rte_distributor *d, } struct rte_mbuf * -rte_distributor_get_pkt(struct rte_distributor *d, +rte_distributor_get_pkt_v20(struct rte_distributor_v20 *d, unsigned worker_id, struct rte_mbuf *oldpkt) { struct rte_mbuf *ret; - rte_distributor_request_pkt(d, worker_id, oldpkt); - while ((ret = rte_distributor_poll_pkt(d, worker_id)) == NULL) + rte_distributor_request_pkt_v20(d, worker_id, oldpkt); + while ((ret = rte_distributor_poll_pkt_v20(d, worker_id)) == NULL) rte_pause(); return ret; } int -rte_distributor_return_pkt(struct rte_distributor *d, +rte_distributor_return_pkt_v20(struct rte_distributor_v20 *d, unsigned worker_id, struct rte_mbuf *oldpkt) { - union rte_distributor_buffer *buf = &d->bufs[worker_id]; + union rte_distributor_buffer_v20 *buf = &d->bufs[worker_id]; uint64_t req = (((int64_t)(uintptr_t)oldpkt) << RTE_DISTRIB_FLAG_BITS) | RTE_DISTRIB_RETURN_BUF; buf->bufptr64 = req; @@ -123,7 +123,7 @@ backlog_pop(struct rte_distributor_backlog *bl) /* stores a packet returned from a worker inside the returns array */ static inline void -store_return(uintptr_t oldbuf, struct rte_distributor *d, +store_return(uintptr_t oldbuf, struct rte_distributor_v20 *d, unsigned *ret_start, unsigned *ret_count) { /* store returns in a circular buffer - code is branch-free */ @@ -134,7 +134,7 @@ store_return(uintptr_t oldbuf, struct rte_distributor *d, } static inline void -handle_worker_shutdown(struct rte_distributor *d, unsigned wkr) +handle_worker_shutdown(struct rte_distributor_v20 *d, unsigned int wkr) { d->in_flight_tags[wkr] = 0; d->in_flight_bitmask &= ~(1UL << wkr); @@ -164,7 +164,7 @@ handle_worker_shutdown(struct rte_distributor *d, unsigned wkr) * Note that the tags were set before first level call * to rte_distributor_process. */ - rte_distributor_process(d, pkts, i); + rte_distributor_process_v20(d, pkts, i); bl->count = bl->start = 0; } } @@ -174,7 +174,7 @@ handle_worker_shutdown(struct rte_distributor *d, unsigned wkr) * to do a partial flush. */ static int -process_returns(struct rte_distributor *d) +process_returns(struct rte_distributor_v20 *d) { unsigned wkr; unsigned flushed = 0; @@ -213,7 +213,7 @@ process_returns(struct rte_distributor *d) /* process a set of packets to distribute them to workers */ int -rte_distributor_process(struct rte_distributor *d, +rte_distributor_process_v20(struct rte_distributor_v20 *d, struct rte_mbuf **mbufs, unsigned num_mbufs) { unsigned next_idx = 0; @@ -317,7 +317,7 @@ rte_distributor_process(struct rte_distributor *d, /* return to the caller, packets returned from workers */ int -rte_distributor_returned_pkts(struct rte_distributor *d, +rte_distributor_returned_pkts_v20(struct rte_distributor_v20 *d, struct rte_mbuf **mbufs, unsigned max_mbufs) { struct rte_distributor_returned_pkts *returns = &d->returns; @@ -338,7 +338,7 @@ rte_distributor_returned_pkts(struct rte_distributor *d, /* return the number of packets in-flight in a distributor, i.e. packets * being workered on or queued up in a backlog. */ static inline unsigned -total_outstanding(const struct rte_distributor *d) +total_outstanding(const struct rte_distributor_v20 *d) { unsigned wkr, total_outstanding; @@ -353,19 +353,19 @@ total_outstanding(const struct rte_distributor *d) /* flush the distributor, so that there are no outstanding packets in flight or * queued up. */ int -rte_distributor_flush(struct rte_distributor *d) +rte_distributor_flush_v20(struct rte_distributor_v20 *d) { const unsigned flushed = total_outstanding(d); while (total_outstanding(d) > 0) - rte_distributor_process(d, NULL, 0); + rte_distributor_process_v20(d, NULL, 0); return flushed; } /* clears the internal returns array in the distributor */ void -rte_distributor_clear_returns(struct rte_distributor *d) +rte_distributor_clear_returns_v20(struct rte_distributor_v20 *d) { d->returns.start = d->returns.count = 0; #ifndef __OPTIMIZE__ @@ -374,12 +374,12 @@ rte_distributor_clear_returns(struct rte_distributor *d) } /* creates a distributor instance */ -struct rte_distributor * -rte_distributor_create(const char *name, +struct rte_distributor_v20 * +rte_distributor_create_v20(const char *name, unsigned socket_id, unsigned num_workers) { - struct rte_distributor *d; + struct rte_distributor_v20 *d; struct rte_distributor_list *distributor_list; char mz_name[RTE_MEMZONE_NAMESIZE]; const struct rte_memzone *mz; diff --git a/lib/librte_distributor/rte_distributor_v20.h b/lib/librte_distributor/rte_distributor_v20.h index b69aa27329..f02e6aac82 100644 --- a/lib/librte_distributor/rte_distributor_v20.h +++ b/lib/librte_distributor/rte_distributor_v20.h @@ -48,7 +48,7 @@ extern "C" { #define RTE_DISTRIBUTOR_NAMESIZE 32 /**< Length of name for instance */ -struct rte_distributor; +struct rte_distributor_v20; struct rte_mbuf; /** @@ -67,8 +67,8 @@ struct rte_mbuf; * @return * The newly created distributor instance */ -struct rte_distributor * -rte_distributor_create(const char *name, unsigned int socket_id, +struct rte_distributor_v20 * +rte_distributor_create_v20(const char *name, unsigned int socket_id, unsigned int num_workers); /* *** APIS to be called on the distributor lcore *** */ @@ -103,7 +103,7 @@ rte_distributor_create(const char *name, unsigned int socket_id, * The number of mbufs processed. */ int -rte_distributor_process(struct rte_distributor *d, +rte_distributor_process_v20(struct rte_distributor_v20 *d, struct rte_mbuf **mbufs, unsigned int num_mbufs); /** @@ -121,7 +121,7 @@ rte_distributor_process(struct rte_distributor *d, * The number of mbufs returned in the mbufs array. */ int -rte_distributor_returned_pkts(struct rte_distributor *d, +rte_distributor_returned_pkts_v20(struct rte_distributor_v20 *d, struct rte_mbuf **mbufs, unsigned int max_mbufs); /** @@ -136,7 +136,7 @@ rte_distributor_returned_pkts(struct rte_distributor *d, * The number of queued/in-flight packets that were completed by this call. */ int -rte_distributor_flush(struct rte_distributor *d); +rte_distributor_flush_v20(struct rte_distributor_v20 *d); /** * Clears the array of returned packets used as the source for the @@ -148,7 +148,7 @@ rte_distributor_flush(struct rte_distributor *d); * The distributor instance to be used */ void -rte_distributor_clear_returns(struct rte_distributor *d); +rte_distributor_clear_returns_v20(struct rte_distributor_v20 *d); /* *** APIS to be called on the worker lcores *** */ /* @@ -177,7 +177,7 @@ rte_distributor_clear_returns(struct rte_distributor *d); * A new packet to be processed by the worker thread. */ struct rte_mbuf * -rte_distributor_get_pkt(struct rte_distributor *d, +rte_distributor_get_pkt_v20(struct rte_distributor_v20 *d, unsigned int worker_id, struct rte_mbuf *oldpkt); /** @@ -193,8 +193,8 @@ rte_distributor_get_pkt(struct rte_distributor *d, * The previous packet being processed by the worker */ int -rte_distributor_return_pkt(struct rte_distributor *d, unsigned int worker_id, - struct rte_mbuf *mbuf); +rte_distributor_return_pkt_v20(struct rte_distributor_v20 *d, + unsigned int worker_id, struct rte_mbuf *mbuf); /** * API called by a worker to request a new packet to process. @@ -217,7 +217,7 @@ rte_distributor_return_pkt(struct rte_distributor *d, unsigned int worker_id, * The previous packet, if any, being processed by the worker */ void -rte_distributor_request_pkt(struct rte_distributor *d, +rte_distributor_request_pkt_v20(struct rte_distributor_v20 *d, unsigned int worker_id, struct rte_mbuf *oldpkt); /** @@ -237,7 +237,7 @@ rte_distributor_request_pkt(struct rte_distributor *d, * packet is yet available. */ struct rte_mbuf * -rte_distributor_poll_pkt(struct rte_distributor *d, +rte_distributor_poll_pkt_v20(struct rte_distributor_v20 *d, unsigned int worker_id); #ifdef __cplusplus diff --git a/test/test/test_distributor.c b/test/test/test_distributor.c index 6059a0c84f..7a3051353f 100644 --- a/test/test/test_distributor.c +++ b/test/test/test_distributor.c @@ -1,7 +1,7 @@ /*- * BSD LICENSE * - * Copyright(c) 2010-2014 Intel Corporation. All rights reserved. + * Copyright(c) 2010-2017 Intel Corporation. All rights reserved. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -87,20 +87,25 @@ clear_packet_count(void) static int handle_work(void *arg) { - struct rte_mbuf *pkt = NULL; + struct rte_mbuf *buf[8] __rte_cache_aligned; struct worker_params *wp = arg; - struct rte_distributor *d = wp->dist; - - unsigned count = 0; - unsigned id = __sync_fetch_and_add(&worker_idx, 1); - - pkt = rte_distributor_get_pkt(d, id, NULL); + struct rte_distributor *db = wp->dist; + unsigned int count = 0, num = 0; + unsigned int id = __sync_fetch_and_add(&worker_idx, 1); + int i; + + for (i = 0; i < 8; i++) + buf[i] = NULL; + num = rte_distributor_get_pkt(db, id, buf, buf, num); while (!quit) { - worker_stats[id].handled_packets++, count++; - pkt = rte_distributor_get_pkt(d, id, pkt); + worker_stats[id].handled_packets += num; + count += num; + num = rte_distributor_get_pkt(db, id, + buf, buf, num); } - worker_stats[id].handled_packets++, count++; - rte_distributor_return_pkt(d, id, pkt); + worker_stats[id].handled_packets += num; + count += num; + rte_distributor_return_pkt(db, id, buf, num); return 0; } @@ -118,9 +123,11 @@ handle_work(void *arg) static int sanity_test(struct worker_params *wp, struct rte_mempool *p) { - struct rte_distributor *d = wp->dist; + struct rte_distributor *db = wp->dist; struct rte_mbuf *bufs[BURST]; - unsigned i; + struct rte_mbuf *returns[BURST*2]; + unsigned int i, count; + unsigned int retries; printf("=== Basic distributor sanity tests ===\n"); clear_packet_count(); @@ -134,8 +141,15 @@ sanity_test(struct worker_params *wp, struct rte_mempool *p) for (i = 0; i < BURST; i++) bufs[i]->hash.usr = 0; - rte_distributor_process(d, bufs, BURST); - rte_distributor_flush(d); + rte_distributor_process(db, bufs, BURST); + count = 0; + do { + + rte_distributor_flush(db); + count += rte_distributor_returned_pkts(db, + returns, BURST*2); + } while (count < BURST); + if (total_packet_count() != BURST) { printf("Line %d: Error, not all packets flushed. " "Expected %u, got %u\n", @@ -147,8 +161,6 @@ sanity_test(struct worker_params *wp, struct rte_mempool *p) printf("Worker %u handled %u packets\n", i, worker_stats[i].handled_packets); printf("Sanity test with all zero hashes done.\n"); - if (worker_stats[0].handled_packets != BURST) - return -1; /* pick two flows and check they go correctly */ if (rte_lcore_count() >= 3) { @@ -156,8 +168,13 @@ sanity_test(struct worker_params *wp, struct rte_mempool *p) for (i = 0; i < BURST; i++) bufs[i]->hash.usr = (i & 1) << 8; - rte_distributor_process(d, bufs, BURST); - rte_distributor_flush(d); + rte_distributor_process(db, bufs, BURST); + count = 0; + do { + rte_distributor_flush(db); + count += rte_distributor_returned_pkts(db, + returns, BURST*2); + } while (count < BURST); if (total_packet_count() != BURST) { printf("Line %d: Error, not all packets flushed. " "Expected %u, got %u\n", @@ -169,20 +186,21 @@ sanity_test(struct worker_params *wp, struct rte_mempool *p) printf("Worker %u handled %u packets\n", i, worker_stats[i].handled_packets); printf("Sanity test with two hash values done\n"); - - if (worker_stats[0].handled_packets != 16 || - worker_stats[1].handled_packets != 16) - return -1; } /* give a different hash value to each packet, * so load gets distributed */ clear_packet_count(); for (i = 0; i < BURST; i++) - bufs[i]->hash.usr = i; - - rte_distributor_process(d, bufs, BURST); - rte_distributor_flush(d); + bufs[i]->hash.usr = i+1; + + rte_distributor_process(db, bufs, BURST); + count = 0; + do { + rte_distributor_flush(db); + count += rte_distributor_returned_pkts(db, + returns, BURST*2); + } while (count < BURST); if (total_packet_count() != BURST) { printf("Line %d: Error, not all packets flushed. " "Expected %u, got %u\n", @@ -204,8 +222,9 @@ sanity_test(struct worker_params *wp, struct rte_mempool *p) unsigned num_returned = 0; /* flush out any remaining packets */ - rte_distributor_flush(d); - rte_distributor_clear_returns(d); + rte_distributor_flush(db); + rte_distributor_clear_returns(db); + if (rte_mempool_get_bulk(p, (void *)many_bufs, BIG_BATCH) != 0) { printf("line %d: Error getting mbufs from pool\n", __LINE__); return -1; @@ -213,28 +232,44 @@ sanity_test(struct worker_params *wp, struct rte_mempool *p) for (i = 0; i < BIG_BATCH; i++) many_bufs[i]->hash.usr = i << 2; + printf("=== testing big burst (%s) ===\n", wp->name); for (i = 0; i < BIG_BATCH/BURST; i++) { - rte_distributor_process(d, &many_bufs[i*BURST], BURST); - num_returned += rte_distributor_returned_pkts(d, + rte_distributor_process(db, + &many_bufs[i*BURST], BURST); + count = rte_distributor_returned_pkts(db, &return_bufs[num_returned], BIG_BATCH - num_returned); + num_returned += count; } - rte_distributor_flush(d); - num_returned += rte_distributor_returned_pkts(d, - &return_bufs[num_returned], BIG_BATCH - num_returned); + rte_distributor_flush(db); + count = rte_distributor_returned_pkts(db, + &return_bufs[num_returned], + BIG_BATCH - num_returned); + num_returned += count; + retries = 0; + do { + rte_distributor_flush(db); + count = rte_distributor_returned_pkts(db, + &return_bufs[num_returned], + BIG_BATCH - num_returned); + num_returned += count; + retries++; + } while ((num_returned < BIG_BATCH) && (retries < 100)); if (num_returned != BIG_BATCH) { - printf("line %d: Number returned is not the same as " - "number sent\n", __LINE__); + printf("line %d: Missing packets, expected %d\n", + __LINE__, num_returned); return -1; } + /* big check - make sure all packets made it back!! */ for (i = 0; i < BIG_BATCH; i++) { unsigned j; struct rte_mbuf *src = many_bufs[i]; - for (j = 0; j < BIG_BATCH; j++) + for (j = 0; j < BIG_BATCH; j++) { if (return_bufs[j] == src) break; + } if (j == BIG_BATCH) { printf("Error: could not find source packet #%u\n", i); @@ -258,20 +293,28 @@ sanity_test(struct worker_params *wp, struct rte_mempool *p) static int handle_work_with_free_mbufs(void *arg) { - struct rte_mbuf *pkt = NULL; + struct rte_mbuf *buf[8] __rte_cache_aligned; struct worker_params *wp = arg; struct rte_distributor *d = wp->dist; - unsigned count = 0; - unsigned id = __sync_fetch_and_add(&worker_idx, 1); - - pkt = rte_distributor_get_pkt(d, id, NULL); + unsigned int count = 0; + unsigned int i; + unsigned int num = 0; + unsigned int id = __sync_fetch_and_add(&worker_idx, 1); + + for (i = 0; i < 8; i++) + buf[i] = NULL; + num = rte_distributor_get_pkt(d, id, buf, buf, num); while (!quit) { - worker_stats[id].handled_packets++, count++; - rte_pktmbuf_free(pkt); - pkt = rte_distributor_get_pkt(d, id, pkt); + worker_stats[id].handled_packets += num; + count += num; + for (i = 0; i < num; i++) + rte_pktmbuf_free(buf[i]); + num = rte_distributor_get_pkt(d, + id, buf, buf, num); } - worker_stats[id].handled_packets++, count++; - rte_distributor_return_pkt(d, id, pkt); + worker_stats[id].handled_packets += num; + count += num; + rte_distributor_return_pkt(d, id, buf, num); return 0; } @@ -287,7 +330,8 @@ sanity_test_with_mbuf_alloc(struct worker_params *wp, struct rte_mempool *p) unsigned i; struct rte_mbuf *bufs[BURST]; - printf("=== Sanity test with mbuf alloc/free ===\n"); + printf("=== Sanity test with mbuf alloc/free (%s) ===\n", wp->name); + clear_packet_count(); for (i = 0; i < ((1<dist; - unsigned count = 0; - const unsigned id = __sync_fetch_and_add(&worker_idx, 1); + unsigned int count = 0; + unsigned int num = 0; + unsigned int total = 0; + unsigned int i; + unsigned int returned = 0; + const unsigned int id = __sync_fetch_and_add(&worker_idx, 1); + + num = rte_distributor_get_pkt(d, id, buf, buf, num); - pkt = rte_distributor_get_pkt(d, id, NULL); /* wait for quit single globally, or for worker zero, wait * for zero_quit */ while (!quit && !(id == 0 && zero_quit)) { - worker_stats[id].handled_packets++, count++; - rte_pktmbuf_free(pkt); - pkt = rte_distributor_get_pkt(d, id, NULL); + worker_stats[id].handled_packets += num; + count += num; + for (i = 0; i < num; i++) + rte_pktmbuf_free(buf[i]); + num = rte_distributor_get_pkt(d, + id, buf, buf, num); + total += num; } - worker_stats[id].handled_packets++, count++; - rte_distributor_return_pkt(d, id, pkt); + worker_stats[id].handled_packets += num; + count += num; + returned = rte_distributor_return_pkt(d, id, buf, num); if (id == 0) { /* for worker zero, allow it to restart to pick up last packet @@ -339,13 +397,18 @@ handle_work_for_shutdown_test(void *arg) */ while (zero_quit) usleep(100); - pkt = rte_distributor_get_pkt(d, id, NULL); + + num = rte_distributor_get_pkt(d, + id, buf, buf, num); + while (!quit) { worker_stats[id].handled_packets++, count++; rte_pktmbuf_free(pkt); - pkt = rte_distributor_get_pkt(d, id, NULL); + num = rte_distributor_get_pkt(d, id, buf, buf, num); } - rte_distributor_return_pkt(d, id, pkt); + returned = rte_distributor_return_pkt(d, + id, buf, num); + printf("Num returned = %d\n", returned); } return 0; } @@ -367,17 +430,22 @@ sanity_test_with_worker_shutdown(struct worker_params *wp, printf("=== Sanity test of worker shutdown ===\n"); clear_packet_count(); + if (rte_mempool_get_bulk(p, (void *)bufs, BURST) != 0) { printf("line %d: Error getting mbufs from pool\n", __LINE__); return -1; } - /* now set all hash values in all buffers to zero, so all pkts go to the - * one worker thread */ + /* + * Now set all hash values in all buffers to same value so all + * pkts go to the one worker thread + */ for (i = 0; i < BURST; i++) - bufs[i]->hash.usr = 0; + bufs[i]->hash.usr = 1; rte_distributor_process(d, bufs, BURST); + rte_distributor_flush(d); + /* at this point, we will have processed some packets and have a full * backlog for the other ones at worker 0. */ @@ -388,7 +456,7 @@ sanity_test_with_worker_shutdown(struct worker_params *wp, return -1; } for (i = 0; i < BURST; i++) - bufs[i]->hash.usr = 0; + bufs[i]->hash.usr = 1; /* get worker zero to quit */ zero_quit = 1; @@ -396,6 +464,12 @@ sanity_test_with_worker_shutdown(struct worker_params *wp, /* flush the distributor */ rte_distributor_flush(d); + rte_delay_us(10000); + + for (i = 0; i < rte_lcore_count() - 1; i++) + printf("Worker %u handled %u packets\n", i, + worker_stats[i].handled_packets); + if (total_packet_count() != BURST * 2) { printf("Line %d: Error, not all packets flushed. " "Expected %u, got %u\n", @@ -403,10 +477,6 @@ sanity_test_with_worker_shutdown(struct worker_params *wp, return -1; } - for (i = 0; i < rte_lcore_count() - 1; i++) - printf("Worker %u handled %u packets\n", i, - worker_stats[i].handled_packets); - printf("Sanity test with worker shutdown passed\n\n"); return 0; } @@ -422,7 +492,7 @@ test_flush_with_worker_shutdown(struct worker_params *wp, struct rte_mbuf *bufs[BURST]; unsigned i; - printf("=== Test flush fn with worker shutdown ===\n"); + printf("=== Test flush fn with worker shutdown (%s) ===\n", wp->name); clear_packet_count(); if (rte_mempool_get_bulk(p, (void *)bufs, BURST) != 0) { @@ -446,7 +516,13 @@ test_flush_with_worker_shutdown(struct worker_params *wp, /* flush the distributor */ rte_distributor_flush(d); + rte_delay_us(10000); + zero_quit = 0; + for (i = 0; i < rte_lcore_count() - 1; i++) + printf("Worker %u handled %u packets\n", i, + worker_stats[i].handled_packets); + if (total_packet_count() != BURST) { printf("Line %d: Error, not all packets flushed. " "Expected %u, got %u\n", @@ -454,10 +530,6 @@ test_flush_with_worker_shutdown(struct worker_params *wp, return -1; } - for (i = 0; i < rte_lcore_count() - 1; i++) - printf("Worker %u handled %u packets\n", i, - worker_stats[i].handled_packets); - printf("Flush test with worker shutdown passed\n\n"); return 0; } @@ -469,7 +541,9 @@ int test_error_distributor_create_name(void) char *name = NULL; d = rte_distributor_create(name, rte_socket_id(), - rte_lcore_count() - 1); + rte_lcore_count() - 1, + RTE_DIST_ALG_BURST); + if (d != NULL || rte_errno != EINVAL) { printf("ERROR: No error on create() with NULL name param\n"); return -1; @@ -483,8 +557,10 @@ static int test_error_distributor_create_numworkers(void) { struct rte_distributor *d = NULL; + d = rte_distributor_create("test_numworkers", rte_socket_id(), - RTE_MAX_LCORE + 10); + RTE_MAX_LCORE + 10, + RTE_DIST_ALG_BURST); if (d != NULL || rte_errno != EINVAL) { printf("ERROR: No error on create() with num_workers > MAX\n"); return -1; @@ -530,10 +606,11 @@ test_distributor(void) } if (d == NULL) { - d = rte_distributor_create("Test_distributor", rte_socket_id(), - rte_lcore_count() - 1); + d = rte_distributor_create("Test_dist_burst", rte_socket_id(), + rte_lcore_count() - 1, + RTE_DIST_ALG_BURST); if (d == NULL) { - printf("Error creating distributor\n"); + printf("Error creating burst distributor\n"); return -1; } } else { @@ -553,7 +630,7 @@ test_distributor(void) } worker_params.dist = d; - sprintf(worker_params.name, "single"); + sprintf(worker_params.name, "burst"); rte_eal_mp_remote_launch(handle_work, &worker_params, SKIP_MASTER); if (sanity_test(&worker_params, p) < 0) diff --git a/test/test/test_distributor_perf.c b/test/test/test_distributor_perf.c index 7947fe9b16..1dd326bccc 100644 --- a/test/test/test_distributor_perf.c +++ b/test/test/test_distributor_perf.c @@ -129,18 +129,25 @@ clear_packet_count(void) static int handle_work(void *arg) { - struct rte_mbuf *pkt = NULL; struct rte_distributor *d = arg; - unsigned count = 0; - unsigned id = __sync_fetch_and_add(&worker_idx, 1); + unsigned int count = 0; + unsigned int num = 0; + int i; + unsigned int id = __sync_fetch_and_add(&worker_idx, 1); + struct rte_mbuf *buf[8] __rte_cache_aligned; - pkt = rte_distributor_get_pkt(d, id, NULL); + for (i = 0; i < 8; i++) + buf[i] = NULL; + + num = rte_distributor_get_pkt(d, id, buf, buf, num); while (!quit) { - worker_stats[id].handled_packets++, count++; - pkt = rte_distributor_get_pkt(d, id, pkt); + worker_stats[id].handled_packets += num; + count += num; + num = rte_distributor_get_pkt(d, id, buf, buf, num); } - worker_stats[id].handled_packets++, count++; - rte_distributor_return_pkt(d, id, pkt); + worker_stats[id].handled_packets += num; + count += num; + rte_distributor_return_pkt(d, id, buf, num); return 0; } @@ -228,7 +235,8 @@ test_distributor_perf(void) if (d == NULL) { d = rte_distributor_create("Test_perf", rte_socket_id(), - rte_lcore_count() - 1); + rte_lcore_count() - 1, + RTE_DIST_ALG_SINGLE); if (d == NULL) { printf("Error creating distributor\n"); return -1;