*/
#define DSW_PARALLEL_FLOWS (1024)
+/* 'Background tasks' poll the control rings for migration-related
+ * messages and flush the output buffers (so that buffered events
+ * don't linger for too long). This value shouldn't be too low, since
+ * the system then won't benefit from the 'batching' effects of the
+ * output buffer, and it shouldn't be too high, since that would make
+ * buffered events linger for too long in case the port goes idle.
+ */
+#define DSW_MAX_PORT_OPS_PER_BG_TASK (128)
+
/* Avoid making small 'loans' from the central in-flight event credit
* pool, to improve efficiency.
*/
#define DSW_IN_RING_SIZE (DSW_MAX_EVENTS)
+#define DSW_MAX_LOAD (INT16_MAX)
+#define DSW_LOAD_FROM_PERCENT(x) ((int16_t)(((x)*DSW_MAX_LOAD)/100))
+#define DSW_LOAD_TO_PERCENT(x) ((100*(x))/DSW_MAX_LOAD)
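
To give a feel for the fixed-point load scale, here is a minimal, self-contained sketch; the 70% figure and the 'threshold' variable are only illustrative, and the macros are repeated so the snippet compiles on its own:

	#include <stdint.h>
	#include <stdio.h>

	#define DSW_MAX_LOAD (INT16_MAX)
	#define DSW_LOAD_FROM_PERCENT(x) ((int16_t)(((x)*DSW_MAX_LOAD)/100))
	#define DSW_LOAD_TO_PERCENT(x) ((100*(x))/DSW_MAX_LOAD)

	int main(void)
	{
		/* A hypothetical 70% threshold on the [0, INT16_MAX] scale. */
		int16_t threshold = DSW_LOAD_FROM_PERCENT(70);

		/* Prints "22936 ~ 69%"; the round trip loses a little
		 * precision to integer truncation.
		 */
		printf("%d ~ %d%%\n", threshold, DSW_LOAD_TO_PERCENT(threshold));

		return 0;
	}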
+
+/* The thought behind keeping the load update interval shorter than
+ * the migration interval is that the load from newly migrated flows
+ * should 'show up' in the load measurement before new migrations are
+ * considered. This is to avoid migrating too many flows, from too
+ * many source ports, too quickly to a lightly loaded port - in
+ * particular since this might cause the system to oscillate.
+ */
+#define DSW_LOAD_UPDATE_INTERVAL (DSW_MIGRATION_INTERVAL/4)
+#define DSW_OLD_LOAD_WEIGHT (1)
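
To make the effect of DSW_OLD_LOAD_WEIGHT concrete, the sketch below repeats the smoothing step from dsw_port_load_update() (defined further down) on made-up numbers; the smooth() helper and the idle-to-busy scenario are only illustrative:

	#include <stdint.h>
	#include <stdio.h>

	#define DSW_MAX_LOAD (INT16_MAX)
	#define DSW_OLD_LOAD_WEIGHT (1)

	/* The same weighted average as in dsw_port_load_update(). */
	static int16_t
	smooth(int16_t old_load, int16_t period_load)
	{
		return (period_load + old_load*DSW_OLD_LOAD_WEIGHT) /
			(DSW_OLD_LOAD_WEIGHT+1);
	}

	int main(void)
	{
		/* An idle port (load 0) that suddenly becomes fully busy
		 * converges on the new level over a few update intervals:
		 * prints 16383, 24575, 28671.
		 */
		int16_t load = 0;
		int i;

		for (i = 0; i < 3; i++) {
			load = smooth(load, DSW_MAX_LOAD);
			printf("%d\n", load);
		}

		return 0;
	}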
+
+/* The minimum time (in us) between two flow migrations. */
+#define DSW_MIGRATION_INTERVAL (1000)
+
struct dsw_port {
uint16_t id;
uint16_t next_parallel_flow_id;
+	/* Number of enqueue/dequeue 'ops' since the last background task. */
+	uint16_t ops_since_bg_task;
+
+	/* Timestamp (rte timer cycles) of the last background task run. */
+	uint64_t last_bg;
+
+ /* For port load measurement. */
+ uint64_t next_load_update;
+ uint64_t load_update_interval;
+ uint64_t measurement_start;
+ uint64_t busy_start;
+ uint64_t busy_cycles;
+ uint64_t total_busy_cycles;
+
uint16_t out_buffer_len[DSW_MAX_PORTS];
struct rte_event out_buffer[DSW_MAX_PORTS][DSW_MAX_PORT_OUT_BUFFER];
struct rte_event_ring *in_ring __rte_cache_aligned;
+
+ /* Estimate of current port load. */
+ rte_atomic16_t load __rte_cache_aligned;
} __rte_cache_aligned;
struct dsw_queue {
#include <stdbool.h>
#include <rte_atomic.h>
+#include <rte_cycles.h>
#include <rte_random.h>
static bool
}
}
+/* Track for how long (in rte timer cycles) the port has been busy,
+ * i.e., has had newly dequeued events to process.
+ */
+static void
+dsw_port_load_record(struct dsw_port *port, unsigned int dequeued)
+{
+ if (dequeued > 0 && port->busy_start == 0)
+ /* work period begins */
+ port->busy_start = rte_get_timer_cycles();
+ else if (dequeued == 0 && port->busy_start > 0) {
+ /* work period ends */
+ uint64_t work_period =
+ rte_get_timer_cycles() - port->busy_start;
+ port->busy_cycles += work_period;
+ port->busy_start = 0;
+ }
+}
+
+static int16_t
+dsw_port_load_close_period(struct dsw_port *port, uint64_t now)
+{
+ uint64_t passed = now - port->measurement_start;
+ uint64_t busy_cycles = port->busy_cycles;
+
+ if (port->busy_start > 0) {
+ busy_cycles += (now - port->busy_start);
+ port->busy_start = now;
+ }
+
+	/* Scale the busy share of this measurement period to the
+	 * [0, DSW_MAX_LOAD] fixed-point range.
+	 */
+	int16_t load = (DSW_MAX_LOAD * busy_cycles) / passed;
+
+ port->measurement_start = now;
+ port->busy_cycles = 0;
+
+ port->total_busy_cycles += busy_cycles;
+
+ return load;
+}
+
+static void
+dsw_port_load_update(struct dsw_port *port, uint64_t now)
+{
+ int16_t old_load;
+ int16_t period_load;
+ int16_t new_load;
+
+ old_load = rte_atomic16_read(&port->load);
+
+ period_load = dsw_port_load_close_period(port, now);
+
+ new_load = (period_load + old_load*DSW_OLD_LOAD_WEIGHT) /
+ (DSW_OLD_LOAD_WEIGHT+1);
+
+ rte_atomic16_set(&port->load, new_load);
+}
+
+static void
+dsw_port_consider_load_update(struct dsw_port *port, uint64_t now)
+{
+ if (now < port->next_load_update)
+ return;
+
+ port->next_load_update = now + port->load_update_interval;
+
+ dsw_port_load_update(port, now);
+}
+
static uint8_t
dsw_schedule(struct dsw_evdev *dsw, uint8_t queue_id, uint16_t flow_hash)
{
dsw_port_buffer_non_paused(dsw, source_port, dest_port_id, event);
}
+static void
+dsw_port_note_op(struct dsw_port *port, uint16_t num_events)
+{
+	/* To poll the control ring reasonably often on busy ports,
+	 * each dequeued/enqueued event is considered an 'op' too.
+	 */
+ port->ops_since_bg_task += (num_events+1);
+}
+
+static void
+dsw_port_flush_out_buffers(struct dsw_evdev *dsw, struct dsw_port *source_port);
+
+static void
+dsw_port_bg_process(struct dsw_evdev *dsw, struct dsw_port *port)
+{
+ if (unlikely(port->ops_since_bg_task >= DSW_MAX_PORT_OPS_PER_BG_TASK)) {
+ uint64_t now;
+
+ now = rte_get_timer_cycles();
+
+ port->last_bg = now;
+
+ /* Logic to avoid having events linger in the output
+ * buffer too long.
+ */
+ dsw_port_flush_out_buffers(dsw, port);
+
+ dsw_port_consider_load_update(port, now);
+
+ port->ops_since_bg_task = 0;
+ }
+}
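
As a rough, illustrative back-of-the-envelope for how the op counting paces the background task (the burst size of 32 is an assumption, not something the driver mandates): each enqueue/dequeue call adds its burst size plus one to the counter, so the 128-op threshold is reached after a handful of calls.

	#include <stdio.h>

	#define DSW_MAX_PORT_OPS_PER_BG_TASK (128)

	int main(void)
	{
		/* Hypothetical steady-state burst size. */
		int burst = 32;
		/* dsw_port_note_op() adds num_events+1 ops per call. */
		int ops_per_call = burst + 1;
		/* Calls needed for the counter to reach the threshold;
		 * prints 4, so the background task runs at the start of
		 * roughly every fifth enqueue/dequeue call here.
		 */
		int calls = (DSW_MAX_PORT_OPS_PER_BG_TASK + ops_per_call - 1) /
			ops_per_call;

		printf("%d\n", calls);

		return 0;
	}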
+
static void
dsw_port_flush_out_buffers(struct dsw_evdev *dsw, struct dsw_port *source_port)
{
DSW_LOG_DP_PORT(DEBUG, source_port->id, "Attempting to enqueue %d "
"events to port %d.\n", events_len, source_port->id);
+ dsw_port_bg_process(dsw, source_port);
+
/* XXX: For performance (=ring efficiency) reasons, the
* scheduler relies on internal non-ring buffers instead of
* immediately sending the event to the destination ring. For
* considered.
*/
if (unlikely(events_len == 0)) {
+ dsw_port_note_op(source_port, DSW_MAX_PORT_OPS_PER_BG_TASK);
dsw_port_flush_out_buffers(dsw, source_port);
return 0;
}
if (unlikely(events_len > source_port->enqueue_depth))
events_len = source_port->enqueue_depth;
+ dsw_port_note_op(source_port, events_len);
+
if (!op_types_known)
for (i = 0; i < events_len; i++) {
switch (events[i].op) {
source_port->pending_releases = 0;
+ dsw_port_bg_process(dsw, source_port);
+
if (unlikely(num > source_port->dequeue_depth))
num = source_port->dequeue_depth;
source_port->pending_releases = dequeued;
+ dsw_port_load_record(source_port, dequeued);
+
+ dsw_port_note_op(source_port, dequeued);
+
if (dequeued > 0) {
DSW_LOG_DP_PORT(DEBUG, source_port->id, "Dequeued %d events.\n",
dequeued);