#include "dsw_evdev.h"
+#ifdef DSW_SORT_DEQUEUED
+#include "dsw_sort.h"
+#endif
+
#include <stdbool.h>
#include <string.h>
}
}
+/* Account one enqueue burst on the port-level xstats counters: bump
+ * the running totals of NEW, FORWARD and RELEASE events enqueued. */
+static void
+dsw_port_enqueue_stats(struct dsw_port *port, uint16_t num_new,
+ uint16_t num_forward, uint16_t num_release)
+{
+	port->new_enqueued += num_new;
+	port->forward_enqueued += num_forward;
+	port->release_enqueued += num_release;
+}
+
+/* Count one event enqueued via this port onto the given queue. */
+static void
+dsw_port_queue_enqueue_stats(struct dsw_port *source_port, uint8_t queue_id)
+{
+	source_port->queue_enqueued[queue_id]++;
+}
+
+/* Account one dequeue burst on the port's running dequeue total. */
+static void
+dsw_port_dequeue_stats(struct dsw_port *port, uint16_t num)
+{
+	port->dequeued += num;
+}
+
+/* Count one event dequeued via this port from the given queue. */
+static void
+dsw_port_queue_dequeued_stats(struct dsw_port *source_port, uint8_t queue_id)
+{
+	source_port->queue_dequeued[queue_id]++;
+}
+
static void
dsw_port_load_record(struct dsw_port *port, unsigned int dequeued)
{
static void
dsw_port_ctl_enqueue(struct dsw_port *port, struct dsw_ctl_msg *msg)
{
- void *raw_msg;
-
- memcpy(&raw_msg, msg, sizeof(*msg));
-
/* there's always room on the ring */
- while (rte_ring_enqueue(port->ctl_in_ring, raw_msg) != 0)
+ /* rte_ring_enqueue_elem() copies sizeof(*msg) bytes straight from
+  * *msg, so the message no longer has to be squeezed through a
+  * pointer-sized temporary. */
+ while (rte_ring_enqueue_elem(port->ctl_in_ring, msg, sizeof(*msg)) != 0)
rte_pause();
}
static int
dsw_port_ctl_dequeue(struct dsw_port *port, struct dsw_ctl_msg *msg)
{
- void *raw_msg;
- int rc;
-
- rc = rte_ring_dequeue(port->ctl_in_ring, &raw_msg);
-
- if (rc == 0)
- memcpy(msg, &raw_msg, sizeof(*msg));
-
- return rc;
+ /* Dequeue straight into *msg; returns 0 on success, or a negative
+  * error code when the ring is empty, matching the old behavior. */
+ return rte_ring_dequeue_elem(port->ctl_in_ring, msg, sizeof(*msg));
}
static void
}
static __rte_always_inline uint16_t
-dsw_event_enqueue_burst_generic(void *port, const struct rte_event events[],
+dsw_event_enqueue_burst_generic(struct dsw_port *source_port,
+ const struct rte_event events[],
uint16_t events_len, bool op_types_known,
uint16_t num_new, uint16_t num_release,
uint16_t num_non_release)
{
- struct dsw_port *source_port = port;
struct dsw_evdev *dsw = source_port->dsw;
bool enough_credits;
uint16_t i;
*/
if (unlikely(events_len == 0)) {
dsw_port_note_op(source_port, DSW_MAX_PORT_OPS_PER_BG_TASK);
+ /* A zero-sized enqueue now also flushes the port's output
+  * buffers, so an otherwise-idle caller can push out any
+  * events buffered on this port. */
+ dsw_port_flush_out_buffers(dsw, source_port);
return 0;
}
- if (unlikely(events_len > source_port->enqueue_depth))
- events_len = source_port->enqueue_depth;
-
dsw_port_note_op(source_port, events_len);
if (!op_types_known)
source_port->pending_releases -= num_release;
+ /* Forward count is derived: everything that is neither NEW nor
+  * RELEASE. */
+ dsw_port_enqueue_stats(source_port, num_new,
+ num_non_release-num_new, num_release);
+
for (i = 0; i < events_len; i++) {
const struct rte_event *event = &events[i];
if (likely(num_release == 0 ||
event->op != RTE_EVENT_OP_RELEASE))
dsw_port_buffer_event(dsw, source_port, event);
+ /* NOTE(review): the per-queue enqueue counter is also bumped
+  * for RELEASE events (which are not buffered) — confirm this
+  * is the intended accounting. */
+ dsw_port_queue_enqueue_stats(source_port, event->queue_id);
}
DSW_LOG_DP_PORT(DEBUG, source_port->id, "%d non-release events "
dsw_event_enqueue_burst(void *port, const struct rte_event events[],
uint16_t events_len)
{
- return dsw_event_enqueue_burst_generic(port, events, events_len, false,
- 0, 0, 0);
+ struct dsw_port *source_port = port;
+
+ /* Clamp the burst to the port's configured enqueue depth before
+  * entering the generic path, which no longer performs the clamp
+  * itself. Op types are unknown here (op_types_known = false). */
+ if (unlikely(events_len > source_port->enqueue_depth))
+ events_len = source_port->enqueue_depth;
+
+ return dsw_event_enqueue_burst_generic(source_port, events,
+ events_len, false, 0, 0, 0);
}
uint16_t
dsw_event_enqueue_new_burst(void *port, const struct rte_event events[],
uint16_t events_len)
{
- return dsw_event_enqueue_burst_generic(port, events, events_len, true,
- events_len, 0, events_len);
+ struct dsw_port *source_port = port;
+
+ /* Clamp to the port's enqueue depth; the generic path no longer
+  * does this. Every event in this burst is a NEW op, so num_new
+  * and num_non_release both equal events_len. */
+ if (unlikely(events_len > source_port->enqueue_depth))
+ events_len = source_port->enqueue_depth;
+
+ return dsw_event_enqueue_burst_generic(source_port, events,
+ events_len, true, events_len,
+ 0, events_len);
}
uint16_t
dsw_event_enqueue_forward_burst(void *port, const struct rte_event events[],
uint16_t events_len)
{
- return dsw_event_enqueue_burst_generic(port, events, events_len, true,
- 0, 0, events_len);
+ struct dsw_port *source_port = port;
+
+ /* Clamp to the port's enqueue depth; the generic path no longer
+  * does this. Every event in this burst is a FORWARD op: no NEW
+  * and no RELEASE events, num_non_release equals events_len. */
+ if (unlikely(events_len > source_port->enqueue_depth))
+ events_len = source_port->enqueue_depth;
+
+ return dsw_event_enqueue_burst_generic(source_port, events,
+ events_len, true, 0, 0,
+ events_len);
}
uint16_t
{
uint16_t i;
+ /* Account the whole burst on the port-level dequeue counter. */
+ dsw_port_dequeue_stats(port, num);
+
for (i = 0; i < num; i++) {
uint16_t l_idx = port->seen_events_idx;
struct dsw_queue_flow *qf = &port->seen_events[l_idx];
qf->flow_hash = dsw_flow_id_hash(event->flow_id);
port->seen_events_idx = (l_idx+1) % DSW_MAX_EVENTS_RECORDED;
+
+ /* ...and one per-queue dequeued tick for each event. */
+ dsw_port_queue_dequeued_stats(port, event->queue_id);
}
if (unlikely(port->seen_events_len != DSW_MAX_EVENTS_RECORDED))
DSW_MAX_EVENTS_RECORDED);
}
+#ifdef DSW_SORT_DEQUEUED
+
+/* Build a sort key with the queue id in the high bits and the flow id
+ * in the low bits, so dequeued bursts group by (queue, flow).
+ * NOTE(review): rte_event's flow_id field is wider than 16 bits, so a
+ * flow id above 0xffff would bleed into the queue_id bits — confirm
+ * flow ids are bounded here, or mask flow_id to 16 bits. */
+#define DSW_EVENT_TO_INT(_event) \
+ ((int)((((_event)->queue_id)<<16)|((_event)->flow_id)))
+
+/* qsort-style three-way comparator for rte_events. Both keys are
+ * non-negative and far below INT_MAX, so the subtraction is safe. */
+static inline int
+dsw_cmp_event(const void *v_event_a, const void *v_event_b)
+{
+ const struct rte_event *event_a = v_event_a;
+ const struct rte_event *event_b = v_event_b;
+
+ return DSW_EVENT_TO_INT(event_a) - DSW_EVENT_TO_INT(event_b);
+}
+#endif
+
static uint16_t
dsw_port_dequeue_burst(struct dsw_port *port, struct rte_event *events,
uint16_t num)
* 0.
*/
+#ifdef DSW_SORT_DEQUEUED
+ /* Optionally order the dequeued burst by (queue id, flow id); the
+  * stable sort preserves the relative order of events with equal
+  * keys, i.e. within a flow. */
+ dsw_stable_sort(events, dequeued, sizeof(events[0]), dsw_cmp_event);
+#endif
+
return dequeued;
}