static void
dsw_port_ctl_broadcast(struct dsw_evdev *dsw, struct dsw_port *source_port,
- uint8_t type, uint8_t queue_id, uint16_t flow_hash)
+ uint8_t type, struct dsw_queue_flow *qfs,
+ uint8_t qfs_len)
{
uint16_t port_id;
struct dsw_ctl_msg msg = {
.type = type,
.originating_port_id = source_port->id,
- .queue_id = queue_id,
- .flow_hash = flow_hash
+ .qfs_len = qfs_len
};
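+ /* Attach the affected queue flows to the control message. */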
+ memcpy(msg.qfs, qfs, sizeof(struct dsw_queue_flow) * qfs_len);
+
for (port_id = 0; port_id < dsw->num_ports; port_id++)
if (port_id != source_port->id)
dsw_port_ctl_enqueue(&dsw->ports[port_id], &msg);
}
-static bool
-dsw_port_is_flow_paused(struct dsw_port *port, uint8_t queue_id,
- uint16_t flow_hash)
+static __rte_always_inline bool
+dsw_is_queue_flow_in_ary(const struct dsw_queue_flow *qfs, uint16_t qfs_len,
+ uint8_t queue_id, uint16_t flow_hash)
{
uint16_t i;
- for (i = 0; i < port->paused_flows_len; i++) {
- struct dsw_queue_flow *qf = &port->paused_flows[i];
- if (qf->queue_id == queue_id &&
- qf->flow_hash == flow_hash)
+ for (i = 0; i < qfs_len; i++)
+ if (qfs[i].queue_id == queue_id &&
+ qfs[i].flow_hash == flow_hash)
return true;
- }
+
return false;
}
+static __rte_always_inline bool
+dsw_port_is_flow_paused(struct dsw_port *port, uint8_t queue_id,
+ uint16_t flow_hash)
+{
+ return dsw_is_queue_flow_in_ary(port->paused_flows,
+ port->paused_flows_len,
+ queue_id, flow_hash);
+}
+
static void
-dsw_port_add_paused_flow(struct dsw_port *port, uint8_t queue_id,
- uint16_t paused_flow_hash)
+dsw_port_add_paused_flows(struct dsw_port *port, struct dsw_queue_flow *qfs,
+ uint8_t qfs_len)
{
- port->paused_flows[port->paused_flows_len] = (struct dsw_queue_flow) {
- .queue_id = queue_id,
- .flow_hash = paused_flow_hash
+ uint8_t i;
+
+ for (i = 0; i < qfs_len; i++) {
+ struct dsw_queue_flow *qf = &qfs[i];
+
+ DSW_LOG_DP_PORT(DEBUG, port->id,
+ "Pausing queue_id %d flow_hash %d.\n",
+ qf->queue_id, qf->flow_hash);
+
+ port->paused_flows[port->paused_flows_len] = *qf;
+ port->paused_flows_len++;
};
- port->paused_flows_len++;
}
static void
-dsw_port_remove_paused_flow(struct dsw_port *port, uint8_t queue_id,
- uint16_t paused_flow_hash)
+dsw_port_remove_paused_flow(struct dsw_port *port,
+ struct dsw_queue_flow *target_qf)
{
uint16_t i;
for (i = 0; i < port->paused_flows_len; i++) {
struct dsw_queue_flow *qf = &port->paused_flows[i];
- if (qf->queue_id == queue_id &&
- qf->flow_hash == paused_flow_hash) {
+ if (qf->queue_id == target_qf->queue_id &&
+ qf->flow_hash == target_qf->flow_hash) {
uint16_t last_idx = port->paused_flows_len-1;
if (i != last_idx)
port->paused_flows[i] =
port->paused_flows[last_idx];
port->paused_flows_len--;
break;
}
}
}
+static void
+dsw_port_remove_paused_flows(struct dsw_port *port,
+ struct dsw_queue_flow *qfs, uint8_t qfs_len)
+{
+ uint8_t i;
+
+ for (i = 0; i < qfs_len; i++)
+ dsw_port_remove_paused_flow(port, &qfs[i]);
+}
+
static void
dsw_port_flush_out_buffers(struct dsw_evdev *dsw, struct dsw_port *source_port);
static void
-dsw_port_handle_pause_flow(struct dsw_evdev *dsw, struct dsw_port *port,
- uint8_t originating_port_id, uint8_t queue_id,
- uint16_t paused_flow_hash)
+dsw_port_handle_pause_flows(struct dsw_evdev *dsw, struct dsw_port *port,
+ uint8_t originating_port_id,
+ struct dsw_queue_flow *paused_qfs,
+ uint8_t qfs_len)
{
struct dsw_ctl_msg cfm = {
.type = DSW_CTL_CFM,
- .originating_port_id = port->id,
- .queue_id = queue_id,
- .flow_hash = paused_flow_hash
+ .originating_port_id = port->id
};
- DSW_LOG_DP_PORT(DEBUG, port->id, "Pausing queue_id %d flow_hash %d.\n",
- queue_id, paused_flow_hash);
-
/* There might be already-scheduled events belonging to the
* paused flow in the output buffers.
*/
dsw_port_flush_out_buffers(dsw, port);
- dsw_port_add_paused_flow(port, queue_id, paused_flow_hash);
+ dsw_port_add_paused_flows(port, paused_qfs, qfs_len);
/* Make sure any stores to the original port's in_ring is seen
* before the ctl message.
*/
rte_smp_rmb();
dsw_port_ctl_enqueue(&dsw->ports[originating_port_id], &cfm);
}
-static void
-dsw_find_lowest_load_port(uint8_t *port_ids, uint16_t num_port_ids,
- uint8_t exclude_port_id, int16_t *port_loads,
- uint8_t *target_port_id, int16_t *target_load)
-{
- int16_t candidate_port_id = -1;
- int16_t candidate_load = DSW_MAX_LOAD;
- uint16_t i;
-
- for (i = 0; i < num_port_ids; i++) {
- uint8_t port_id = port_ids[i];
- if (port_id != exclude_port_id) {
- int16_t load = port_loads[port_id];
- if (candidate_port_id == -1 ||
- load < candidate_load) {
- candidate_port_id = port_id;
- candidate_load = load;
- }
- }
- }
- *target_port_id = candidate_port_id;
- *target_load = candidate_load;
-}
-
struct dsw_queue_flow_burst {
struct dsw_queue_flow queue_flow;
uint16_t count;
};
-static inline int
-dsw_cmp_burst(const void *v_burst_a, const void *v_burst_b)
-{
- const struct dsw_queue_flow_burst *burst_a = v_burst_a;
- const struct dsw_queue_flow_burst *burst_b = v_burst_b;
-
- int a_count = burst_a->count;
- int b_count = burst_b->count;
-
- return a_count - b_count;
-}
-
#define DSW_QF_TO_INT(_qf) \
((int)((((_qf)->queue_id)<<16)|((_qf)->flow_hash)))
current_burst->count++;
}
- qsort(bursts, num_bursts, sizeof(bursts[0]), dsw_cmp_burst);
-
return num_bursts;
}
return below_limit;
}
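+/* Estimate the load contributed by a single flow, as the source
+ * port's load scaled by the flow's share of the events recorded on
+ * that port.
+ */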
+static int16_t
+dsw_flow_load(uint16_t num_events, int16_t port_load)
+{
+ return ((int32_t)port_load * (int32_t)num_events) /
+ DSW_MAX_EVENTS_RECORDED;
+}
+
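+/* Weigh the benefit of migrating a flow with the given load to the
+ * target port. Returns a weight (higher is better), or -1 if the
+ * target is too loaded, the load imbalance is too small, or the
+ * target would end up more loaded than the source.
+ */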
+static int16_t
+dsw_evaluate_migration(int16_t source_load, int16_t target_load,
+ int16_t flow_load)
+{
+ int32_t res_target_load;
+ int32_t imbalance;
+
+ if (target_load > DSW_MAX_TARGET_LOAD_FOR_MIGRATION)
+ return -1;
+
+ imbalance = source_load - target_load;
+
+ if (imbalance < DSW_REBALANCE_THRESHOLD)
+ return -1;
+
+ res_target_load = target_load + flow_load;
+
+ /* If the estimated load of the target port would end up higher
+ * than the source port's load, it doesn't make sense to move
+ * the flow.
+ */
+ if (res_target_load > source_load)
+ return -1;
+
+ /* The more idle the target will be, the better. This makes
+ * migration prefer moving smaller flows, and moving them to
+ * lightly loaded ports.
+ */
+ return DSW_MAX_LOAD - res_target_load;
+}
+
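+/* Check if the port is among the ports serving the queue. */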
+static bool
+dsw_is_serving_port(struct dsw_evdev *dsw, uint8_t port_id, uint8_t queue_id)
+{
+ struct dsw_queue *queue = &dsw->queues[queue_id];
+ uint16_t i;
+
+ for (i = 0; i < queue->num_serving_ports; i++)
+ if (queue->serving_ports[i] == port_id)
+ return true;
+
+ return false;
+}
+
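+/* Pick the flow and target port with the highest migration weight
+ * among the recorded bursts, append it to the emigration target
+ * arrays, and adjust the port load estimates accordingly. Returns
+ * false if no suitable candidate is found.
+ */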
static bool
dsw_select_emigration_target(struct dsw_evdev *dsw,
- struct dsw_port *source_port,
- struct dsw_queue_flow_burst *bursts,
- uint16_t num_bursts, int16_t *port_loads,
- int16_t max_load, struct dsw_queue_flow *target_qf,
- uint8_t *target_port_id)
+ struct dsw_queue_flow_burst *bursts,
+ uint16_t num_bursts, uint8_t source_port_id,
+ int16_t *port_loads, uint16_t num_ports,
+ uint8_t *target_port_ids,
+ struct dsw_queue_flow *target_qfs,
+ uint8_t *targets_len)
{
- uint16_t source_load = port_loads[source_port->id];
+ int16_t source_port_load = port_loads[source_port_id];
+ struct dsw_queue_flow *candidate_qf = NULL;
+ uint8_t candidate_port_id = 0;
+ int16_t candidate_weight = -1;
+ int16_t candidate_flow_load = 0;
uint16_t i;
+ if (source_port_load < DSW_MIN_SOURCE_LOAD_FOR_MIGRATION)
+ return false;
+
for (i = 0; i < num_bursts; i++) {
- struct dsw_queue_flow *qf = &bursts[i].queue_flow;
+ struct dsw_queue_flow_burst *burst = &bursts[i];
+ struct dsw_queue_flow *qf = &burst->queue_flow;
+ int16_t flow_load;
+ uint16_t port_id;
- if (dsw_port_is_flow_paused(source_port, qf->queue_id,
- qf->flow_hash))
+ if (dsw_is_queue_flow_in_ary(target_qfs, *targets_len,
+ qf->queue_id, qf->flow_hash))
continue;
- struct dsw_queue *queue = &dsw->queues[qf->queue_id];
- int16_t target_load;
+ flow_load = dsw_flow_load(burst->count, source_port_load);
- dsw_find_lowest_load_port(queue->serving_ports,
- queue->num_serving_ports,
- source_port->id, port_loads,
- target_port_id, &target_load);
+ for (port_id = 0; port_id < num_ports; port_id++) {
+ int16_t weight;
- if (target_load < source_load &&
- target_load < max_load) {
- *target_qf = *qf;
- return true;
+ if (port_id == source_port_id)
+ continue;
+
+ if (!dsw_is_serving_port(dsw, port_id, qf->queue_id))
+ continue;
+
+ weight = dsw_evaluate_migration(source_port_load,
+ port_loads[port_id],
+ flow_load);
+
+ if (weight > candidate_weight) {
+ candidate_qf = qf;
+ candidate_port_id = port_id;
+ candidate_weight = weight;
+ candidate_flow_load = flow_load;
+ }
}
}
- DSW_LOG_DP_PORT(DEBUG, source_port->id, "For the %d flows considered, "
- "no target port found with load less than %d.\n",
- num_bursts, DSW_LOAD_TO_PERCENT(max_load));
+ if (candidate_weight < 0)
+ return false;
- return false;
+ DSW_LOG_DP_PORT(DEBUG, source_port_id, "Selected queue_id %d "
+ "flow_hash %d (with flow load %d) for migration "
+ "to port %d.\n", candidate_qf->queue_id,
+ candidate_qf->flow_hash,
+ DSW_LOAD_TO_PERCENT(candidate_flow_load),
+ candidate_port_id);
+
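+ /* Update the load estimates, so that a subsequent selection
+ * round takes the planned migration into account.
+ */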
+ port_loads[candidate_port_id] += candidate_flow_load;
+ port_loads[source_port_id] -= candidate_flow_load;
+
+ target_port_ids[*targets_len] = candidate_port_id;
+ target_qfs[*targets_len] = *candidate_qf;
+ (*targets_len)++;
+
+ return true;
+}
+
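+/* Select up to DSW_MAX_FLOWS_PER_MIGRATION flows (and their target
+ * ports) for emigration, and store the result on the source port.
+ */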
+static void
+dsw_select_emigration_targets(struct dsw_evdev *dsw,
+ struct dsw_port *source_port,
+ struct dsw_queue_flow_burst *bursts,
+ uint16_t num_bursts, int16_t *port_loads)
+{
+ struct dsw_queue_flow *target_qfs = source_port->emigration_target_qfs;
+ uint8_t *target_port_ids = source_port->emigration_target_port_ids;
+ uint8_t *targets_len = &source_port->emigration_targets_len;
+ uint8_t i;
+
+ for (i = 0; i < DSW_MAX_FLOWS_PER_MIGRATION; i++) {
+ bool found;
+
+ found = dsw_select_emigration_target(dsw, bursts, num_bursts,
+ source_port->id,
+ port_loads, dsw->num_ports,
+ target_port_ids,
+ target_qfs,
+ targets_len);
+ if (!found)
+ break;
+ }
+
+ if (*targets_len == 0)
+ DSW_LOG_DP_PORT(DEBUG, source_port->id,
+ "For the %d flows considered, no target port "
+ "was found.\n", num_bursts);
}
static uint8_t
static void
dsw_port_flush_paused_events(struct dsw_evdev *dsw,
struct dsw_port *source_port,
- uint8_t queue_id, uint16_t paused_flow_hash)
+ const struct dsw_queue_flow *qf)
{
uint16_t paused_events_len = source_port->paused_events_len;
struct rte_event paused_events[paused_events_len];
if (paused_events_len == 0)
return;
- if (dsw_port_is_flow_paused(source_port, queue_id, paused_flow_hash))
+ if (dsw_port_is_flow_paused(source_port, qf->queue_id, qf->flow_hash))
return;
rte_memcpy(paused_events, source_port->paused_events,
paused_events_len * sizeof(struct rte_event));
source_port->paused_events_len = 0;
- dest_port_id = dsw_schedule(dsw, queue_id, paused_flow_hash);
+ dest_port_id = dsw_schedule(dsw, qf->queue_id, qf->flow_hash);
for (i = 0; i < paused_events_len; i++) {
struct rte_event *event = &paused_events[i];
flow_hash = dsw_flow_id_hash(event->flow_id);
- if (event->queue_id == queue_id &&
- flow_hash == paused_flow_hash)
+ if (event->queue_id == qf->queue_id &&
+ flow_hash == qf->flow_hash)
dsw_port_buffer_non_paused(dsw, source_port,
dest_port_id, event);
else
dsw_port_buffer_paused(source_port, event);
}
}
static void
-dsw_port_emigration_stats(struct dsw_port *port)
+dsw_port_emigration_stats(struct dsw_port *port, uint8_t finished)
{
- uint64_t emigration_latency;
+ uint64_t flow_migration_latency;
- emigration_latency = (rte_get_timer_cycles() - port->emigration_start);
- port->emigration_latency += emigration_latency;
- port->emigrations++;
+ flow_migration_latency =
+ (rte_get_timer_cycles() - port->emigration_start);
+ port->emigration_latency += (flow_migration_latency * finished);
+ port->emigrations += finished;
}
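+/* Finish the emigration of all target flows on queues with the given
+ * schedule type. Flows on queues of other schedule types are kept on
+ * the target list, to be completed by a later call.
+ */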
static void
-dsw_port_end_emigration(struct dsw_evdev *dsw, struct dsw_port *port)
+dsw_port_end_emigration(struct dsw_evdev *dsw, struct dsw_port *port,
+ uint8_t schedule_type)
{
- uint8_t queue_id = port->emigration_target_qf.queue_id;
- uint16_t flow_hash = port->emigration_target_qf.flow_hash;
+ uint8_t i;
+ struct dsw_queue_flow left_qfs[DSW_MAX_FLOWS_PER_MIGRATION];
+ uint8_t left_port_ids[DSW_MAX_FLOWS_PER_MIGRATION];
+ uint8_t left_qfs_len = 0;
+ uint8_t finished;
+
+ for (i = 0; i < port->emigration_targets_len; i++) {
+ struct dsw_queue_flow *qf = &port->emigration_target_qfs[i];
+ uint8_t queue_id = qf->queue_id;
+ uint8_t queue_schedule_type =
+ dsw->queues[queue_id].schedule_type;
+ uint16_t flow_hash = qf->flow_hash;
+
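+ /* Flows on queues of a different schedule type are kept
+ * for a later call.
+ */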
+ if (queue_schedule_type != schedule_type) {
+ left_port_ids[left_qfs_len] =
+ port->emigration_target_port_ids[i];
+ left_qfs[left_qfs_len] = *qf;
+ left_qfs_len++;
+ continue;
+ }
+
+ DSW_LOG_DP_PORT(DEBUG, port->id, "Migration completed for "
+ "queue_id %d flow_hash %d.\n", queue_id,
+ flow_hash);
+
+ if (queue_schedule_type == RTE_SCHED_TYPE_ATOMIC) {
+ dsw_port_remove_paused_flow(port, qf);
+ dsw_port_flush_paused_events(dsw, port, qf);
+ }
+ }
- port->migration_state = DSW_MIGRATION_STATE_IDLE;
- port->seen_events_len = 0;
+ finished = port->emigration_targets_len - left_qfs_len;
- dsw_port_emigration_stats(port);
+ if (finished > 0)
+ dsw_port_emigration_stats(port, finished);
- if (dsw->queues[queue_id].schedule_type != RTE_SCHED_TYPE_PARALLEL) {
- dsw_port_remove_paused_flow(port, queue_id, flow_hash);
- dsw_port_flush_paused_events(dsw, port, queue_id, flow_hash);
+ for (i = 0; i < left_qfs_len; i++) {
+ port->emigration_target_port_ids[i] = left_port_ids[i];
+ port->emigration_target_qfs[i] = left_qfs[i];
}
+ port->emigration_targets_len = left_qfs_len;
- DSW_LOG_DP_PORT(DEBUG, port->id, "Emigration completed for queue_id "
- "%d flow_hash %d.\n", queue_id, flow_hash);
+ if (port->emigration_targets_len == 0) {
+ port->migration_state = DSW_MIGRATION_STATE_IDLE;
+ port->seen_events_len = 0;
+ }
+}
+
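+/* Flows on parallel queues are moved immediately, by updating the
+ * flow-to-port map, since no atomic or ordered semantics need to be
+ * maintained for them.
+ */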
+static void
+dsw_port_move_parallel_flows(struct dsw_evdev *dsw,
+ struct dsw_port *source_port)
+{
+ uint8_t i;
+
+ for (i = 0; i < source_port->emigration_targets_len; i++) {
+ struct dsw_queue_flow *qf =
+ &source_port->emigration_target_qfs[i];
+ uint8_t queue_id = qf->queue_id;
+
+ if (dsw->queues[queue_id].schedule_type ==
+ RTE_SCHED_TYPE_PARALLEL) {
+ uint8_t dest_port_id =
+ source_port->emigration_target_port_ids[i];
+ uint16_t flow_hash = qf->flow_hash;
+
+ /* Single byte-sized stores are always atomic. */
+ dsw->queues[queue_id].flow_to_port_map[flow_hash] =
+ dest_port_id;
+ }
+ }
+
+ rte_smp_wmb();
+
+ dsw_port_end_emigration(dsw, source_port, RTE_SCHED_TYPE_PARALLEL);
}
static void
source_port_load = rte_atomic16_read(&source_port->load);
if (source_port_load < DSW_MIN_SOURCE_LOAD_FOR_MIGRATION) {
DSW_LOG_DP_PORT(DEBUG, source_port->id,
- "Load %d is below threshold level %d.\n",
- DSW_LOAD_TO_PERCENT(source_port_load),
- DSW_LOAD_TO_PERCENT(DSW_MIN_SOURCE_LOAD_FOR_MIGRATION));
+ "Load %d is below threshold level %d.\n",
+ DSW_LOAD_TO_PERCENT(source_port_load),
+ DSW_LOAD_TO_PERCENT(DSW_MIN_SOURCE_LOAD_FOR_MIGRATION));
return;
}
return;
}
- /* Sort flows into 'bursts' to allow attempting to migrating
- * small (but still active) flows first - this it to avoid
- * having large flows moving around the worker cores too much
- * (to avoid cache misses, among other things). Of course, the
- * number of recorded events (queue+flow ids) are limited, and
- * provides only a snapshot, so only so many conclusions can
- * be drawn from this data.
- */
num_bursts = dsw_sort_qfs_to_bursts(seen_events, seen_events_len,
bursts);
+
/* For non-big-little systems, there's no point in moving the
* only (known) flow.
*/
return;
}
- /* The strategy is to first try to find a flow to move to a
- * port with low load (below the emigration-attempt
- * threshold). If that fails, we try to find a port which is
- * below the max threshold, and also less loaded than this
- * port is.
- */
- if (!dsw_select_emigration_target(dsw, source_port, bursts, num_bursts,
- port_loads,
- DSW_MIN_SOURCE_LOAD_FOR_MIGRATION,
- &source_port->emigration_target_qf,
- &source_port->emigration_target_port_id)
- &&
- !dsw_select_emigration_target(dsw, source_port, bursts, num_bursts,
- port_loads,
- DSW_MAX_TARGET_LOAD_FOR_MIGRATION,
- &source_port->emigration_target_qf,
- &source_port->emigration_target_port_id))
- return;
-
- DSW_LOG_DP_PORT(DEBUG, source_port->id, "Migrating queue_id %d "
- "flow_hash %d from port %d to port %d.\n",
- source_port->emigration_target_qf.queue_id,
- source_port->emigration_target_qf.flow_hash,
- source_port->id,
- source_port->emigration_target_port_id);
+ dsw_select_emigration_targets(dsw, source_port, bursts, num_bursts,
+ port_loads);
- /* We have a winner. */
+ if (source_port->emigration_targets_len == 0)
+ return;
source_port->migration_state = DSW_MIGRATION_STATE_PAUSING;
source_port->emigration_start = rte_get_timer_cycles();
/* No need to go through the whole pause procedure for
* parallel queues, since atomic/ordered semantics need not to
* be maintained.
*/
+ dsw_port_move_parallel_flows(dsw, source_port);
- if (dsw->queues[source_port->emigration_target_qf.queue_id].
- schedule_type == RTE_SCHED_TYPE_PARALLEL) {
- uint8_t queue_id =
- source_port->emigration_target_qf.queue_id;
- uint16_t flow_hash =
- source_port->emigration_target_qf.flow_hash;
- uint8_t dest_port_id =
- source_port->emigration_target_port_id;
-
- /* Single byte-sized stores are always atomic. */
- dsw->queues[queue_id].flow_to_port_map[flow_hash] =
- dest_port_id;
- rte_smp_wmb();
-
- dsw_port_end_emigration(dsw, source_port);
-
+ /* All flows were on PARALLEL queues. */
+ if (source_port->migration_state == DSW_MIGRATION_STATE_IDLE)
return;
- }
/* There might be 'loopback' events already scheduled in the
* output buffers.
*/
dsw_port_flush_out_buffers(dsw, source_port);
- dsw_port_add_paused_flow(source_port,
- source_port->emigration_target_qf.queue_id,
- source_port->emigration_target_qf.flow_hash);
+ dsw_port_add_paused_flows(source_port,
+ source_port->emigration_target_qfs,
+ source_port->emigration_targets_len);
dsw_port_ctl_broadcast(dsw, source_port, DSW_CTL_PAUS_REQ,
- source_port->emigration_target_qf.queue_id,
- source_port->emigration_target_qf.flow_hash);
+ source_port->emigration_target_qfs,
+ source_port->emigration_targets_len);
source_port->cfm_cnt = 0;
}
static void
dsw_port_flush_paused_events(struct dsw_evdev *dsw,
struct dsw_port *source_port,
- uint8_t queue_id, uint16_t paused_flow_hash);
+ const struct dsw_queue_flow *qf);
static void
-dsw_port_handle_unpause_flow(struct dsw_evdev *dsw, struct dsw_port *port,
- uint8_t originating_port_id, uint8_t queue_id,
- uint16_t paused_flow_hash)
+dsw_port_handle_unpause_flows(struct dsw_evdev *dsw, struct dsw_port *port,
+ uint8_t originating_port_id,
+ struct dsw_queue_flow *paused_qfs,
+ uint8_t qfs_len)
{
+ uint16_t i;
struct dsw_ctl_msg cfm = {
.type = DSW_CTL_CFM,
- .originating_port_id = port->id,
- .queue_id = queue_id,
- .flow_hash = paused_flow_hash
+ .originating_port_id = port->id
};
- DSW_LOG_DP_PORT(DEBUG, port->id, "Un-pausing queue_id %d flow_hash %d.\n",
- queue_id, paused_flow_hash);
-
- dsw_port_remove_paused_flow(port, queue_id, paused_flow_hash);
+ dsw_port_remove_paused_flows(port, paused_qfs, qfs_len);
rte_smp_rmb();
- if (dsw_schedule(dsw, queue_id, paused_flow_hash) == port->id)
- port->immigrations++;
-
dsw_port_ctl_enqueue(&dsw->ports[originating_port_id], &cfm);
- dsw_port_flush_paused_events(dsw, port, queue_id, paused_flow_hash);
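+ /* Count flows that have immigrated to this port, and flush
+ * any events buffered while each flow was paused.
+ */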
+ for (i = 0; i < qfs_len; i++) {
+ struct dsw_queue_flow *qf = &paused_qfs[i];
+
+ if (dsw_schedule(dsw, qf->queue_id, qf->flow_hash) == port->id)
+ port->immigrations++;
+
+ dsw_port_flush_paused_events(dsw, port, qf);
+ }
}
#define FORWARD_BURST_SIZE (32)
}
static void
-dsw_port_move_migrating_flow(struct dsw_evdev *dsw,
- struct dsw_port *source_port)
+dsw_port_move_emigrating_flows(struct dsw_evdev *dsw,
+ struct dsw_port *source_port)
{
- uint8_t queue_id = source_port->emigration_target_qf.queue_id;
- uint16_t flow_hash = source_port->emigration_target_qf.flow_hash;
- uint8_t dest_port_id = source_port->emigration_target_port_id;
- struct dsw_port *dest_port = &dsw->ports[dest_port_id];
+ uint8_t i;
dsw_port_flush_out_buffers(dsw, source_port);
rte_smp_wmb();
- dsw->queues[queue_id].flow_to_port_map[flow_hash] =
- dest_port_id;
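+ /* For each emigrating flow, update the flow-to-port map and
+ * forward events for that flow still held on this port to the
+ * destination port's input ring.
+ */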
+ for (i = 0; i < source_port->emigration_targets_len; i++) {
+ struct dsw_queue_flow *qf =
+ &source_port->emigration_target_qfs[i];
+ uint8_t dest_port_id =
+ source_port->emigration_target_port_ids[i];
+ struct dsw_port *dest_port = &dsw->ports[dest_port_id];
+
+ dsw->queues[qf->queue_id].flow_to_port_map[qf->flow_hash] =
+ dest_port_id;
- dsw_port_forward_emigrated_flow(source_port, dest_port->in_ring,
- queue_id, flow_hash);
+ dsw_port_forward_emigrated_flow(source_port, dest_port->in_ring,
+ qf->queue_id, qf->flow_hash);
+ }
/* Flow table update and migration destination port's enqueues
* must be seen before the control message.
*/
rte_smp_wmb();
- dsw_port_ctl_broadcast(dsw, source_port, DSW_CTL_UNPAUS_REQ, queue_id,
- flow_hash);
+ dsw_port_ctl_broadcast(dsw, source_port, DSW_CTL_UNPAUS_REQ,
+ source_port->emigration_target_qfs,
+ source_port->emigration_targets_len);
source_port->cfm_cnt = 0;
source_port->migration_state = DSW_MIGRATION_STATE_UNPAUSING;
}
port->migration_state = DSW_MIGRATION_STATE_FORWARDING;
break;
case DSW_MIGRATION_STATE_UNPAUSING:
- dsw_port_end_emigration(dsw, port);
+ dsw_port_end_emigration(dsw, port,
+ RTE_SCHED_TYPE_ATOMIC);
break;
default:
RTE_ASSERT(0);
if (dsw_port_ctl_dequeue(port, &msg) == 0) {
switch (msg.type) {
case DSW_CTL_PAUS_REQ:
- dsw_port_handle_pause_flow(dsw, port,
- msg.originating_port_id,
- msg.queue_id, msg.flow_hash);
+ dsw_port_handle_pause_flows(dsw, port,
+ msg.originating_port_id,
+ msg.qfs, msg.qfs_len);
break;
case DSW_CTL_UNPAUS_REQ:
- dsw_port_handle_unpause_flow(dsw, port,
- msg.originating_port_id,
- msg.queue_id,
- msg.flow_hash);
+ dsw_port_handle_unpause_flows(dsw, port,
+ msg.originating_port_id,
+ msg.qfs, msg.qfs_len);
break;
case DSW_CTL_CFM:
dsw_port_handle_confirm(dsw, port);
{
if (unlikely(port->migration_state == DSW_MIGRATION_STATE_FORWARDING &&
port->pending_releases == 0))
- dsw_port_move_migrating_flow(dsw, port);
+ dsw_port_move_emigrating_flows(dsw, port);
/* Polling the control ring is relatively inexpensive, and
* polling it often helps bringing down migration latency, so