X-Git-Url: http://git.droids-corp.org/?a=blobdiff_plain;f=drivers%2Fevent%2Fdsw%2Fdsw_event.c;h=d68b71b9827b28abe644c8c106c491e4124477f5;hb=6e77151286b2f551ae30aac0f251ed067f1ec011;hp=f0347592d81f64150b5e0229b6c37d887e560206;hpb=f6257b22e76768d1eaedfff0cf9802e310d5d4b5;p=dpdk.git diff --git a/drivers/event/dsw/dsw_event.c b/drivers/event/dsw/dsw_event.c index f0347592d8..d68b71b982 100644 --- a/drivers/event/dsw/dsw_event.c +++ b/drivers/event/dsw/dsw_event.c @@ -4,6 +4,10 @@ #include "dsw_evdev.h" +#ifdef DSW_SORT_DEQUEUED +#include "dsw_sort.h" +#endif + #include #include @@ -78,6 +82,33 @@ dsw_port_return_credits(struct dsw_evdev *dsw, struct dsw_port *port, } } +static void +dsw_port_enqueue_stats(struct dsw_port *port, uint16_t num_new, + uint16_t num_forward, uint16_t num_release) +{ + port->new_enqueued += num_new; + port->forward_enqueued += num_forward; + port->release_enqueued += num_release; +} + +static void +dsw_port_queue_enqueue_stats(struct dsw_port *source_port, uint8_t queue_id) +{ + source_port->queue_enqueued[queue_id]++; +} + +static void +dsw_port_dequeue_stats(struct dsw_port *port, uint16_t num) +{ + port->dequeued += num; +} + +static void +dsw_port_queue_dequeued_stats(struct dsw_port *source_port, uint8_t queue_id) +{ + source_port->queue_dequeued[queue_id]++; +} + static void dsw_port_load_record(struct dsw_port *port, unsigned int dequeued) { @@ -145,27 +176,15 @@ dsw_port_consider_load_update(struct dsw_port *port, uint64_t now) static void dsw_port_ctl_enqueue(struct dsw_port *port, struct dsw_ctl_msg *msg) { - void *raw_msg; - - memcpy(&raw_msg, msg, sizeof(*msg)); - /* there's always room on the ring */ - while (rte_ring_enqueue(port->ctl_in_ring, raw_msg) != 0) + while (rte_ring_enqueue_elem(port->ctl_in_ring, msg, sizeof(*msg)) != 0) rte_pause(); } static int dsw_port_ctl_dequeue(struct dsw_port *port, struct dsw_ctl_msg *msg) { - void *raw_msg; - int rc; - - rc = rte_ring_dequeue(port->ctl_in_ring, &raw_msg); - - if (rc == 0) - memcpy(msg, &raw_msg, sizeof(*msg)); - - return rc; + return rte_ring_dequeue_elem(port->ctl_in_ring, msg, sizeof(*msg)); } static void @@ -987,12 +1006,12 @@ dsw_event_enqueue(void *port, const struct rte_event *ev) } static __rte_always_inline uint16_t -dsw_event_enqueue_burst_generic(void *port, const struct rte_event events[], +dsw_event_enqueue_burst_generic(struct dsw_port *source_port, + const struct rte_event events[], uint16_t events_len, bool op_types_known, uint16_t num_new, uint16_t num_release, uint16_t num_non_release) { - struct dsw_port *source_port = port; struct dsw_evdev *dsw = source_port->dsw; bool enough_credits; uint16_t i; @@ -1016,12 +1035,10 @@ dsw_event_enqueue_burst_generic(void *port, const struct rte_event events[], */ if (unlikely(events_len == 0)) { dsw_port_note_op(source_port, DSW_MAX_PORT_OPS_PER_BG_TASK); + dsw_port_flush_out_buffers(dsw, source_port); return 0; } - if (unlikely(events_len > source_port->enqueue_depth)) - events_len = source_port->enqueue_depth; - dsw_port_note_op(source_port, events_len); if (!op_types_known) @@ -1055,12 +1072,16 @@ dsw_event_enqueue_burst_generic(void *port, const struct rte_event events[], source_port->pending_releases -= num_release; + dsw_port_enqueue_stats(source_port, num_new, + num_non_release-num_new, num_release); + for (i = 0; i < events_len; i++) { const struct rte_event *event = &events[i]; if (likely(num_release == 0 || event->op != RTE_EVENT_OP_RELEASE)) dsw_port_buffer_event(dsw, source_port, event); + dsw_port_queue_enqueue_stats(source_port, event->queue_id); } DSW_LOG_DP_PORT(DEBUG, source_port->id, "%d non-release events " @@ -1073,24 +1094,41 @@ uint16_t dsw_event_enqueue_burst(void *port, const struct rte_event events[], uint16_t events_len) { - return dsw_event_enqueue_burst_generic(port, events, events_len, false, - 0, 0, 0); + struct dsw_port *source_port = port; + + if (unlikely(events_len > source_port->enqueue_depth)) + events_len = source_port->enqueue_depth; + + return dsw_event_enqueue_burst_generic(source_port, events, + events_len, false, 0, 0, 0); } uint16_t dsw_event_enqueue_new_burst(void *port, const struct rte_event events[], uint16_t events_len) { - return dsw_event_enqueue_burst_generic(port, events, events_len, true, - events_len, 0, events_len); + struct dsw_port *source_port = port; + + if (unlikely(events_len > source_port->enqueue_depth)) + events_len = source_port->enqueue_depth; + + return dsw_event_enqueue_burst_generic(source_port, events, + events_len, true, events_len, + 0, events_len); } uint16_t dsw_event_enqueue_forward_burst(void *port, const struct rte_event events[], uint16_t events_len) { - return dsw_event_enqueue_burst_generic(port, events, events_len, true, - 0, 0, events_len); + struct dsw_port *source_port = port; + + if (unlikely(events_len > source_port->enqueue_depth)) + events_len = source_port->enqueue_depth; + + return dsw_event_enqueue_burst_generic(source_port, events, + events_len, true, 0, 0, + events_len); } uint16_t @@ -1105,6 +1143,8 @@ dsw_port_record_seen_events(struct dsw_port *port, struct rte_event *events, { uint16_t i; + dsw_port_dequeue_stats(port, num); + for (i = 0; i < num; i++) { uint16_t l_idx = port->seen_events_idx; struct dsw_queue_flow *qf = &port->seen_events[l_idx]; @@ -1113,6 +1153,8 @@ dsw_port_record_seen_events(struct dsw_port *port, struct rte_event *events, qf->flow_hash = dsw_flow_id_hash(event->flow_id); port->seen_events_idx = (l_idx+1) % DSW_MAX_EVENTS_RECORDED; + + dsw_port_queue_dequeued_stats(port, event->queue_id); } if (unlikely(port->seen_events_len != DSW_MAX_EVENTS_RECORDED)) @@ -1121,6 +1163,21 @@ dsw_port_record_seen_events(struct dsw_port *port, struct rte_event *events, DSW_MAX_EVENTS_RECORDED); } +#ifdef DSW_SORT_DEQUEUED + +#define DSW_EVENT_TO_INT(_event) \ + ((int)((((_event)->queue_id)<<16)|((_event)->flow_id))) + +static inline int +dsw_cmp_event(const void *v_event_a, const void *v_event_b) +{ + const struct rte_event *event_a = v_event_a; + const struct rte_event *event_b = v_event_b; + + return DSW_EVENT_TO_INT(event_a) - DSW_EVENT_TO_INT(event_b); +} +#endif + static uint16_t dsw_port_dequeue_burst(struct dsw_port *port, struct rte_event *events, uint16_t num) @@ -1191,5 +1248,9 @@ dsw_event_dequeue_burst(void *port, struct rte_event *events, uint16_t num, * 0. */ +#ifdef DSW_SORT_DEQUEUED + dsw_stable_sort(events, dequeued, sizeof(events[0]), dsw_cmp_event); +#endif + return dequeued; }