From: Bruce Richardson Date: Thu, 30 Mar 2017 19:30:38 +0000 (+0100) Subject: event/sw: add worker core functions X-Git-Tag: spdx-start~3842 X-Git-Url: http://git.droids-corp.org/?a=commitdiff_plain;h=656af9180014ee095deacf106161c873d3063bc8;p=dpdk.git event/sw: add worker core functions add the event enqueue, dequeue and release functions to the eventdev. These also include tracking of stats for observability in the load of the scheduler. Internally in the enqueue function, the various types of enqueue operations, to forward an existing event, to send a new event, to drop a previous event, are converted to a series of flags which will be used by the scheduler code to perform the needed actions for that event. Signed-off-by: Bruce Richardson Signed-off-by: Gage Eads Signed-off-by: Harry van Haaren Acked-by: Jerin Jacob --- diff --git a/drivers/event/sw/Makefile b/drivers/event/sw/Makefile index 49b013a97a..22b428334f 100644 --- a/drivers/event/sw/Makefile +++ b/drivers/event/sw/Makefile @@ -52,6 +52,7 @@ EXPORT_MAP := rte_pmd_evdev_sw_version.map # library source files SRCS-$(CONFIG_RTE_LIBRTE_PMD_SW_EVENTDEV) += sw_evdev.c +SRCS-$(CONFIG_RTE_LIBRTE_PMD_SW_EVENTDEV) += sw_evdev_worker.c # export include files SYMLINK-y-include += diff --git a/drivers/event/sw/sw_evdev.c b/drivers/event/sw/sw_evdev.c index a2e1cbbfe0..bc5acc0e11 100644 --- a/drivers/event/sw/sw_evdev.c +++ b/drivers/event/sw/sw_evdev.c @@ -411,6 +411,7 @@ sw_dev_configure(const struct rte_eventdev *dev) sw->qid_count = conf->nb_event_queues; sw->port_count = conf->nb_event_ports; sw->nb_events_limit = conf->nb_events_limit; + rte_atomic32_set(&sw->inflights, 0); if (conf->event_dev_cfg & RTE_EVENT_DEV_CFG_PER_DEQUEUE_TIMEOUT) return -ENOTSUP; @@ -552,6 +553,13 @@ sw_probe(const char *name, const char *params) return -EFAULT; } dev->dev_ops = &evdev_sw_ops; + dev->enqueue = sw_event_enqueue; + dev->enqueue_burst = sw_event_enqueue_burst; + dev->dequeue = sw_event_dequeue; + dev->dequeue_burst = sw_event_dequeue_burst; + + if (rte_eal_process_type() != RTE_PROC_PRIMARY) + return 0; sw = dev->data->dev_private; sw->data = dev->data; diff --git a/drivers/event/sw/sw_evdev.h b/drivers/event/sw/sw_evdev.h index f5515e1a62..ab372fdae8 100644 --- a/drivers/event/sw/sw_evdev.h +++ b/drivers/event/sw/sw_evdev.h @@ -55,12 +55,36 @@ #define SCHED_DEQUEUE_BURST_SIZE 32 #define SW_PORT_HIST_LIST (MAX_SW_PROD_Q_DEPTH) /* size of our history list */ +#define NUM_SAMPLES 64 /* how many data points use for average stats */ #define EVENTDEV_NAME_SW_PMD event_sw #define SW_PMD_NAME RTE_STR(event_sw) #define SW_SCHED_TYPE_DIRECT (RTE_SCHED_TYPE_PARALLEL + 1) +enum { + QE_FLAG_VALID_SHIFT = 0, + QE_FLAG_COMPLETE_SHIFT, + QE_FLAG_NOT_EOP_SHIFT, + _QE_FLAG_COUNT +}; + +#define QE_FLAG_VALID (1 << QE_FLAG_VALID_SHIFT) /* for NEW FWD, FRAG */ +#define QE_FLAG_COMPLETE (1 << QE_FLAG_COMPLETE_SHIFT) /* set for FWD, DROP */ +#define QE_FLAG_NOT_EOP (1 << QE_FLAG_NOT_EOP_SHIFT) /* set for FRAG only */ + +static const uint8_t sw_qe_flag_map[] = { + QE_FLAG_VALID /* NEW Event */, + QE_FLAG_VALID | QE_FLAG_COMPLETE /* FWD Event */, + QE_FLAG_COMPLETE /* RELEASE Event */, + + /* Values which can be used for future support for partial + * events, i.e. where one event comes back to the scheduler + * as multiple which need to be tracked together + */ + QE_FLAG_VALID | QE_FLAG_COMPLETE | QE_FLAG_NOT_EOP, +}; + #ifdef RTE_LIBRTE_PMD_EVDEV_SW_DEBUG #define SW_LOG_INFO(fmt, args...) \ RTE_LOG(INFO, EVENTDEV, "[%s] %s() line %u: " fmt "\n", \ @@ -241,4 +265,12 @@ sw_pmd_priv_const(const struct rte_eventdev *eventdev) return eventdev->data->dev_private; } +uint16_t sw_event_enqueue(void *port, const struct rte_event *ev); +uint16_t sw_event_enqueue_burst(void *port, const struct rte_event ev[], + uint16_t num); + +uint16_t sw_event_dequeue(void *port, struct rte_event *ev, uint64_t wait); +uint16_t sw_event_dequeue_burst(void *port, struct rte_event *ev, uint16_t num, + uint64_t wait); + #endif /* _SW_EVDEV_H_ */ diff --git a/drivers/event/sw/sw_evdev_worker.c b/drivers/event/sw/sw_evdev_worker.c new file mode 100644 index 0000000000..ed08778387 --- /dev/null +++ b/drivers/event/sw/sw_evdev_worker.c @@ -0,0 +1,183 @@ +/*- + * BSD LICENSE + * + * Copyright(c) 2016-2017 Intel Corporation. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Intel Corporation nor the names of its + * contributors may be used to endorse or promote products derived + * from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#include +#include + +#include "sw_evdev.h" +#include "event_ring.h" + +#define PORT_ENQUEUE_MAX_BURST_SIZE 64 + +static inline void +sw_event_release(struct sw_port *p, uint8_t index) +{ + /* + * Drops the next outstanding event in our history. Used on dequeue + * to clear any history before dequeuing more events. + */ + RTE_SET_USED(index); + + /* create drop message */ + struct rte_event ev = { + .op = sw_qe_flag_map[RTE_EVENT_OP_RELEASE], + }; + + uint16_t free_count; + qe_ring_enqueue_burst(p->rx_worker_ring, &ev, 1, &free_count); + + /* each release returns one credit */ + p->outstanding_releases--; + p->inflight_credits++; +} + +uint16_t +sw_event_enqueue_burst(void *port, const struct rte_event ev[], uint16_t num) +{ + int32_t i; + uint8_t new_ops[PORT_ENQUEUE_MAX_BURST_SIZE]; + struct sw_port *p = port; + struct sw_evdev *sw = (void *)p->sw; + uint32_t sw_inflights = rte_atomic32_read(&sw->inflights); + + if (unlikely(p->inflight_max < sw_inflights)) + return 0; + + if (num > PORT_ENQUEUE_MAX_BURST_SIZE) + num = PORT_ENQUEUE_MAX_BURST_SIZE; + + if (p->inflight_credits < num) { + /* check if event enqueue brings port over max threshold */ + uint32_t credit_update_quanta = sw->credit_update_quanta; + if (sw_inflights + credit_update_quanta > sw->nb_events_limit) + return 0; + + rte_atomic32_add(&sw->inflights, credit_update_quanta); + p->inflight_credits += (credit_update_quanta); + + if (p->inflight_credits < num) + return 0; + } + + for (i = 0; i < num; i++) { + int op = ev[i].op; + int outstanding = p->outstanding_releases > 0; + const uint8_t invalid_qid = (ev[i].queue_id >= sw->qid_count); + + p->inflight_credits -= (op == RTE_EVENT_OP_NEW); + p->inflight_credits += (op == RTE_EVENT_OP_RELEASE) * + outstanding; + + new_ops[i] = sw_qe_flag_map[op]; + new_ops[i] &= ~(invalid_qid << QE_FLAG_VALID_SHIFT); + + /* FWD and RELEASE packets will both resolve to taken (assuming + * correct usage of the API), providing very high correct + * prediction rate. + */ + if ((new_ops[i] & QE_FLAG_COMPLETE) && outstanding) + p->outstanding_releases--; + /* Branch to avoid touching p->stats except error case */ + if (unlikely(invalid_qid)) + p->stats.rx_dropped++; + } + + /* returns number of events actually enqueued */ + uint32_t enq = qe_ring_enqueue_burst_with_ops(p->rx_worker_ring, ev, i, + new_ops); + if (p->outstanding_releases == 0 && p->last_dequeue_burst_sz != 0) { + uint64_t burst_ticks = rte_get_timer_cycles() - + p->last_dequeue_ticks; + uint64_t burst_pkt_ticks = + burst_ticks / p->last_dequeue_burst_sz; + p->avg_pkt_ticks -= p->avg_pkt_ticks / NUM_SAMPLES; + p->avg_pkt_ticks += burst_pkt_ticks / NUM_SAMPLES; + p->last_dequeue_ticks = 0; + } + return enq; +} + +uint16_t +sw_event_enqueue(void *port, const struct rte_event *ev) +{ + return sw_event_enqueue_burst(port, ev, 1); +} + +uint16_t +sw_event_dequeue_burst(void *port, struct rte_event *ev, uint16_t num, + uint64_t wait) +{ + RTE_SET_USED(wait); + struct sw_port *p = (void *)port; + struct sw_evdev *sw = (void *)p->sw; + struct qe_ring *ring = p->cq_worker_ring; + uint32_t credit_update_quanta = sw->credit_update_quanta; + + /* check that all previous dequeues have been released */ + if (!p->is_directed) { + uint16_t out_rels = p->outstanding_releases; + uint16_t i; + for (i = 0; i < out_rels; i++) + sw_event_release(p, i); + } + + /* returns number of events actually dequeued */ + uint16_t ndeq = qe_ring_dequeue_burst(ring, ev, num); + if (unlikely(ndeq == 0)) { + p->outstanding_releases = 0; + p->zero_polls++; + p->total_polls++; + goto end; + } + + /* only add credits for directed ports - LB ports send RELEASEs */ + p->inflight_credits += ndeq * p->is_directed; + p->outstanding_releases = ndeq; + p->last_dequeue_burst_sz = ndeq; + p->last_dequeue_ticks = rte_get_timer_cycles(); + p->poll_buckets[(ndeq - 1) >> SW_DEQ_STAT_BUCKET_SHIFT]++; + p->total_polls++; + +end: + if (p->inflight_credits >= credit_update_quanta * 2 && + p->inflight_credits > credit_update_quanta + ndeq) { + rte_atomic32_sub(&sw->inflights, credit_update_quanta); + p->inflight_credits -= credit_update_quanta; + } + return ndeq; +} + +uint16_t +sw_event_dequeue(void *port, struct rte_event *ev, uint64_t wait) +{ + return sw_event_dequeue_burst(port, ev, 1, wait); +}