event/dsw: fix gcc 4.8 false positive warning
[dpdk.git] / drivers / event / dsw / dsw_event.c
index 05abfb2..8323903 100644 (file)
@@ -160,6 +160,11 @@ dsw_port_load_update(struct dsw_port *port, uint64_t now)
                (DSW_OLD_LOAD_WEIGHT+1);
 
        rte_atomic16_set(&port->load, new_load);
+
+       /* The load of the recently immigrated flows should hopefully
+        * be reflected the load estimate by now.
+        */
+       rte_atomic32_set(&port->immigration_load, 0);
 }
 
 static void
@@ -189,58 +194,75 @@ dsw_port_ctl_dequeue(struct dsw_port *port, struct dsw_ctl_msg *msg)
 
 static void
 dsw_port_ctl_broadcast(struct dsw_evdev *dsw, struct dsw_port *source_port,
-                      uint8_t type, uint8_t queue_id, uint16_t flow_hash)
+                      uint8_t type, struct dsw_queue_flow *qfs,
+                      uint8_t qfs_len)
 {
        uint16_t port_id;
        struct dsw_ctl_msg msg = {
                .type = type,
                .originating_port_id = source_port->id,
-               .queue_id = queue_id,
-               .flow_hash = flow_hash
+               .qfs_len = qfs_len
        };
 
+       memcpy(msg.qfs, qfs, sizeof(struct dsw_queue_flow) * qfs_len);
+
        for (port_id = 0; port_id < dsw->num_ports; port_id++)
                if (port_id != source_port->id)
                        dsw_port_ctl_enqueue(&dsw->ports[port_id], &msg);
 }
 
-static bool
-dsw_port_is_flow_paused(struct dsw_port *port, uint8_t queue_id,
-                       uint16_t flow_hash)
+static __rte_always_inline bool
+dsw_is_queue_flow_in_ary(const struct dsw_queue_flow *qfs, uint16_t qfs_len,
+                        uint8_t queue_id, uint16_t flow_hash)
 {
        uint16_t i;
 
-       for (i = 0; i < port->paused_flows_len; i++) {
-               struct dsw_queue_flow *qf = &port->paused_flows[i];
-               if (qf->queue_id == queue_id &&
-                   qf->flow_hash == flow_hash)
+       for (i = 0; i < qfs_len; i++)
+               if (qfs[i].queue_id == queue_id &&
+                   qfs[i].flow_hash == flow_hash)
                        return true;
-       }
+
        return false;
 }
 
+static __rte_always_inline bool
+dsw_port_is_flow_paused(struct dsw_port *port, uint8_t queue_id,
+                       uint16_t flow_hash)
+{
+       return dsw_is_queue_flow_in_ary(port->paused_flows,
+                                       port->paused_flows_len,
+                                       queue_id, flow_hash);
+}
+
 static void
-dsw_port_add_paused_flow(struct dsw_port *port, uint8_t queue_id,
-                        uint16_t paused_flow_hash)
+dsw_port_add_paused_flows(struct dsw_port *port, struct dsw_queue_flow *qfs,
+                         uint8_t qfs_len)
 {
-       port->paused_flows[port->paused_flows_len] = (struct dsw_queue_flow) {
-               .queue_id = queue_id,
-               .flow_hash = paused_flow_hash
+       uint8_t i;
+
+       for (i = 0; i < qfs_len; i++) {
+               struct dsw_queue_flow *qf = &qfs[i];
+
+               DSW_LOG_DP_PORT(DEBUG, port->id,
+                               "Pausing queue_id %d flow_hash %d.\n",
+                               qf->queue_id, qf->flow_hash);
+
+               port->paused_flows[port->paused_flows_len] = *qf;
+               port->paused_flows_len++;
        };
-       port->paused_flows_len++;
 }
 
 static void
-dsw_port_remove_paused_flow(struct dsw_port *port, uint8_t queue_id,
-                           uint16_t paused_flow_hash)
+dsw_port_remove_paused_flow(struct dsw_port *port,
+                           struct dsw_queue_flow *target_qf)
 {
        uint16_t i;
 
        for (i = 0; i < port->paused_flows_len; i++) {
                struct dsw_queue_flow *qf = &port->paused_flows[i];
 
-               if (qf->queue_id == queue_id &&
-                   qf->flow_hash == paused_flow_hash) {
+               if (qf->queue_id == target_qf->queue_id &&
+                   qf->flow_hash == target_qf->flow_hash) {
                        uint16_t last_idx = port->paused_flows_len-1;
                        if (i != last_idx)
                                port->paused_flows[i] =
@@ -251,30 +273,37 @@ dsw_port_remove_paused_flow(struct dsw_port *port, uint8_t queue_id,
        }
 }
 
+static void
+dsw_port_remove_paused_flows(struct dsw_port *port,
+                            struct dsw_queue_flow *qfs, uint8_t qfs_len)
+{
+       uint8_t i;
+
+       for (i = 0; i < qfs_len; i++)
+               dsw_port_remove_paused_flow(port, &qfs[i]);
+
+}
+
 static void
 dsw_port_flush_out_buffers(struct dsw_evdev *dsw, struct dsw_port *source_port);
 
 static void
-dsw_port_handle_pause_flow(struct dsw_evdev *dsw, struct dsw_port *port,
-                          uint8_t originating_port_id, uint8_t queue_id,
-                          uint16_t paused_flow_hash)
+dsw_port_handle_pause_flows(struct dsw_evdev *dsw, struct dsw_port *port,
+                           uint8_t originating_port_id,
+                           struct dsw_queue_flow *paused_qfs,
+                           uint8_t qfs_len)
 {
        struct dsw_ctl_msg cfm = {
                .type = DSW_CTL_CFM,
-               .originating_port_id = port->id,
-               .queue_id = queue_id,
-               .flow_hash = paused_flow_hash
+               .originating_port_id = port->id
        };
 
-       DSW_LOG_DP_PORT(DEBUG, port->id, "Pausing queue_id %d flow_hash %d.\n",
-                       queue_id, paused_flow_hash);
-
        /* There might be already-scheduled events belonging to the
         * paused flow in the output buffers.
         */
        dsw_port_flush_out_buffers(dsw, port);
 
-       dsw_port_add_paused_flow(port, queue_id, paused_flow_hash);
+       dsw_port_add_paused_flows(port, paused_qfs, qfs_len);
 
        /* Make sure any stores to the original port's in_ring is seen
         * before the ctl message.
@@ -284,47 +313,11 @@ dsw_port_handle_pause_flow(struct dsw_evdev *dsw, struct dsw_port *port,
        dsw_port_ctl_enqueue(&dsw->ports[originating_port_id], &cfm);
 }
 
-static void
-dsw_find_lowest_load_port(uint8_t *port_ids, uint16_t num_port_ids,
-                         uint8_t exclude_port_id, int16_t *port_loads,
-                         uint8_t *target_port_id, int16_t *target_load)
-{
-       int16_t candidate_port_id = -1;
-       int16_t candidate_load = DSW_MAX_LOAD;
-       uint16_t i;
-
-       for (i = 0; i < num_port_ids; i++) {
-               uint8_t port_id = port_ids[i];
-               if (port_id != exclude_port_id) {
-                       int16_t load = port_loads[port_id];
-                       if (candidate_port_id == -1 ||
-                           load < candidate_load) {
-                               candidate_port_id = port_id;
-                               candidate_load = load;
-                       }
-               }
-       }
-       *target_port_id = candidate_port_id;
-       *target_load = candidate_load;
-}
-
 struct dsw_queue_flow_burst {
        struct dsw_queue_flow queue_flow;
        uint16_t count;
 };
 
-static inline int
-dsw_cmp_burst(const void *v_burst_a, const void *v_burst_b)
-{
-       const struct dsw_queue_flow_burst *burst_a = v_burst_a;
-       const struct dsw_queue_flow_burst *burst_b = v_burst_b;
-
-       int a_count = burst_a->count;
-       int b_count = burst_b->count;
-
-       return a_count - b_count;
-}
-
 #define DSW_QF_TO_INT(_qf)                                     \
        ((int)((((_qf)->queue_id)<<16)|((_qf)->flow_hash)))
 
@@ -363,8 +356,6 @@ dsw_sort_qfs_to_bursts(struct dsw_queue_flow *qfs, uint16_t qfs_len,
                current_burst->count++;
        }
 
-       qsort(bursts, num_bursts, sizeof(bursts[0]), dsw_cmp_burst);
-
        return num_bursts;
 }
 
@@ -376,7 +367,13 @@ dsw_retrieve_port_loads(struct dsw_evdev *dsw, int16_t *port_loads,
        uint16_t i;
 
        for (i = 0; i < dsw->num_ports; i++) {
-               int16_t load = rte_atomic16_read(&dsw->ports[i].load);
+               int16_t measured_load = rte_atomic16_read(&dsw->ports[i].load);
+               int32_t immigration_load =
+                       rte_atomic32_read(&dsw->ports[i].immigration_load);
+               int32_t load = measured_load + immigration_load;
+
+               load = RTE_MIN(load, DSW_MAX_LOAD);
+
                if (load < load_limit)
                        below_limit = true;
                port_loads[i] = load;
@@ -384,44 +381,161 @@ dsw_retrieve_port_loads(struct dsw_evdev *dsw, int16_t *port_loads,
        return below_limit;
 }
 
+static int16_t
+dsw_flow_load(uint16_t num_events, int16_t port_load)
+{
+       return ((int32_t)port_load * (int32_t)num_events) /
+               DSW_MAX_EVENTS_RECORDED;
+}
+
+static int16_t
+dsw_evaluate_migration(int16_t source_load, int16_t target_load,
+                      int16_t flow_load)
+{
+       int32_t res_target_load;
+       int32_t imbalance;
+
+       if (target_load > DSW_MAX_TARGET_LOAD_FOR_MIGRATION)
+               return -1;
+
+       imbalance = source_load - target_load;
+
+       if (imbalance < DSW_REBALANCE_THRESHOLD)
+               return -1;
+
+       res_target_load = target_load + flow_load;
+
+       /* If the estimated load of the target port will be higher
+        * than the source port's load, it doesn't make sense to move
+        * the flow.
+        */
+       if (res_target_load > source_load)
+               return -1;
+
+       /* The more idle the target will be, the better. This will
+        * make migration prefer moving smaller flows, and flows to
+        * lightly loaded ports.
+        */
+       return DSW_MAX_LOAD - res_target_load;
+}
+
+static bool
+dsw_is_serving_port(struct dsw_evdev *dsw, uint8_t port_id, uint8_t queue_id)
+{
+       struct dsw_queue *queue = &dsw->queues[queue_id];
+       uint16_t i;
+
+       for (i = 0; i < queue->num_serving_ports; i++)
+               if (queue->serving_ports[i] == port_id)
+                       return true;
+
+       return false;
+}
+
 static bool
-dsw_select_migration_target(struct dsw_evdev *dsw,
-                           struct dsw_port *source_port,
+dsw_select_emigration_target(struct dsw_evdev *dsw,
                            struct dsw_queue_flow_burst *bursts,
-                           uint16_t num_bursts, int16_t *port_loads,
-                           int16_t max_load, struct dsw_queue_flow *target_qf,
-                           uint8_t *target_port_id)
+                           uint16_t num_bursts, uint8_t source_port_id,
+                           int16_t *port_loads, uint16_t num_ports,
+                           uint8_t *target_port_ids,
+                           struct dsw_queue_flow *target_qfs,
+                           uint8_t *targets_len)
 {
-       uint16_t source_load = port_loads[source_port->id];
+       int16_t source_port_load = port_loads[source_port_id];
+       struct dsw_queue_flow *candidate_qf = NULL;
+       uint8_t candidate_port_id = 0;
+       int16_t candidate_weight = -1;
+       int16_t candidate_flow_load = -1;
        uint16_t i;
 
+       if (source_port_load < DSW_MIN_SOURCE_LOAD_FOR_MIGRATION)
+               return false;
+
        for (i = 0; i < num_bursts; i++) {
-               struct dsw_queue_flow *qf = &bursts[i].queue_flow;
+               struct dsw_queue_flow_burst *burst = &bursts[i];
+               struct dsw_queue_flow *qf = &burst->queue_flow;
+               int16_t flow_load;
+               uint16_t port_id;
 
-               if (dsw_port_is_flow_paused(source_port, qf->queue_id,
-                                           qf->flow_hash))
+               if (dsw_is_queue_flow_in_ary(target_qfs, *targets_len,
+                                            qf->queue_id, qf->flow_hash))
                        continue;
 
-               struct dsw_queue *queue = &dsw->queues[qf->queue_id];
-               int16_t target_load;
+               flow_load = dsw_flow_load(burst->count, source_port_load);
 
-               dsw_find_lowest_load_port(queue->serving_ports,
-                                         queue->num_serving_ports,
-                                         source_port->id, port_loads,
-                                         target_port_id, &target_load);
+               for (port_id = 0; port_id < num_ports; port_id++) {
+                       int16_t weight;
 
-               if (target_load < source_load &&
-                   target_load < max_load) {
-                       *target_qf = *qf;
-                       return true;
+                       if (port_id == source_port_id)
+                               continue;
+
+                       if (!dsw_is_serving_port(dsw, port_id, qf->queue_id))
+                               continue;
+
+                       weight = dsw_evaluate_migration(source_port_load,
+                                                       port_loads[port_id],
+                                                       flow_load);
+
+                       if (weight > candidate_weight) {
+                               candidate_qf = qf;
+                               candidate_port_id = port_id;
+                               candidate_weight = weight;
+                               candidate_flow_load = flow_load;
+                       }
                }
        }
 
-       DSW_LOG_DP_PORT(DEBUG, source_port->id, "For the %d flows considered, "
-                       "no target port found with load less than %d.\n",
-                       num_bursts, DSW_LOAD_TO_PERCENT(max_load));
+       if (candidate_weight < 0)
+               return false;
+
+       DSW_LOG_DP_PORT(DEBUG, source_port_id, "Selected queue_id %d "
+                       "flow_hash %d (with flow load %d) for migration "
+                       "to port %d.\n", candidate_qf->queue_id,
+                       candidate_qf->flow_hash,
+                       DSW_LOAD_TO_PERCENT(candidate_flow_load),
+                       candidate_port_id);
 
-       return false;
+       port_loads[candidate_port_id] += candidate_flow_load;
+       port_loads[source_port_id] -= candidate_flow_load;
+
+       target_port_ids[*targets_len] = candidate_port_id;
+       target_qfs[*targets_len] = *candidate_qf;
+       (*targets_len)++;
+
+       rte_atomic32_add(&dsw->ports[candidate_port_id].immigration_load,
+                        candidate_flow_load);
+
+       return true;
+}
+
+static void
+dsw_select_emigration_targets(struct dsw_evdev *dsw,
+                             struct dsw_port *source_port,
+                             struct dsw_queue_flow_burst *bursts,
+                             uint16_t num_bursts, int16_t *port_loads)
+{
+       struct dsw_queue_flow *target_qfs = source_port->emigration_target_qfs;
+       uint8_t *target_port_ids = source_port->emigration_target_port_ids;
+       uint8_t *targets_len = &source_port->emigration_targets_len;
+       uint16_t i;
+
+       for (i = 0; i < DSW_MAX_FLOWS_PER_MIGRATION; i++) {
+               bool found;
+
+               found = dsw_select_emigration_target(dsw, bursts, num_bursts,
+                                                    source_port->id,
+                                                    port_loads, dsw->num_ports,
+                                                    target_port_ids,
+                                                    target_qfs,
+                                                    targets_len);
+               if (!found)
+                       break;
+       }
+
+       if (*targets_len == 0)
+               DSW_LOG_DP_PORT(DEBUG, source_port->id,
+                               "For the %d flows considered, no target port "
+                               "was found.\n", num_bursts);
 }
 
 static uint8_t
@@ -562,7 +676,7 @@ dsw_port_buffer_event(struct dsw_evdev *dsw, struct dsw_port *source_port,
 static void
 dsw_port_flush_paused_events(struct dsw_evdev *dsw,
                             struct dsw_port *source_port,
-                            uint8_t queue_id, uint16_t paused_flow_hash)
+                            const struct dsw_queue_flow *qf)
 {
        uint16_t paused_events_len = source_port->paused_events_len;
        struct rte_event paused_events[paused_events_len];
@@ -572,7 +686,7 @@ dsw_port_flush_paused_events(struct dsw_evdev *dsw,
        if (paused_events_len == 0)
                return;
 
-       if (dsw_port_is_flow_paused(source_port, queue_id, paused_flow_hash))
+       if (dsw_port_is_flow_paused(source_port, qf->queue_id, qf->flow_hash))
                return;
 
        rte_memcpy(paused_events, source_port->paused_events,
@@ -580,7 +694,7 @@ dsw_port_flush_paused_events(struct dsw_evdev *dsw,
 
        source_port->paused_events_len = 0;
 
-       dest_port_id = dsw_schedule(dsw, queue_id, paused_flow_hash);
+       dest_port_id = dsw_schedule(dsw, qf->queue_id, qf->flow_hash);
 
        for (i = 0; i < paused_events_len; i++) {
                struct rte_event *event = &paused_events[i];
@@ -588,8 +702,8 @@ dsw_port_flush_paused_events(struct dsw_evdev *dsw,
 
                flow_hash = dsw_flow_id_hash(event->flow_id);
 
-               if (event->queue_id == queue_id &&
-                   flow_hash == paused_flow_hash)
+               if (event->queue_id == qf->queue_id &&
+                   flow_hash == qf->flow_hash)
                        dsw_port_buffer_non_paused(dsw, source_port,
                                                   dest_port_id, event);
                else
@@ -598,39 +712,100 @@ dsw_port_flush_paused_events(struct dsw_evdev *dsw,
 }
 
 static void
-dsw_port_migration_stats(struct dsw_port *port)
+dsw_port_emigration_stats(struct dsw_port *port, uint8_t finished)
 {
-       uint64_t migration_latency;
+       uint64_t flow_migration_latency;
 
-       migration_latency = (rte_get_timer_cycles() - port->migration_start);
-       port->migration_latency += migration_latency;
-       port->migrations++;
+       flow_migration_latency =
+               (rte_get_timer_cycles() - port->emigration_start);
+       port->emigration_latency += (flow_migration_latency * finished);
+       port->emigrations += finished;
 }
 
 static void
-dsw_port_end_migration(struct dsw_evdev *dsw, struct dsw_port *port)
+dsw_port_end_emigration(struct dsw_evdev *dsw, struct dsw_port *port,
+                       uint8_t schedule_type)
 {
-       uint8_t queue_id = port->migration_target_qf.queue_id;
-       uint16_t flow_hash = port->migration_target_qf.flow_hash;
+       uint8_t i;
+       struct dsw_queue_flow left_qfs[DSW_MAX_FLOWS_PER_MIGRATION];
+       uint8_t left_port_ids[DSW_MAX_FLOWS_PER_MIGRATION];
+       uint8_t left_qfs_len = 0;
+       uint8_t finished;
+
+       for (i = 0; i < port->emigration_targets_len; i++) {
+               struct dsw_queue_flow *qf = &port->emigration_target_qfs[i];
+               uint8_t queue_id = qf->queue_id;
+               uint8_t queue_schedule_type =
+                       dsw->queues[queue_id].schedule_type;
+               uint16_t flow_hash = qf->flow_hash;
+
+               if (queue_schedule_type != schedule_type) {
+                       left_port_ids[left_qfs_len] =
+                               port->emigration_target_port_ids[i];
+                       left_qfs[left_qfs_len] = *qf;
+                       left_qfs_len++;
+                       continue;
+               }
+
+               DSW_LOG_DP_PORT(DEBUG, port->id, "Migration completed for "
+                               "queue_id %d flow_hash %d.\n", queue_id,
+                               flow_hash);
+
+               if (queue_schedule_type == RTE_SCHED_TYPE_ATOMIC) {
+                       dsw_port_remove_paused_flow(port, qf);
+                       dsw_port_flush_paused_events(dsw, port, qf);
+               }
+       }
 
-       port->migration_state = DSW_MIGRATION_STATE_IDLE;
-       port->seen_events_len = 0;
+       finished = port->emigration_targets_len - left_qfs_len;
 
-       dsw_port_migration_stats(port);
+       if (finished > 0)
+               dsw_port_emigration_stats(port, finished);
+
+       for (i = 0; i < left_qfs_len; i++) {
+               port->emigration_target_port_ids[i] = left_port_ids[i];
+               port->emigration_target_qfs[i] = left_qfs[i];
+       }
+       port->emigration_targets_len = left_qfs_len;
+
+       if (port->emigration_targets_len == 0) {
+               port->migration_state = DSW_MIGRATION_STATE_IDLE;
+               port->seen_events_len = 0;
+       }
+}
 
-       if (dsw->queues[queue_id].schedule_type != RTE_SCHED_TYPE_PARALLEL) {
-               dsw_port_remove_paused_flow(port, queue_id, flow_hash);
-               dsw_port_flush_paused_events(dsw, port, queue_id, flow_hash);
+static void
+dsw_port_move_parallel_flows(struct dsw_evdev *dsw,
+                            struct dsw_port *source_port)
+{
+       uint8_t i;
+
+       for (i = 0; i < source_port->emigration_targets_len; i++) {
+               struct dsw_queue_flow *qf =
+                       &source_port->emigration_target_qfs[i];
+               uint8_t queue_id = qf->queue_id;
+
+               if (dsw->queues[queue_id].schedule_type ==
+                   RTE_SCHED_TYPE_PARALLEL) {
+                       uint8_t dest_port_id =
+                               source_port->emigration_target_port_ids[i];
+                       uint16_t flow_hash = qf->flow_hash;
+
+                       /* Single byte-sized stores are always atomic. */
+                       dsw->queues[queue_id].flow_to_port_map[flow_hash] =
+                               dest_port_id;
+               }
        }
 
-       DSW_LOG_DP_PORT(DEBUG, port->id, "Migration completed for queue_id "
-                       "%d flow_hash %d.\n", queue_id, flow_hash);
+       rte_smp_wmb();
+
+       dsw_port_end_emigration(dsw, source_port, RTE_SCHED_TYPE_PARALLEL);
 }
 
 static void
-dsw_port_consider_migration(struct dsw_evdev *dsw,
-                           struct dsw_port *source_port,
-                           uint64_t now)
+dsw_port_consider_emigration(struct dsw_evdev *dsw,
+                            struct dsw_port *source_port,
+                            uint64_t now)
 {
        bool any_port_below_limit;
        struct dsw_queue_flow *seen_events = source_port->seen_events;
@@ -640,31 +815,31 @@ dsw_port_consider_migration(struct dsw_evdev *dsw,
        int16_t source_port_load;
        int16_t port_loads[dsw->num_ports];
 
-       if (now < source_port->next_migration)
+       if (now < source_port->next_emigration)
                return;
 
        if (dsw->num_ports == 1)
                return;
 
-       DSW_LOG_DP_PORT(DEBUG, source_port->id, "Considering migration.\n");
+       DSW_LOG_DP_PORT(DEBUG, source_port->id, "Considering emigration.\n");
 
        /* Randomize interval to avoid having all threads considering
-        * migration at the same in point in time, which might lead to
-        * all choosing the same target port.
+        * emigration at the same in point in time, which might lead
+        * to all choosing the same target port.
         */
-       source_port->next_migration = now +
+       source_port->next_emigration = now +
                source_port->migration_interval / 2 +
                rte_rand() % source_port->migration_interval;
 
        if (source_port->migration_state != DSW_MIGRATION_STATE_IDLE) {
                DSW_LOG_DP_PORT(DEBUG, source_port->id,
-                               "Migration already in progress.\n");
+                               "Emigration already in progress.\n");
                return;
        }
 
        /* For simplicity, avoid migration in the unlikely case there
         * is still events to consume in the in_buffer (from the last
-        * migration).
+        * emigration).
         */
        if (source_port->in_buffer_len > 0) {
                DSW_LOG_DP_PORT(DEBUG, source_port->id, "There are still "
@@ -675,9 +850,9 @@ dsw_port_consider_migration(struct dsw_evdev *dsw,
        source_port_load = rte_atomic16_read(&source_port->load);
        if (source_port_load < DSW_MIN_SOURCE_LOAD_FOR_MIGRATION) {
                DSW_LOG_DP_PORT(DEBUG, source_port->id,
-                               "Load %d is below threshold level %d.\n",
-                               DSW_LOAD_TO_PERCENT(source_port_load),
-                      DSW_LOAD_TO_PERCENT(DSW_MIN_SOURCE_LOAD_FOR_MIGRATION));
+                     "Load %d is below threshold level %d.\n",
+                     DSW_LOAD_TO_PERCENT(source_port_load),
+                     DSW_LOAD_TO_PERCENT(DSW_MIN_SOURCE_LOAD_FOR_MIGRATION));
                return;
        }
 
@@ -694,16 +869,9 @@ dsw_port_consider_migration(struct dsw_evdev *dsw,
                return;
        }
 
-       /* Sort flows into 'bursts' to allow attempting to migrating
-        * small (but still active) flows first - this it to avoid
-        * having large flows moving around the worker cores too much
-        * (to avoid cache misses, among other things). Of course, the
-        * number of recorded events (queue+flow ids) are limited, and
-        * provides only a snapshot, so only so many conclusions can
-        * be drawn from this data.
-        */
        num_bursts = dsw_sort_qfs_to_bursts(seen_events, seen_events_len,
                                            bursts);
+
        /* For non-big-little systems, there's no point in moving the
         * only (known) flow.
         */
@@ -715,108 +883,80 @@ dsw_port_consider_migration(struct dsw_evdev *dsw,
                return;
        }
 
-       /* The strategy is to first try to find a flow to move to a
-        * port with low load (below the migration-attempt
-        * threshold). If that fails, we try to find a port which is
-        * below the max threshold, and also less loaded than this
-        * port is.
-        */
-       if (!dsw_select_migration_target(dsw, source_port, bursts, num_bursts,
-                                        port_loads,
-                                        DSW_MIN_SOURCE_LOAD_FOR_MIGRATION,
-                                        &source_port->migration_target_qf,
-                                        &source_port->migration_target_port_id)
-           &&
-           !dsw_select_migration_target(dsw, source_port, bursts, num_bursts,
-                                        port_loads,
-                                        DSW_MAX_TARGET_LOAD_FOR_MIGRATION,
-                                        &source_port->migration_target_qf,
-                                      &source_port->migration_target_port_id))
-               return;
+       dsw_select_emigration_targets(dsw, source_port, bursts, num_bursts,
+                                     port_loads);
 
-       DSW_LOG_DP_PORT(DEBUG, source_port->id, "Migrating queue_id %d "
-                       "flow_hash %d from port %d to port %d.\n",
-                       source_port->migration_target_qf.queue_id,
-                       source_port->migration_target_qf.flow_hash,
-                       source_port->id, source_port->migration_target_port_id);
-
-       /* We have a winner. */
+       if (source_port->emigration_targets_len == 0)
+               return;
 
        source_port->migration_state = DSW_MIGRATION_STATE_PAUSING;
-       source_port->migration_start = rte_get_timer_cycles();
+       source_port->emigration_start = rte_get_timer_cycles();
 
        /* No need to go through the whole pause procedure for
         * parallel queues, since atomic/ordered semantics need not to
         * be maintained.
         */
+       dsw_port_move_parallel_flows(dsw, source_port);
 
-       if (dsw->queues[source_port->migration_target_qf.queue_id].schedule_type
-           == RTE_SCHED_TYPE_PARALLEL) {
-               uint8_t queue_id = source_port->migration_target_qf.queue_id;
-               uint16_t flow_hash = source_port->migration_target_qf.flow_hash;
-               uint8_t dest_port_id = source_port->migration_target_port_id;
-
-               /* Single byte-sized stores are always atomic. */
-               dsw->queues[queue_id].flow_to_port_map[flow_hash] =
-                       dest_port_id;
-               rte_smp_wmb();
-
-               dsw_port_end_migration(dsw, source_port);
-
+       /* All flows were on PARALLEL queues. */
+       if (source_port->migration_state == DSW_MIGRATION_STATE_IDLE)
                return;
-       }
 
        /* There might be 'loopback' events already scheduled in the
         * output buffers.
         */
        dsw_port_flush_out_buffers(dsw, source_port);
 
-       dsw_port_add_paused_flow(source_port,
-                                source_port->migration_target_qf.queue_id,
-                                source_port->migration_target_qf.flow_hash);
+       dsw_port_add_paused_flows(source_port,
+                                 source_port->emigration_target_qfs,
+                                 source_port->emigration_targets_len);
 
        dsw_port_ctl_broadcast(dsw, source_port, DSW_CTL_PAUS_REQ,
-                              source_port->migration_target_qf.queue_id,
-                              source_port->migration_target_qf.flow_hash);
+                              source_port->emigration_target_qfs,
+                              source_port->emigration_targets_len);
        source_port->cfm_cnt = 0;
 }
 
 static void
 dsw_port_flush_paused_events(struct dsw_evdev *dsw,
                             struct dsw_port *source_port,
-                            uint8_t queue_id, uint16_t paused_flow_hash);
+                            const struct dsw_queue_flow *qf);
 
 static void
-dsw_port_handle_unpause_flow(struct dsw_evdev *dsw, struct dsw_port *port,
-                            uint8_t originating_port_id, uint8_t queue_id,
-                            uint16_t paused_flow_hash)
+dsw_port_handle_unpause_flows(struct dsw_evdev *dsw, struct dsw_port *port,
+                             uint8_t originating_port_id,
+                             struct dsw_queue_flow *paused_qfs,
+                             uint8_t qfs_len)
 {
+       uint16_t i;
        struct dsw_ctl_msg cfm = {
                .type = DSW_CTL_CFM,
-               .originating_port_id = port->id,
-               .queue_id = queue_id,
-               .flow_hash = paused_flow_hash
+               .originating_port_id = port->id
        };
 
-       DSW_LOG_DP_PORT(DEBUG, port->id, "Un-pausing queue_id %d flow_hash %d.\n",
-                       queue_id, paused_flow_hash);
-
-       dsw_port_remove_paused_flow(port, queue_id, paused_flow_hash);
+       dsw_port_remove_paused_flows(port, paused_qfs, qfs_len);
 
        rte_smp_rmb();
 
        dsw_port_ctl_enqueue(&dsw->ports[originating_port_id], &cfm);
 
-       dsw_port_flush_paused_events(dsw, port, queue_id, paused_flow_hash);
+       for (i = 0; i < qfs_len; i++) {
+               struct dsw_queue_flow *qf = &paused_qfs[i];
+
+               if (dsw_schedule(dsw, qf->queue_id, qf->flow_hash) == port->id)
+                       port->immigrations++;
+
+               dsw_port_flush_paused_events(dsw, port, qf);
+       }
 }
 
 #define FORWARD_BURST_SIZE (32)
 
 static void
-dsw_port_forward_migrated_flow(struct dsw_port *source_port,
-                              struct rte_event_ring *dest_ring,
-                              uint8_t queue_id,
-                              uint16_t flow_hash)
+dsw_port_forward_emigrated_flow(struct dsw_port *source_port,
+                               struct rte_event_ring *dest_ring,
+                               uint8_t queue_id,
+                               uint16_t flow_hash)
 {
        uint16_t events_left;
 
@@ -862,31 +1002,37 @@ dsw_port_forward_migrated_flow(struct dsw_port *source_port,
 }
 
 static void
-dsw_port_move_migrating_flow(struct dsw_evdev *dsw,
-                            struct dsw_port *source_port)
+dsw_port_move_emigrating_flows(struct dsw_evdev *dsw,
+                              struct dsw_port *source_port)
 {
-       uint8_t queue_id = source_port->migration_target_qf.queue_id;
-       uint16_t flow_hash = source_port->migration_target_qf.flow_hash;
-       uint8_t dest_port_id = source_port->migration_target_port_id;
-       struct dsw_port *dest_port = &dsw->ports[dest_port_id];
+       uint8_t i;
 
        dsw_port_flush_out_buffers(dsw, source_port);
 
        rte_smp_wmb();
 
-       dsw->queues[queue_id].flow_to_port_map[flow_hash] =
-               dest_port_id;
+       for (i = 0; i < source_port->emigration_targets_len; i++) {
+               struct dsw_queue_flow *qf =
+                       &source_port->emigration_target_qfs[i];
+               uint8_t dest_port_id =
+                       source_port->emigration_target_port_ids[i];
+               struct dsw_port *dest_port = &dsw->ports[dest_port_id];
+
+               dsw->queues[qf->queue_id].flow_to_port_map[qf->flow_hash] =
+                       dest_port_id;
 
-       dsw_port_forward_migrated_flow(source_port, dest_port->in_ring,
-                                      queue_id, flow_hash);
+               dsw_port_forward_emigrated_flow(source_port, dest_port->in_ring,
+                                               qf->queue_id, qf->flow_hash);
+       }
 
        /* Flow table update and migration destination port's enqueues
         * must be seen before the control message.
         */
        rte_smp_wmb();
 
-       dsw_port_ctl_broadcast(dsw, source_port, DSW_CTL_UNPAUS_REQ, queue_id,
-                              flow_hash);
+       dsw_port_ctl_broadcast(dsw, source_port, DSW_CTL_UNPAUS_REQ,
+                              source_port->emigration_target_qfs,
+                              source_port->emigration_targets_len);
        source_port->cfm_cnt = 0;
        source_port->migration_state = DSW_MIGRATION_STATE_UNPAUSING;
 }
@@ -904,7 +1050,8 @@ dsw_port_handle_confirm(struct dsw_evdev *dsw, struct dsw_port *port)
                        port->migration_state = DSW_MIGRATION_STATE_FORWARDING;
                        break;
                case DSW_MIGRATION_STATE_UNPAUSING:
-                       dsw_port_end_migration(dsw, port);
+                       dsw_port_end_emigration(dsw, port,
+                                               RTE_SCHED_TYPE_ATOMIC);
                        break;
                default:
                        RTE_ASSERT(0);
@@ -918,23 +1065,17 @@ dsw_port_ctl_process(struct dsw_evdev *dsw, struct dsw_port *port)
 {
        struct dsw_ctl_msg msg;
 
-       /* So any table loads happens before the ring dequeue, in the
-        * case of a 'paus' message.
-        */
-       rte_smp_rmb();
-
        if (dsw_port_ctl_dequeue(port, &msg) == 0) {
                switch (msg.type) {
                case DSW_CTL_PAUS_REQ:
-                       dsw_port_handle_pause_flow(dsw, port,
-                                                  msg.originating_port_id,
-                                                  msg.queue_id, msg.flow_hash);
+                       dsw_port_handle_pause_flows(dsw, port,
+                                                   msg.originating_port_id,
+                                                   msg.qfs, msg.qfs_len);
                        break;
                case DSW_CTL_UNPAUS_REQ:
-                       dsw_port_handle_unpause_flow(dsw, port,
-                                                    msg.originating_port_id,
-                                                    msg.queue_id,
-                                                    msg.flow_hash);
+                       dsw_port_handle_unpause_flows(dsw, port,
+                                                     msg.originating_port_id,
+                                                     msg.qfs, msg.qfs_len);
                        break;
                case DSW_CTL_CFM:
                        dsw_port_handle_confirm(dsw, port);
@@ -957,7 +1098,7 @@ dsw_port_bg_process(struct dsw_evdev *dsw, struct dsw_port *port)
 {
        if (unlikely(port->migration_state == DSW_MIGRATION_STATE_FORWARDING &&
                     port->pending_releases == 0))
-               dsw_port_move_migrating_flow(dsw, port);
+               dsw_port_move_emigrating_flows(dsw, port);
 
        /* Polling the control ring is relatively inexpensive, and
         * polling it often helps bringing down migration latency, so
@@ -984,7 +1125,7 @@ dsw_port_bg_process(struct dsw_evdev *dsw, struct dsw_port *port)
 
                dsw_port_consider_load_update(port, now);
 
-               dsw_port_consider_migration(dsw, port, now);
+               dsw_port_consider_emigration(dsw, port, now);
 
                port->ops_since_bg_task = 0;
        }
@@ -1182,11 +1323,6 @@ static uint16_t
 dsw_port_dequeue_burst(struct dsw_port *port, struct rte_event *events,
                       uint16_t num)
 {
-       struct dsw_port *source_port = port;
-       struct dsw_evdev *dsw = source_port->dsw;
-
-       dsw_port_ctl_process(dsw, source_port);
-
        if (unlikely(port->in_buffer_len > 0)) {
                uint16_t dequeued = RTE_MIN(num, port->in_buffer_len);