[dpdk.git] / drivers/event/dsw/dsw_event.c
index 4a3af8e..8323903 100644
@@ -4,9 +4,16 @@
 
 #include "dsw_evdev.h"
 
+#ifdef DSW_SORT_DEQUEUED
+#include "dsw_sort.h"
+#endif
+
 #include <stdbool.h>
+#include <string.h>
 
 #include <rte_atomic.h>
+#include <rte_cycles.h>
+#include <rte_memcpy.h>
 #include <rte_random.h>
 
 static bool
@@ -75,6 +82,462 @@ dsw_port_return_credits(struct dsw_evdev *dsw, struct dsw_port *port,
        }
 }
 
+static void
+dsw_port_enqueue_stats(struct dsw_port *port, uint16_t num_new,
+                      uint16_t num_forward, uint16_t num_release)
+{
+       port->new_enqueued += num_new;
+       port->forward_enqueued += num_forward;
+       port->release_enqueued += num_release;
+}
+
+static void
+dsw_port_queue_enqueue_stats(struct dsw_port *source_port, uint8_t queue_id)
+{
+       source_port->queue_enqueued[queue_id]++;
+}
+
+static void
+dsw_port_dequeue_stats(struct dsw_port *port, uint16_t num)
+{
+       port->dequeued += num;
+}
+
+static void
+dsw_port_queue_dequeued_stats(struct dsw_port *source_port, uint8_t queue_id)
+{
+       source_port->queue_dequeued[queue_id]++;
+}
+
+static void
+dsw_port_load_record(struct dsw_port *port, unsigned int dequeued)
+{
+       if (dequeued > 0 && port->busy_start == 0)
+               /* work period begins */
+               port->busy_start = rte_get_timer_cycles();
+       else if (dequeued == 0 && port->busy_start > 0) {
+               /* work period ends */
+               uint64_t work_period =
+                       rte_get_timer_cycles() - port->busy_start;
+               port->busy_cycles += work_period;
+               port->busy_start = 0;
+       }
+}
+
+static int16_t
+dsw_port_load_close_period(struct dsw_port *port, uint64_t now)
+{
+       uint64_t passed = now - port->measurement_start;
+       uint64_t busy_cycles = port->busy_cycles;
+
+       if (port->busy_start > 0) {
+               busy_cycles += (now - port->busy_start);
+               port->busy_start = now;
+       }
+
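+       /* The period load is the share of cycles spent busy since the
+        * last measurement, scaled so that DSW_MAX_LOAD corresponds to
+        * a fully busy port.
+        */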
+       int16_t load = (DSW_MAX_LOAD * busy_cycles) / passed;
+
+       port->measurement_start = now;
+       port->busy_cycles = 0;
+
+       port->total_busy_cycles += busy_cycles;
+
+       return load;
+}
+
+static void
+dsw_port_load_update(struct dsw_port *port, uint64_t now)
+{
+       int16_t old_load;
+       int16_t period_load;
+       int16_t new_load;
+
+       old_load = rte_atomic16_read(&port->load);
+
+       period_load = dsw_port_load_close_period(port, now);
+
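+       /* The new load estimate is a weighted average, in which the
+        * previous estimate counts DSW_OLD_LOAD_WEIGHT times as much
+        * as the load measured during the last period.
+        */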
+       new_load = (period_load + old_load*DSW_OLD_LOAD_WEIGHT) /
+               (DSW_OLD_LOAD_WEIGHT+1);
+
+       rte_atomic16_set(&port->load, new_load);
+
+       /* The load of the recently immigrated flows should hopefully
+        * be reflected in the load estimate by now.
+        */
+       rte_atomic32_set(&port->immigration_load, 0);
+}
+
+static void
+dsw_port_consider_load_update(struct dsw_port *port, uint64_t now)
+{
+       if (now < port->next_load_update)
+               return;
+
+       port->next_load_update = now + port->load_update_interval;
+
+       dsw_port_load_update(port, now);
+}
+
+static void
+dsw_port_ctl_enqueue(struct dsw_port *port, struct dsw_ctl_msg *msg)
+{
+       /* There should always be room on the ring, but spin, just in
+        * case, until the enqueue succeeds.
+        */
+       while (rte_ring_enqueue_elem(port->ctl_in_ring, msg, sizeof(*msg)) != 0)
+               rte_pause();
+}
+
+static int
+dsw_port_ctl_dequeue(struct dsw_port *port, struct dsw_ctl_msg *msg)
+{
+       return rte_ring_dequeue_elem(port->ctl_in_ring, msg, sizeof(*msg));
+}
+
+static void
+dsw_port_ctl_broadcast(struct dsw_evdev *dsw, struct dsw_port *source_port,
+                      uint8_t type, struct dsw_queue_flow *qfs,
+                      uint8_t qfs_len)
+{
+       uint16_t port_id;
+       struct dsw_ctl_msg msg = {
+               .type = type,
+               .originating_port_id = source_port->id,
+               .qfs_len = qfs_len
+       };
+
+       memcpy(msg.qfs, qfs, sizeof(struct dsw_queue_flow) * qfs_len);
+
+       for (port_id = 0; port_id < dsw->num_ports; port_id++)
+               if (port_id != source_port->id)
+                       dsw_port_ctl_enqueue(&dsw->ports[port_id], &msg);
+}
+
+static __rte_always_inline bool
+dsw_is_queue_flow_in_ary(const struct dsw_queue_flow *qfs, uint16_t qfs_len,
+                        uint8_t queue_id, uint16_t flow_hash)
+{
+       uint16_t i;
+
+       for (i = 0; i < qfs_len; i++)
+               if (qfs[i].queue_id == queue_id &&
+                   qfs[i].flow_hash == flow_hash)
+                       return true;
+
+       return false;
+}
+
+static __rte_always_inline bool
+dsw_port_is_flow_paused(struct dsw_port *port, uint8_t queue_id,
+                       uint16_t flow_hash)
+{
+       return dsw_is_queue_flow_in_ary(port->paused_flows,
+                                       port->paused_flows_len,
+                                       queue_id, flow_hash);
+}
+
+static void
+dsw_port_add_paused_flows(struct dsw_port *port, struct dsw_queue_flow *qfs,
+                         uint8_t qfs_len)
+{
+       uint8_t i;
+
+       for (i = 0; i < qfs_len; i++) {
+               struct dsw_queue_flow *qf = &qfs[i];
+
+               DSW_LOG_DP_PORT(DEBUG, port->id,
+                               "Pausing queue_id %d flow_hash %d.\n",
+                               qf->queue_id, qf->flow_hash);
+
+               port->paused_flows[port->paused_flows_len] = *qf;
+               port->paused_flows_len++;
+       }
+}
+
+static void
+dsw_port_remove_paused_flow(struct dsw_port *port,
+                           struct dsw_queue_flow *target_qf)
+{
+       uint16_t i;
+
+       for (i = 0; i < port->paused_flows_len; i++) {
+               struct dsw_queue_flow *qf = &port->paused_flows[i];
+
+               if (qf->queue_id == target_qf->queue_id &&
+                   qf->flow_hash == target_qf->flow_hash) {
+                       uint16_t last_idx = port->paused_flows_len-1;
+                       if (i != last_idx)
+                               port->paused_flows[i] =
+                                       port->paused_flows[last_idx];
+                       port->paused_flows_len--;
+                       break;
+               }
+       }
+}
+
+static void
+dsw_port_remove_paused_flows(struct dsw_port *port,
+                            struct dsw_queue_flow *qfs, uint8_t qfs_len)
+{
+       uint8_t i;
+
+       for (i = 0; i < qfs_len; i++)
+               dsw_port_remove_paused_flow(port, &qfs[i]);
+}
+
+static void
+dsw_port_flush_out_buffers(struct dsw_evdev *dsw, struct dsw_port *source_port);
+
+static void
+dsw_port_handle_pause_flows(struct dsw_evdev *dsw, struct dsw_port *port,
+                           uint8_t originating_port_id,
+                           struct dsw_queue_flow *paused_qfs,
+                           uint8_t qfs_len)
+{
+       struct dsw_ctl_msg cfm = {
+               .type = DSW_CTL_CFM,
+               .originating_port_id = port->id
+       };
+
+       /* There might be already-scheduled events belonging to the
+        * paused flow in the output buffers.
+        */
+       dsw_port_flush_out_buffers(dsw, port);
+
+       dsw_port_add_paused_flows(port, paused_qfs, qfs_len);
+
+       /* Make sure any stores to the original port's in_ring are
+        * seen before the ctl message.
+        */
+       rte_smp_wmb();
+
+       dsw_port_ctl_enqueue(&dsw->ports[originating_port_id], &cfm);
+}
+
+struct dsw_queue_flow_burst {
+       struct dsw_queue_flow queue_flow;
+       uint16_t count;
+};
+
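+/* Pack a queue flow into a single integer sort key, with the queue id
+ * in the high bits and the flow hash in the low bits.
+ */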
+#define DSW_QF_TO_INT(_qf)                                     \
+       ((int)((((_qf)->queue_id)<<16)|((_qf)->flow_hash)))
+
+static inline int
+dsw_cmp_qf(const void *v_qf_a, const void *v_qf_b)
+{
+       const struct dsw_queue_flow *qf_a = v_qf_a;
+       const struct dsw_queue_flow *qf_b = v_qf_b;
+
+       return DSW_QF_TO_INT(qf_a) - DSW_QF_TO_INT(qf_b);
+}
+
+static uint16_t
+dsw_sort_qfs_to_bursts(struct dsw_queue_flow *qfs, uint16_t qfs_len,
+                      struct dsw_queue_flow_burst *bursts)
+{
+       uint16_t i;
+       struct dsw_queue_flow_burst *current_burst = NULL;
+       uint16_t num_bursts = 0;
+
+       /* We don't need the stable property, and the list is likely
+        * large enough for qsort() to outperform dsw_stable_sort(),
+        * so we use qsort() here.
+        */
+       qsort(qfs, qfs_len, sizeof(qfs[0]), dsw_cmp_qf);
+
+       /* arrange the (now-consecutive) events into bursts */
+       for (i = 0; i < qfs_len; i++) {
+               if (i == 0 ||
+                   dsw_cmp_qf(&qfs[i], &current_burst->queue_flow) != 0) {
+                       current_burst = &bursts[num_bursts];
+                       current_burst->queue_flow = qfs[i];
+                       current_burst->count = 0;
+                       num_bursts++;
+               }
+               current_burst->count++;
+       }
+
+       return num_bursts;
+}
+
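+/* Take a snapshot of all ports' loads, including the load from flows
+ * recently selected for immigration, which may not yet be visible in
+ * the measured load. Returns true if at least one port is below the
+ * given load limit.
+ */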
+static bool
+dsw_retrieve_port_loads(struct dsw_evdev *dsw, int16_t *port_loads,
+                       int16_t load_limit)
+{
+       bool below_limit = false;
+       uint16_t i;
+
+       for (i = 0; i < dsw->num_ports; i++) {
+               int16_t measured_load = rte_atomic16_read(&dsw->ports[i].load);
+               int32_t immigration_load =
+                       rte_atomic32_read(&dsw->ports[i].immigration_load);
+               int32_t load = measured_load + immigration_load;
+
+               load = RTE_MIN(load, DSW_MAX_LOAD);
+
+               if (load < load_limit)
+                       below_limit = true;
+               port_loads[i] = load;
+       }
+       return below_limit;
+}
+
+static int16_t
+dsw_flow_load(uint16_t num_events, int16_t port_load)
+{
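+       /* Estimate the flow's share of the port load from its share of
+        * the DSW_MAX_EVENTS_RECORDED most recently seen events.
+        */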
+       return ((int32_t)port_load * (int32_t)num_events) /
+               DSW_MAX_EVENTS_RECORDED;
+}
+
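+/* Return a weight expressing how attractive it is to move a flow with
+ * the given load from the source to the target port (higher is
+ * better), or a negative value if the migration is not worthwhile.
+ */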
+static int16_t
+dsw_evaluate_migration(int16_t source_load, int16_t target_load,
+                      int16_t flow_load)
+{
+       int32_t res_target_load;
+       int32_t imbalance;
+
+       if (target_load > DSW_MAX_TARGET_LOAD_FOR_MIGRATION)
+               return -1;
+
+       imbalance = source_load - target_load;
+
+       if (imbalance < DSW_REBALANCE_THRESHOLD)
+               return -1;
+
+       res_target_load = target_load + flow_load;
+
+       /* If the estimated load of the target port will be higher
+        * than the source port's load, it doesn't make sense to move
+        * the flow.
+        */
+       if (res_target_load > source_load)
+               return -1;
+
+       /* The more idle the target will be, the better. This will
+        * make migration prefer moving smaller flows, and flows to
+        * lightly loaded ports.
+        */
+       return DSW_MAX_LOAD - res_target_load;
+}
+
+static bool
+dsw_is_serving_port(struct dsw_evdev *dsw, uint8_t port_id, uint8_t queue_id)
+{
+       struct dsw_queue *queue = &dsw->queues[queue_id];
+       uint16_t i;
+
+       for (i = 0; i < queue->num_serving_ports; i++)
+               if (queue->serving_ports[i] == port_id)
+                       return true;
+
+       return false;
+}
+
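+/* Among the flows seen on the source port and the ports serving their
+ * queues, pick the flow/port pair with the highest migration weight.
+ * On success, the provisional port loads are adjusted and the chosen
+ * flow is appended to the emigration target list.
+ */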
+static bool
+dsw_select_emigration_target(struct dsw_evdev *dsw,
+                           struct dsw_queue_flow_burst *bursts,
+                           uint16_t num_bursts, uint8_t source_port_id,
+                           int16_t *port_loads, uint16_t num_ports,
+                           uint8_t *target_port_ids,
+                           struct dsw_queue_flow *target_qfs,
+                           uint8_t *targets_len)
+{
+       int16_t source_port_load = port_loads[source_port_id];
+       struct dsw_queue_flow *candidate_qf = NULL;
+       uint8_t candidate_port_id = 0;
+       int16_t candidate_weight = -1;
+       int16_t candidate_flow_load = -1;
+       uint16_t i;
+
+       if (source_port_load < DSW_MIN_SOURCE_LOAD_FOR_MIGRATION)
+               return false;
+
+       for (i = 0; i < num_bursts; i++) {
+               struct dsw_queue_flow_burst *burst = &bursts[i];
+               struct dsw_queue_flow *qf = &burst->queue_flow;
+               int16_t flow_load;
+               uint16_t port_id;
+
+               if (dsw_is_queue_flow_in_ary(target_qfs, *targets_len,
+                                            qf->queue_id, qf->flow_hash))
+                       continue;
+
+               flow_load = dsw_flow_load(burst->count, source_port_load);
+
+               for (port_id = 0; port_id < num_ports; port_id++) {
+                       int16_t weight;
+
+                       if (port_id == source_port_id)
+                               continue;
+
+                       if (!dsw_is_serving_port(dsw, port_id, qf->queue_id))
+                               continue;
+
+                       weight = dsw_evaluate_migration(source_port_load,
+                                                       port_loads[port_id],
+                                                       flow_load);
+
+                       if (weight > candidate_weight) {
+                               candidate_qf = qf;
+                               candidate_port_id = port_id;
+                               candidate_weight = weight;
+                               candidate_flow_load = flow_load;
+                       }
+               }
+       }
+
+       if (candidate_weight < 0)
+               return false;
+
+       DSW_LOG_DP_PORT(DEBUG, source_port_id, "Selected queue_id %d "
+                       "flow_hash %d (with flow load %d) for migration "
+                       "to port %d.\n", candidate_qf->queue_id,
+                       candidate_qf->flow_hash,
+                       DSW_LOAD_TO_PERCENT(candidate_flow_load),
+                       candidate_port_id);
+
+       port_loads[candidate_port_id] += candidate_flow_load;
+       port_loads[source_port_id] -= candidate_flow_load;
+
+       target_port_ids[*targets_len] = candidate_port_id;
+       target_qfs[*targets_len] = *candidate_qf;
+       (*targets_len)++;
+
+       rte_atomic32_add(&dsw->ports[candidate_port_id].immigration_load,
+                        candidate_flow_load);
+
+       return true;
+}
+
+static void
+dsw_select_emigration_targets(struct dsw_evdev *dsw,
+                             struct dsw_port *source_port,
+                             struct dsw_queue_flow_burst *bursts,
+                             uint16_t num_bursts, int16_t *port_loads)
+{
+       struct dsw_queue_flow *target_qfs = source_port->emigration_target_qfs;
+       uint8_t *target_port_ids = source_port->emigration_target_port_ids;
+       uint8_t *targets_len = &source_port->emigration_targets_len;
+       uint16_t i;
+
+       for (i = 0; i < DSW_MAX_FLOWS_PER_MIGRATION; i++) {
+               bool found;
+
+               found = dsw_select_emigration_target(dsw, bursts, num_bursts,
+                                                    source_port->id,
+                                                    port_loads, dsw->num_ports,
+                                                    target_port_ids,
+                                                    target_qfs,
+                                                    targets_len);
+               if (!found)
+                       break;
+       }
+
+       if (*targets_len == 0)
+               DSW_LOG_DP_PORT(DEBUG, source_port->id,
+                               "For the %d flows considered, no target port "
+                               "was found.\n", num_bursts);
+}
+
 static uint8_t
 dsw_schedule(struct dsw_evdev *dsw, uint8_t queue_id, uint16_t flow_hash)
 {
@@ -132,6 +595,14 @@ dsw_port_get_parallel_flow_id(struct dsw_port *port)
        return flow_id;
 }
 
+static void
+dsw_port_buffer_paused(struct dsw_port *port,
+                      const struct rte_event *paused_event)
+{
+       port->paused_events[port->paused_events_len] = *paused_event;
+       port->paused_events_len++;
+}
+
 static void
 dsw_port_buffer_non_paused(struct dsw_evdev *dsw, struct dsw_port *source_port,
                           uint8_t dest_port_id, const struct rte_event *event)
@@ -191,11 +662,475 @@ dsw_port_buffer_event(struct dsw_evdev *dsw, struct dsw_port *source_port,
 
        flow_hash = dsw_flow_id_hash(event->flow_id);
 
+       if (unlikely(dsw_port_is_flow_paused(source_port, event->queue_id,
+                                            flow_hash))) {
+               dsw_port_buffer_paused(source_port, event);
+               return;
+       }
+
        dest_port_id = dsw_schedule(dsw, event->queue_id, flow_hash);
 
        dsw_port_buffer_non_paused(dsw, source_port, dest_port_id, event);
 }
 
+static void
+dsw_port_flush_paused_events(struct dsw_evdev *dsw,
+                            struct dsw_port *source_port,
+                            const struct dsw_queue_flow *qf)
+{
+       uint16_t paused_events_len = source_port->paused_events_len;
+       struct rte_event paused_events[paused_events_len];
+       uint8_t dest_port_id;
+       uint16_t i;
+
+       if (paused_events_len == 0)
+               return;
+
+       if (dsw_port_is_flow_paused(source_port, qf->queue_id, qf->flow_hash))
+               return;
+
+       rte_memcpy(paused_events, source_port->paused_events,
+                  paused_events_len * sizeof(struct rte_event));
+
+       source_port->paused_events_len = 0;
+
+       dest_port_id = dsw_schedule(dsw, qf->queue_id, qf->flow_hash);
+
+       for (i = 0; i < paused_events_len; i++) {
+               struct rte_event *event = &paused_events[i];
+               uint16_t flow_hash;
+
+               flow_hash = dsw_flow_id_hash(event->flow_id);
+
+               if (event->queue_id == qf->queue_id &&
+                   flow_hash == qf->flow_hash)
+                       dsw_port_buffer_non_paused(dsw, source_port,
+                                                  dest_port_id, event);
+               else
+                       dsw_port_buffer_paused(source_port, event);
+       }
+}
+
+static void
+dsw_port_emigration_stats(struct dsw_port *port, uint8_t finished)
+{
+       uint64_t flow_migration_latency;
+
+       flow_migration_latency =
+               (rte_get_timer_cycles() - port->emigration_start);
+       port->emigration_latency += (flow_migration_latency * finished);
+       port->emigrations += finished;
+}
+
+static void
+dsw_port_end_emigration(struct dsw_evdev *dsw, struct dsw_port *port,
+                       uint8_t schedule_type)
+{
+       uint8_t i;
+       struct dsw_queue_flow left_qfs[DSW_MAX_FLOWS_PER_MIGRATION];
+       uint8_t left_port_ids[DSW_MAX_FLOWS_PER_MIGRATION];
+       uint8_t left_qfs_len = 0;
+       uint8_t finished;
+
+       for (i = 0; i < port->emigration_targets_len; i++) {
+               struct dsw_queue_flow *qf = &port->emigration_target_qfs[i];
+               uint8_t queue_id = qf->queue_id;
+               uint8_t queue_schedule_type =
+                       dsw->queues[queue_id].schedule_type;
+               uint16_t flow_hash = qf->flow_hash;
+
+               if (queue_schedule_type != schedule_type) {
+                       left_port_ids[left_qfs_len] =
+                               port->emigration_target_port_ids[i];
+                       left_qfs[left_qfs_len] = *qf;
+                       left_qfs_len++;
+                       continue;
+               }
+
+               DSW_LOG_DP_PORT(DEBUG, port->id, "Migration completed for "
+                               "queue_id %d flow_hash %d.\n", queue_id,
+                               flow_hash);
+
+               if (queue_schedule_type == RTE_SCHED_TYPE_ATOMIC) {
+                       dsw_port_remove_paused_flow(port, qf);
+                       dsw_port_flush_paused_events(dsw, port, qf);
+               }
+       }
+
+       finished = port->emigration_targets_len - left_qfs_len;
+
+       if (finished > 0)
+               dsw_port_emigration_stats(port, finished);
+
+       for (i = 0; i < left_qfs_len; i++) {
+               port->emigration_target_port_ids[i] = left_port_ids[i];
+               port->emigration_target_qfs[i] = left_qfs[i];
+       }
+       port->emigration_targets_len = left_qfs_len;
+
+       if (port->emigration_targets_len == 0) {
+               port->migration_state = DSW_MIGRATION_STATE_IDLE;
+               port->seen_events_len = 0;
+       }
+}
+
+static void
+dsw_port_move_parallel_flows(struct dsw_evdev *dsw,
+                            struct dsw_port *source_port)
+{
+       uint8_t i;
+
+       for (i = 0; i < source_port->emigration_targets_len; i++) {
+               struct dsw_queue_flow *qf =
+                       &source_port->emigration_target_qfs[i];
+               uint8_t queue_id = qf->queue_id;
+
+               if (dsw->queues[queue_id].schedule_type ==
+                   RTE_SCHED_TYPE_PARALLEL) {
+                       uint8_t dest_port_id =
+                               source_port->emigration_target_port_ids[i];
+                       uint16_t flow_hash = qf->flow_hash;
+
+                       /* Single byte-sized stores are always atomic. */
+                       dsw->queues[queue_id].flow_to_port_map[flow_hash] =
+                               dest_port_id;
+               }
+       }
+
+       rte_smp_wmb();
+
+       dsw_port_end_emigration(dsw, source_port, RTE_SCHED_TYPE_PARALLEL);
+}
+
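+/* A flow emigration is, in short, a handshake between the source port
+ * and its peers: the source port pauses the selected flows and
+ * broadcasts DSW_CTL_PAUS_REQ; once all peers have flushed their
+ * buffers and confirmed (DSW_CTL_CFM), the source forwards any events
+ * for those flows still sitting on its in_ring to the destination
+ * ports, updates the flow-to-port map and broadcasts
+ * DSW_CTL_UNPAUS_REQ. Flows on parallel queues skip the pause steps,
+ * since no atomic or ordered semantics need to be preserved.
+ */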
+static void
+dsw_port_consider_emigration(struct dsw_evdev *dsw,
+                            struct dsw_port *source_port,
+                            uint64_t now)
+{
+       bool any_port_below_limit;
+       struct dsw_queue_flow *seen_events = source_port->seen_events;
+       uint16_t seen_events_len = source_port->seen_events_len;
+       struct dsw_queue_flow_burst bursts[DSW_MAX_EVENTS_RECORDED];
+       uint16_t num_bursts;
+       int16_t source_port_load;
+       int16_t port_loads[dsw->num_ports];
+
+       if (now < source_port->next_emigration)
+               return;
+
+       if (dsw->num_ports == 1)
+               return;
+
+       DSW_LOG_DP_PORT(DEBUG, source_port->id, "Considering emigration.\n");
+
+       /* Randomize the interval to avoid having all threads consider
+        * emigration at the same point in time, which might lead to
+        * all choosing the same target port.
+        */
+       source_port->next_emigration = now +
+               source_port->migration_interval / 2 +
+               rte_rand() % source_port->migration_interval;
+
+       if (source_port->migration_state != DSW_MIGRATION_STATE_IDLE) {
+               DSW_LOG_DP_PORT(DEBUG, source_port->id,
+                               "Emigration already in progress.\n");
+               return;
+       }
+
+       /* For simplicity, avoid migration in the unlikely case there
+        * are still events to consume in the in_buffer (from the last
+        * emigration).
+        */
+       if (source_port->in_buffer_len > 0) {
+               DSW_LOG_DP_PORT(DEBUG, source_port->id, "There are still "
+                               "events in the input buffer.\n");
+               return;
+       }
+
+       source_port_load = rte_atomic16_read(&source_port->load);
+       if (source_port_load < DSW_MIN_SOURCE_LOAD_FOR_MIGRATION) {
+               DSW_LOG_DP_PORT(DEBUG, source_port->id,
+                     "Load %d is below threshold level %d.\n",
+                     DSW_LOAD_TO_PERCENT(source_port_load),
+                     DSW_LOAD_TO_PERCENT(DSW_MIN_SOURCE_LOAD_FOR_MIGRATION));
+               return;
+       }
+
+       /* Avoid starting any expensive operations (sorting etc), in
+        * case of a scenario with all ports above the load limit.
+        */
+       any_port_below_limit =
+               dsw_retrieve_port_loads(dsw, port_loads,
+                                       DSW_MAX_TARGET_LOAD_FOR_MIGRATION);
+       if (!any_port_below_limit) {
+               DSW_LOG_DP_PORT(DEBUG, source_port->id,
+                               "Candidate target ports are all too highly "
+                               "loaded.\n");
+               return;
+       }
+
+       num_bursts = dsw_sort_qfs_to_bursts(seen_events, seen_events_len,
+                                           bursts);
+
+       /* For non-big-little systems, there's no point in moving the
+        * only (known) flow.
+        */
+       if (num_bursts < 2) {
+               DSW_LOG_DP_PORT(DEBUG, source_port->id, "Only a single flow "
+                               "queue_id %d flow_hash %d has been seen.\n",
+                               bursts[0].queue_flow.queue_id,
+                               bursts[0].queue_flow.flow_hash);
+               return;
+       }
+
+       dsw_select_emigration_targets(dsw, source_port, bursts, num_bursts,
+                                     port_loads);
+
+       if (source_port->emigration_targets_len == 0)
+               return;
+
+       source_port->migration_state = DSW_MIGRATION_STATE_PAUSING;
+       source_port->emigration_start = rte_get_timer_cycles();
+
+       /* No need to go through the whole pause procedure for
+        * parallel queues, since atomic/ordered semantics need not be
+        * maintained.
+        */
+       dsw_port_move_parallel_flows(dsw, source_port);
+
+       /* All flows were on PARALLEL queues. */
+       if (source_port->migration_state == DSW_MIGRATION_STATE_IDLE)
+               return;
+
+       /* There might be 'loopback' events already scheduled in the
+        * output buffers.
+        */
+       dsw_port_flush_out_buffers(dsw, source_port);
+
+       dsw_port_add_paused_flows(source_port,
+                                 source_port->emigration_target_qfs,
+                                 source_port->emigration_targets_len);
+
+       dsw_port_ctl_broadcast(dsw, source_port, DSW_CTL_PAUS_REQ,
+                              source_port->emigration_target_qfs,
+                              source_port->emigration_targets_len);
+       source_port->cfm_cnt = 0;
+}
+
+static void
+dsw_port_flush_paused_events(struct dsw_evdev *dsw,
+                            struct dsw_port *source_port,
+                            const struct dsw_queue_flow *qf);
+
+static void
+dsw_port_handle_unpause_flows(struct dsw_evdev *dsw, struct dsw_port *port,
+                             uint8_t originating_port_id,
+                             struct dsw_queue_flow *paused_qfs,
+                             uint8_t qfs_len)
+{
+       uint16_t i;
+       struct dsw_ctl_msg cfm = {
+               .type = DSW_CTL_CFM,
+               .originating_port_id = port->id
+       };
+
+       dsw_port_remove_paused_flows(port, paused_qfs, qfs_len);
+
+       rte_smp_rmb();
+
+       dsw_port_ctl_enqueue(&dsw->ports[originating_port_id], &cfm);
+
+       for (i = 0; i < qfs_len; i++) {
+               struct dsw_queue_flow *qf = &paused_qfs[i];
+
+               if (dsw_schedule(dsw, qf->queue_id, qf->flow_hash) == port->id)
+                       port->immigrations++;
+
+               dsw_port_flush_paused_events(dsw, port, qf);
+       }
+}
+
+#define FORWARD_BURST_SIZE (32)
+
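+/* Drain the source port's in_ring, redirecting events that belong to
+ * the emigrated flow to the destination port's ring. Events from
+ * other flows are parked in the in_buffer, to be handed out on the
+ * source port's subsequent dequeue calls.
+ */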
+static void
+dsw_port_forward_emigrated_flow(struct dsw_port *source_port,
+                               struct rte_event_ring *dest_ring,
+                               uint8_t queue_id,
+                               uint16_t flow_hash)
+{
+       uint16_t events_left;
+
+       /* The control ring message should be seen before the ring
+        * count is read on the port's in_ring.
+        */
+       rte_smp_rmb();
+
+       events_left = rte_event_ring_count(source_port->in_ring);
+
+       while (events_left > 0) {
+               uint16_t in_burst_size =
+                       RTE_MIN(FORWARD_BURST_SIZE, events_left);
+               struct rte_event in_burst[in_burst_size];
+               uint16_t in_len;
+               uint16_t i;
+
+               in_len = rte_event_ring_dequeue_burst(source_port->in_ring,
+                                                     in_burst,
+                                                     in_burst_size, NULL);
+               /* No need to bother bursting the forwarded events (to
+                * the destination port's in_ring), since migration
+                * doesn't happen very often, and the majority of the
+                * dequeued events will likely *not* be forwarded anyway.
+                */
+               for (i = 0; i < in_len; i++) {
+                       struct rte_event *e = &in_burst[i];
+                       if (e->queue_id == queue_id &&
+                           dsw_flow_id_hash(e->flow_id) == flow_hash) {
+                               while (rte_event_ring_enqueue_burst(dest_ring,
+                                                                   e, 1,
+                                                                   NULL) != 1)
+                                       rte_pause();
+                       } else {
+                               uint16_t last_idx = source_port->in_buffer_len;
+                               source_port->in_buffer[last_idx] = *e;
+                               source_port->in_buffer_len++;
+                       }
+               }
+
+               events_left -= in_len;
+       }
+}
+
+static void
+dsw_port_move_emigrating_flows(struct dsw_evdev *dsw,
+                              struct dsw_port *source_port)
+{
+       uint8_t i;
+
+       dsw_port_flush_out_buffers(dsw, source_port);
+
+       rte_smp_wmb();
+
+       for (i = 0; i < source_port->emigration_targets_len; i++) {
+               struct dsw_queue_flow *qf =
+                       &source_port->emigration_target_qfs[i];
+               uint8_t dest_port_id =
+                       source_port->emigration_target_port_ids[i];
+               struct dsw_port *dest_port = &dsw->ports[dest_port_id];
+
+               dsw->queues[qf->queue_id].flow_to_port_map[qf->flow_hash] =
+                       dest_port_id;
+
+               dsw_port_forward_emigrated_flow(source_port, dest_port->in_ring,
+                                               qf->queue_id, qf->flow_hash);
+       }
+
+       /* Flow table update and migration destination port's enqueues
+        * must be seen before the control message.
+        */
+       rte_smp_wmb();
+
+       dsw_port_ctl_broadcast(dsw, source_port, DSW_CTL_UNPAUS_REQ,
+                              source_port->emigration_target_qfs,
+                              source_port->emigration_targets_len);
+       source_port->cfm_cnt = 0;
+       source_port->migration_state = DSW_MIGRATION_STATE_UNPAUSING;
+}
+
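+/* A pause or unpause request is considered complete once confirmations
+ * have arrived from all other ports, at which point the port moves on
+ * to the next stage of the migration.
+ */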
+static void
+dsw_port_handle_confirm(struct dsw_evdev *dsw, struct dsw_port *port)
+{
+       port->cfm_cnt++;
+
+       if (port->cfm_cnt == (dsw->num_ports-1)) {
+               switch (port->migration_state) {
+               case DSW_MIGRATION_STATE_PAUSING:
+                       DSW_LOG_DP_PORT(DEBUG, port->id, "Going into forwarding "
+                                       "migration state.\n");
+                       port->migration_state = DSW_MIGRATION_STATE_FORWARDING;
+                       break;
+               case DSW_MIGRATION_STATE_UNPAUSING:
+                       dsw_port_end_emigration(dsw, port,
+                                               RTE_SCHED_TYPE_ATOMIC);
+                       break;
+               default:
+                       RTE_ASSERT(0);
+                       break;
+               }
+       }
+}
+
+static void
+dsw_port_ctl_process(struct dsw_evdev *dsw, struct dsw_port *port)
+{
+       struct dsw_ctl_msg msg;
+
+       if (dsw_port_ctl_dequeue(port, &msg) == 0) {
+               switch (msg.type) {
+               case DSW_CTL_PAUS_REQ:
+                       dsw_port_handle_pause_flows(dsw, port,
+                                                   msg.originating_port_id,
+                                                   msg.qfs, msg.qfs_len);
+                       break;
+               case DSW_CTL_UNPAUS_REQ:
+                       dsw_port_handle_unpause_flows(dsw, port,
+                                                     msg.originating_port_id,
+                                                     msg.qfs, msg.qfs_len);
+                       break;
+               case DSW_CTL_CFM:
+                       dsw_port_handle_confirm(dsw, port);
+                       break;
+               }
+       }
+}
+
+static void
+dsw_port_note_op(struct dsw_port *port, uint16_t num_events)
+{
+       /* To poll the control ring reasonably often on busy ports,
+        * each dequeued/enqueued event is considered an 'op' too.
+        */
+       port->ops_since_bg_task += (num_events+1);
+}
+
+static void
+dsw_port_bg_process(struct dsw_evdev *dsw, struct dsw_port *port)
+{
+       if (unlikely(port->migration_state == DSW_MIGRATION_STATE_FORWARDING &&
+                    port->pending_releases == 0))
+               dsw_port_move_emigrating_flows(dsw, port);
+
+       /* Polling the control ring is relatively inexpensive, and
+        * polling it often helps bring down migration latency, so do
+        * this for every iteration.
+        */
+       dsw_port_ctl_process(dsw, port);
+
+       /* To avoid considering migration and flushing output buffers
+        * on every dequeue/enqueue call, the scheduler only performs
+        * such 'background' tasks every nth
+        * (i.e. DSW_MAX_PORT_OPS_PER_BG_TASK) operation.
+        */
+       if (unlikely(port->ops_since_bg_task >= DSW_MAX_PORT_OPS_PER_BG_TASK)) {
+               uint64_t now;
+
+               now = rte_get_timer_cycles();
+
+               port->last_bg = now;
+
+               /* Logic to avoid having events linger in the output
+                * buffer too long.
+                */
+               dsw_port_flush_out_buffers(dsw, port);
+
+               dsw_port_consider_load_update(port, now);
+
+               dsw_port_consider_emigration(dsw, port, now);
+
+               port->ops_since_bg_task = 0;
+       }
+}
+
 static void
 dsw_port_flush_out_buffers(struct dsw_evdev *dsw, struct dsw_port *source_port)
 {
@@ -212,12 +1147,12 @@ dsw_event_enqueue(void *port, const struct rte_event *ev)
 }
 
 static __rte_always_inline uint16_t
-dsw_event_enqueue_burst_generic(void *port, const struct rte_event events[],
+dsw_event_enqueue_burst_generic(struct dsw_port *source_port,
+                               const struct rte_event events[],
                                uint16_t events_len, bool op_types_known,
                                uint16_t num_new, uint16_t num_release,
                                uint16_t num_non_release)
 {
-       struct dsw_port *source_port = port;
        struct dsw_evdev *dsw = source_port->dsw;
        bool enough_credits;
        uint16_t i;
@@ -225,6 +1160,8 @@ dsw_event_enqueue_burst_generic(void *port, const struct rte_event events[],
        DSW_LOG_DP_PORT(DEBUG, source_port->id, "Attempting to enqueue %d "
                        "events to port %d.\n", events_len, source_port->id);
 
+       dsw_port_bg_process(dsw, source_port);
+
        /* XXX: For performance (=ring efficiency) reasons, the
         * scheduler relies on internal non-ring buffers instead of
         * immediately sending the event to the destination ring. For
@@ -238,12 +1175,12 @@ dsw_event_enqueue_burst_generic(void *port, const struct rte_event events[],
         * considered.
         */
        if (unlikely(events_len == 0)) {
+               dsw_port_note_op(source_port, DSW_MAX_PORT_OPS_PER_BG_TASK);
                dsw_port_flush_out_buffers(dsw, source_port);
                return 0;
        }
 
-       if (unlikely(events_len > source_port->enqueue_depth))
-               events_len = source_port->enqueue_depth;
+       dsw_port_note_op(source_port, events_len);
 
        if (!op_types_known)
                for (i = 0; i < events_len; i++) {
@@ -276,12 +1213,16 @@ dsw_event_enqueue_burst_generic(void *port, const struct rte_event events[],
 
        source_port->pending_releases -= num_release;
 
+       dsw_port_enqueue_stats(source_port, num_new,
+                              num_non_release-num_new, num_release);
+
        for (i = 0; i < events_len; i++) {
                const struct rte_event *event = &events[i];
 
                if (likely(num_release == 0 ||
                           event->op != RTE_EVENT_OP_RELEASE))
                        dsw_port_buffer_event(dsw, source_port, event);
+               dsw_port_queue_enqueue_stats(source_port, event->queue_id);
        }
 
        DSW_LOG_DP_PORT(DEBUG, source_port->id, "%d non-release events "
@@ -294,24 +1235,41 @@ uint16_t
 dsw_event_enqueue_burst(void *port, const struct rte_event events[],
                        uint16_t events_len)
 {
-       return dsw_event_enqueue_burst_generic(port, events, events_len, false,
-                                              0, 0, 0);
+       struct dsw_port *source_port = port;
+
+       if (unlikely(events_len > source_port->enqueue_depth))
+               events_len = source_port->enqueue_depth;
+
+       return dsw_event_enqueue_burst_generic(source_port, events,
+                                              events_len, false, 0, 0, 0);
 }
 
 uint16_t
 dsw_event_enqueue_new_burst(void *port, const struct rte_event events[],
                            uint16_t events_len)
 {
-       return dsw_event_enqueue_burst_generic(port, events, events_len, true,
-                                              events_len, 0, events_len);
+       struct dsw_port *source_port = port;
+
+       if (unlikely(events_len > source_port->enqueue_depth))
+               events_len = source_port->enqueue_depth;
+
+       return dsw_event_enqueue_burst_generic(source_port, events,
+                                              events_len, true, events_len,
+                                              0, events_len);
 }
 
 uint16_t
 dsw_event_enqueue_forward_burst(void *port, const struct rte_event events[],
                                uint16_t events_len)
 {
-       return dsw_event_enqueue_burst_generic(port, events, events_len, true,
-                                              0, 0, events_len);
+       struct dsw_port *source_port = port;
+
+       if (unlikely(events_len > source_port->enqueue_depth))
+               events_len = source_port->enqueue_depth;
+
+       return dsw_event_enqueue_burst_generic(source_port, events,
+                                              events_len, true, 0, 0,
+                                              events_len);
 }
 
 uint16_t
@@ -320,10 +1278,66 @@ dsw_event_dequeue(void *port, struct rte_event *events, uint64_t wait)
        return dsw_event_dequeue_burst(port, events, 1, wait);
 }
 
+static void
+dsw_port_record_seen_events(struct dsw_port *port, struct rte_event *events,
+                           uint16_t num)
+{
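+       /* Record the queue id and flow hash of each dequeued event in
+        * a small ring buffer, so that dsw_port_consider_emigration()
+        * has a sample of the flows currently processed on this port.
+        */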
+       uint16_t i;
+
+       dsw_port_dequeue_stats(port, num);
+
+       for (i = 0; i < num; i++) {
+               uint16_t l_idx = port->seen_events_idx;
+               struct dsw_queue_flow *qf = &port->seen_events[l_idx];
+               struct rte_event *event = &events[i];
+               qf->queue_id = event->queue_id;
+               qf->flow_hash = dsw_flow_id_hash(event->flow_id);
+
+               port->seen_events_idx = (l_idx+1) % DSW_MAX_EVENTS_RECORDED;
+
+               dsw_port_queue_dequeued_stats(port, event->queue_id);
+       }
+
+       if (unlikely(port->seen_events_len != DSW_MAX_EVENTS_RECORDED))
+               port->seen_events_len =
+                       RTE_MIN(port->seen_events_len + num,
+                               DSW_MAX_EVENTS_RECORDED);
+}
+
+#ifdef DSW_SORT_DEQUEUED
+
+#define DSW_EVENT_TO_INT(_event)                               \
+       ((int)((((_event)->queue_id)<<16)|((_event)->flow_id)))
+
+static inline int
+dsw_cmp_event(const void *v_event_a, const void *v_event_b)
+{
+       const struct rte_event *event_a = v_event_a;
+       const struct rte_event *event_b = v_event_b;
+
+       return DSW_EVENT_TO_INT(event_a) - DSW_EVENT_TO_INT(event_b);
+}
+#endif
+
 static uint16_t
 dsw_port_dequeue_burst(struct dsw_port *port, struct rte_event *events,
                       uint16_t num)
 {
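+       /* Serve events parked in the in_buffer during a past migration
+        * before falling back to the port's in_ring.
+        */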
+       if (unlikely(port->in_buffer_len > 0)) {
+               uint16_t dequeued = RTE_MIN(num, port->in_buffer_len);
+
+               rte_memcpy(events, &port->in_buffer[port->in_buffer_start],
+                          dequeued * sizeof(struct rte_event));
+
+               port->in_buffer_start += dequeued;
+               port->in_buffer_len -= dequeued;
+
+               if (port->in_buffer_len == 0)
+                       port->in_buffer_start = 0;
+
+               return dequeued;
+       }
+
        return rte_event_ring_dequeue_burst(port->in_ring, events, num, NULL);
 }
 
@@ -337,6 +1351,8 @@ dsw_event_dequeue_burst(void *port, struct rte_event *events, uint16_t num,
 
        source_port->pending_releases = 0;
 
+       dsw_port_bg_process(dsw, source_port);
+
        if (unlikely(num > source_port->dequeue_depth))
                num = source_port->dequeue_depth;
 
@@ -344,16 +1360,33 @@ dsw_event_dequeue_burst(void *port, struct rte_event *events, uint16_t num,
 
        source_port->pending_releases = dequeued;
 
+       dsw_port_load_record(source_port, dequeued);
+
+       dsw_port_note_op(source_port, dequeued);
+
        if (dequeued > 0) {
                DSW_LOG_DP_PORT(DEBUG, source_port->id, "Dequeued %d events.\n",
                                dequeued);
 
                dsw_port_return_credits(dsw, source_port, dequeued);
-       }
-       /* XXX: Assuming the port can't produce any more work,
-        *      consider flushing the output buffer, on dequeued ==
-        *      0.
-        */
+
+               /* One potential optimization would be to add a
+                * migration state (prior to 'pausing'), and only
+                * record seen events while the port is in this state
+                * (transitioning to 'pausing' once enough events have
+                * been gathered). However, that scheme doesn't seem
+                * to improve performance.
+                */
+               dsw_port_record_seen_events(port, events, dequeued);
+       } else /* A zero-size dequeue means a likely idle port, and
+               * thus we can afford to trade some efficiency for a
+               * slightly reduced event wall-time latency.
+               */
+               dsw_port_flush_out_buffers(dsw, port);
+
+#ifdef DSW_SORT_DEQUEUED
+       dsw_stable_sort(events, dequeued, sizeof(events[0]), dsw_cmp_event);
+#endif
 
        return dequeued;
 }