From 1f2b99e8d9b1bde2bba588fcbbd23e53d199ad79 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Mattias=20R=C3=B6nnblom?= Date: Mon, 9 Mar 2020 07:51:02 +0100 Subject: [PATCH] event/dsw: improve migration mechanism MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Allowing moving multiple flows in one migration transaction, to rebalance load more quickly. Introduce a threshold to avoid migrating flows between ports with very similar load. Simplify logic for selecting which flow to migrate. The aim is now to move flows in such a way that the receiving port is as lightly-loaded as possible (after receiving the flow), while still migrating enough flows from the source port to reduce its load. This is essentially how legacy strategy work as well, but the code is more readable. Signed-off-by: Mattias Rönnblom --- drivers/event/dsw/dsw_evdev.h | 15 +- drivers/event/dsw/dsw_event.c | 541 +++++++++++++++++++++------------- 2 files changed, 343 insertions(+), 213 deletions(-) diff --git a/drivers/event/dsw/dsw_evdev.h b/drivers/event/dsw/dsw_evdev.h index 2c7f9efa30..ced40ef8db 100644 --- a/drivers/event/dsw/dsw_evdev.h +++ b/drivers/event/dsw/dsw_evdev.h @@ -93,11 +93,14 @@ #define DSW_MIGRATION_INTERVAL (1000) #define DSW_MIN_SOURCE_LOAD_FOR_MIGRATION (DSW_LOAD_FROM_PERCENT(70)) #define DSW_MAX_TARGET_LOAD_FOR_MIGRATION (DSW_LOAD_FROM_PERCENT(95)) +#define DSW_REBALANCE_THRESHOLD (DSW_LOAD_FROM_PERCENT(3)) #define DSW_MAX_EVENTS_RECORDED (128) +#define DSW_MAX_FLOWS_PER_MIGRATION (8) + /* Only one outstanding migration per port is allowed */ -#define DSW_MAX_PAUSED_FLOWS (DSW_MAX_PORTS) +#define DSW_MAX_PAUSED_FLOWS (DSW_MAX_PORTS*DSW_MAX_FLOWS_PER_MIGRATION) /* Enough room for paus request/confirm and unpaus request/confirm for * all possible senders. @@ -170,8 +173,10 @@ struct dsw_port { uint64_t emigrations; uint64_t emigration_latency; - uint8_t emigration_target_port_id; - struct dsw_queue_flow emigration_target_qf; + uint8_t emigration_target_port_ids[DSW_MAX_FLOWS_PER_MIGRATION]; + struct dsw_queue_flow + emigration_target_qfs[DSW_MAX_FLOWS_PER_MIGRATION]; + uint8_t emigration_targets_len; uint8_t cfm_cnt; uint64_t immigrations; @@ -244,8 +249,8 @@ struct dsw_evdev { struct dsw_ctl_msg { uint8_t type; uint8_t originating_port_id; - uint8_t queue_id; - uint16_t flow_hash; + uint8_t qfs_len; + struct dsw_queue_flow qfs[DSW_MAX_FLOWS_PER_MIGRATION]; } __rte_aligned(4); uint16_t dsw_event_enqueue(void *port, const struct rte_event *event); diff --git a/drivers/event/dsw/dsw_event.c b/drivers/event/dsw/dsw_event.c index 65d53b3a97..a8161fd76a 100644 --- a/drivers/event/dsw/dsw_event.c +++ b/drivers/event/dsw/dsw_event.c @@ -189,58 +189,75 @@ dsw_port_ctl_dequeue(struct dsw_port *port, struct dsw_ctl_msg *msg) static void dsw_port_ctl_broadcast(struct dsw_evdev *dsw, struct dsw_port *source_port, - uint8_t type, uint8_t queue_id, uint16_t flow_hash) + uint8_t type, struct dsw_queue_flow *qfs, + uint8_t qfs_len) { uint16_t port_id; struct dsw_ctl_msg msg = { .type = type, .originating_port_id = source_port->id, - .queue_id = queue_id, - .flow_hash = flow_hash + .qfs_len = qfs_len }; + memcpy(msg.qfs, qfs, sizeof(struct dsw_queue_flow) * qfs_len); + for (port_id = 0; port_id < dsw->num_ports; port_id++) if (port_id != source_port->id) dsw_port_ctl_enqueue(&dsw->ports[port_id], &msg); } -static bool -dsw_port_is_flow_paused(struct dsw_port *port, uint8_t queue_id, - uint16_t flow_hash) +static __rte_always_inline bool +dsw_is_queue_flow_in_ary(const struct dsw_queue_flow *qfs, uint16_t qfs_len, + uint8_t queue_id, uint16_t flow_hash) { uint16_t i; - for (i = 0; i < port->paused_flows_len; i++) { - struct dsw_queue_flow *qf = &port->paused_flows[i]; - if (qf->queue_id == queue_id && - qf->flow_hash == flow_hash) + for (i = 0; i < qfs_len; i++) + if (qfs[i].queue_id == queue_id && + qfs[i].flow_hash == flow_hash) return true; - } + return false; } +static __rte_always_inline bool +dsw_port_is_flow_paused(struct dsw_port *port, uint8_t queue_id, + uint16_t flow_hash) +{ + return dsw_is_queue_flow_in_ary(port->paused_flows, + port->paused_flows_len, + queue_id, flow_hash); +} + static void -dsw_port_add_paused_flow(struct dsw_port *port, uint8_t queue_id, - uint16_t paused_flow_hash) +dsw_port_add_paused_flows(struct dsw_port *port, struct dsw_queue_flow *qfs, + uint8_t qfs_len) { - port->paused_flows[port->paused_flows_len] = (struct dsw_queue_flow) { - .queue_id = queue_id, - .flow_hash = paused_flow_hash + uint8_t i; + + for (i = 0; i < qfs_len; i++) { + struct dsw_queue_flow *qf = &qfs[i]; + + DSW_LOG_DP_PORT(DEBUG, port->id, + "Pausing queue_id %d flow_hash %d.\n", + qf->queue_id, qf->flow_hash); + + port->paused_flows[port->paused_flows_len] = *qf; + port->paused_flows_len++; }; - port->paused_flows_len++; } static void -dsw_port_remove_paused_flow(struct dsw_port *port, uint8_t queue_id, - uint16_t paused_flow_hash) +dsw_port_remove_paused_flow(struct dsw_port *port, + struct dsw_queue_flow *target_qf) { uint16_t i; for (i = 0; i < port->paused_flows_len; i++) { struct dsw_queue_flow *qf = &port->paused_flows[i]; - if (qf->queue_id == queue_id && - qf->flow_hash == paused_flow_hash) { + if (qf->queue_id == target_qf->queue_id && + qf->flow_hash == target_qf->flow_hash) { uint16_t last_idx = port->paused_flows_len-1; if (i != last_idx) port->paused_flows[i] = @@ -251,30 +268,37 @@ dsw_port_remove_paused_flow(struct dsw_port *port, uint8_t queue_id, } } +static void +dsw_port_remove_paused_flows(struct dsw_port *port, + struct dsw_queue_flow *qfs, uint8_t qfs_len) +{ + uint8_t i; + + for (i = 0; i < qfs_len; i++) + dsw_port_remove_paused_flow(port, &qfs[i]); + +} + static void dsw_port_flush_out_buffers(struct dsw_evdev *dsw, struct dsw_port *source_port); static void -dsw_port_handle_pause_flow(struct dsw_evdev *dsw, struct dsw_port *port, - uint8_t originating_port_id, uint8_t queue_id, - uint16_t paused_flow_hash) +dsw_port_handle_pause_flows(struct dsw_evdev *dsw, struct dsw_port *port, + uint8_t originating_port_id, + struct dsw_queue_flow *paused_qfs, + uint8_t qfs_len) { struct dsw_ctl_msg cfm = { .type = DSW_CTL_CFM, - .originating_port_id = port->id, - .queue_id = queue_id, - .flow_hash = paused_flow_hash + .originating_port_id = port->id }; - DSW_LOG_DP_PORT(DEBUG, port->id, "Pausing queue_id %d flow_hash %d.\n", - queue_id, paused_flow_hash); - /* There might be already-scheduled events belonging to the * paused flow in the output buffers. */ dsw_port_flush_out_buffers(dsw, port); - dsw_port_add_paused_flow(port, queue_id, paused_flow_hash); + dsw_port_add_paused_flows(port, paused_qfs, qfs_len); /* Make sure any stores to the original port's in_ring is seen * before the ctl message. @@ -284,47 +308,11 @@ dsw_port_handle_pause_flow(struct dsw_evdev *dsw, struct dsw_port *port, dsw_port_ctl_enqueue(&dsw->ports[originating_port_id], &cfm); } -static void -dsw_find_lowest_load_port(uint8_t *port_ids, uint16_t num_port_ids, - uint8_t exclude_port_id, int16_t *port_loads, - uint8_t *target_port_id, int16_t *target_load) -{ - int16_t candidate_port_id = -1; - int16_t candidate_load = DSW_MAX_LOAD; - uint16_t i; - - for (i = 0; i < num_port_ids; i++) { - uint8_t port_id = port_ids[i]; - if (port_id != exclude_port_id) { - int16_t load = port_loads[port_id]; - if (candidate_port_id == -1 || - load < candidate_load) { - candidate_port_id = port_id; - candidate_load = load; - } - } - } - *target_port_id = candidate_port_id; - *target_load = candidate_load; -} - struct dsw_queue_flow_burst { struct dsw_queue_flow queue_flow; uint16_t count; }; -static inline int -dsw_cmp_burst(const void *v_burst_a, const void *v_burst_b) -{ - const struct dsw_queue_flow_burst *burst_a = v_burst_a; - const struct dsw_queue_flow_burst *burst_b = v_burst_b; - - int a_count = burst_a->count; - int b_count = burst_b->count; - - return a_count - b_count; -} - #define DSW_QF_TO_INT(_qf) \ ((int)((((_qf)->queue_id)<<16)|((_qf)->flow_hash))) @@ -363,8 +351,6 @@ dsw_sort_qfs_to_bursts(struct dsw_queue_flow *qfs, uint16_t qfs_len, current_burst->count++; } - qsort(bursts, num_bursts, sizeof(bursts[0]), dsw_cmp_burst); - return num_bursts; } @@ -384,44 +370,158 @@ dsw_retrieve_port_loads(struct dsw_evdev *dsw, int16_t *port_loads, return below_limit; } +static int16_t +dsw_flow_load(uint16_t num_events, int16_t port_load) +{ + return ((int32_t)port_load * (int32_t)num_events) / + DSW_MAX_EVENTS_RECORDED; +} + +static int16_t +dsw_evaluate_migration(int16_t source_load, int16_t target_load, + int16_t flow_load) +{ + int32_t res_target_load; + int32_t imbalance; + + if (target_load > DSW_MAX_TARGET_LOAD_FOR_MIGRATION) + return -1; + + imbalance = source_load - target_load; + + if (imbalance < DSW_REBALANCE_THRESHOLD) + return -1; + + res_target_load = target_load + flow_load; + + /* If the estimated load of the target port will be higher + * than the source port's load, it doesn't make sense to move + * the flow. + */ + if (res_target_load > source_load) + return -1; + + /* The more idle the target will be, the better. This will + * make migration prefer moving smaller flows, and flows to + * lightly loaded ports. + */ + return DSW_MAX_LOAD - res_target_load; +} + +static bool +dsw_is_serving_port(struct dsw_evdev *dsw, uint8_t port_id, uint8_t queue_id) +{ + struct dsw_queue *queue = &dsw->queues[queue_id]; + uint16_t i; + + for (i = 0; i < queue->num_serving_ports; i++) + if (queue->serving_ports[i] == port_id) + return true; + + return false; +} + static bool dsw_select_emigration_target(struct dsw_evdev *dsw, - struct dsw_port *source_port, - struct dsw_queue_flow_burst *bursts, - uint16_t num_bursts, int16_t *port_loads, - int16_t max_load, struct dsw_queue_flow *target_qf, - uint8_t *target_port_id) + struct dsw_queue_flow_burst *bursts, + uint16_t num_bursts, uint8_t source_port_id, + int16_t *port_loads, uint16_t num_ports, + uint8_t *target_port_ids, + struct dsw_queue_flow *target_qfs, + uint8_t *targets_len) { - uint16_t source_load = port_loads[source_port->id]; + int16_t source_port_load = port_loads[source_port_id]; + struct dsw_queue_flow *candidate_qf; + uint8_t candidate_port_id; + int16_t candidate_weight = -1; + int16_t candidate_flow_load; uint16_t i; + if (source_port_load < DSW_MIN_SOURCE_LOAD_FOR_MIGRATION) + return false; + for (i = 0; i < num_bursts; i++) { - struct dsw_queue_flow *qf = &bursts[i].queue_flow; + struct dsw_queue_flow_burst *burst = &bursts[i]; + struct dsw_queue_flow *qf = &burst->queue_flow; + int16_t flow_load; + uint16_t port_id; - if (dsw_port_is_flow_paused(source_port, qf->queue_id, - qf->flow_hash)) + if (dsw_is_queue_flow_in_ary(target_qfs, *targets_len, + qf->queue_id, qf->flow_hash)) continue; - struct dsw_queue *queue = &dsw->queues[qf->queue_id]; - int16_t target_load; + flow_load = dsw_flow_load(burst->count, source_port_load); - dsw_find_lowest_load_port(queue->serving_ports, - queue->num_serving_ports, - source_port->id, port_loads, - target_port_id, &target_load); + for (port_id = 0; port_id < num_ports; port_id++) { + int16_t weight; - if (target_load < source_load && - target_load < max_load) { - *target_qf = *qf; - return true; + if (port_id == source_port_id) + continue; + + if (!dsw_is_serving_port(dsw, port_id, qf->queue_id)) + continue; + + weight = dsw_evaluate_migration(source_port_load, + port_loads[port_id], + flow_load); + + if (weight > candidate_weight) { + candidate_qf = qf; + candidate_port_id = port_id; + candidate_weight = weight; + candidate_flow_load = flow_load; + } } } - DSW_LOG_DP_PORT(DEBUG, source_port->id, "For the %d flows considered, " - "no target port found with load less than %d.\n", - num_bursts, DSW_LOAD_TO_PERCENT(max_load)); + if (candidate_weight < 0) + return false; - return false; + DSW_LOG_DP_PORT(DEBUG, source_port_id, "Selected queue_id %d " + "flow_hash %d (with flow load %d) for migration " + "to port %d.\n", candidate_qf->queue_id, + candidate_qf->flow_hash, + DSW_LOAD_TO_PERCENT(candidate_flow_load), + candidate_port_id); + + port_loads[candidate_port_id] += candidate_flow_load; + port_loads[source_port_id] -= candidate_flow_load; + + target_port_ids[*targets_len] = candidate_port_id; + target_qfs[*targets_len] = *candidate_qf; + (*targets_len)++; + + return true; +} + +static void +dsw_select_emigration_targets(struct dsw_evdev *dsw, + struct dsw_port *source_port, + struct dsw_queue_flow_burst *bursts, + uint16_t num_bursts, int16_t *port_loads) +{ + struct dsw_queue_flow *target_qfs = source_port->emigration_target_qfs; + uint8_t *target_port_ids = source_port->emigration_target_port_ids; + uint8_t *targets_len = &source_port->emigration_targets_len; + uint8_t i; + + for (i = 0; i < DSW_MAX_FLOWS_PER_MIGRATION; i++) { + bool found; + + found = dsw_select_emigration_target(dsw, bursts, num_bursts, + source_port->id, + port_loads, dsw->num_ports, + target_port_ids, + target_qfs, + targets_len); + if (!found) + break; + } + + if (*targets_len == 0) + DSW_LOG_DP_PORT(DEBUG, source_port->id, + "For the %d flows considered, no target port " + "was found.\n", num_bursts); } static uint8_t @@ -562,7 +662,7 @@ dsw_port_buffer_event(struct dsw_evdev *dsw, struct dsw_port *source_port, static void dsw_port_flush_paused_events(struct dsw_evdev *dsw, struct dsw_port *source_port, - uint8_t queue_id, uint16_t paused_flow_hash) + const struct dsw_queue_flow *qf) { uint16_t paused_events_len = source_port->paused_events_len; struct rte_event paused_events[paused_events_len]; @@ -572,7 +672,7 @@ dsw_port_flush_paused_events(struct dsw_evdev *dsw, if (paused_events_len == 0) return; - if (dsw_port_is_flow_paused(source_port, queue_id, paused_flow_hash)) + if (dsw_port_is_flow_paused(source_port, qf->queue_id, qf->flow_hash)) return; rte_memcpy(paused_events, source_port->paused_events, @@ -580,7 +680,7 @@ dsw_port_flush_paused_events(struct dsw_evdev *dsw, source_port->paused_events_len = 0; - dest_port_id = dsw_schedule(dsw, queue_id, paused_flow_hash); + dest_port_id = dsw_schedule(dsw, qf->queue_id, qf->flow_hash); for (i = 0; i < paused_events_len; i++) { struct rte_event *event = &paused_events[i]; @@ -588,8 +688,8 @@ dsw_port_flush_paused_events(struct dsw_evdev *dsw, flow_hash = dsw_flow_id_hash(event->flow_id); - if (event->queue_id == queue_id && - flow_hash == paused_flow_hash) + if (event->queue_id == qf->queue_id && + flow_hash == qf->flow_hash) dsw_port_buffer_non_paused(dsw, source_port, dest_port_id, event); else @@ -598,33 +698,94 @@ dsw_port_flush_paused_events(struct dsw_evdev *dsw, } static void -dsw_port_emigration_stats(struct dsw_port *port) +dsw_port_emigration_stats(struct dsw_port *port, uint8_t finished) { - uint64_t emigration_latency; + uint64_t flow_migration_latency; - emigration_latency = (rte_get_timer_cycles() - port->emigration_start); - port->emigration_latency += emigration_latency; - port->emigrations++; + flow_migration_latency = + (rte_get_timer_cycles() - port->emigration_start); + port->emigration_latency += (flow_migration_latency * finished); + port->emigrations += finished; } static void -dsw_port_end_emigration(struct dsw_evdev *dsw, struct dsw_port *port) +dsw_port_end_emigration(struct dsw_evdev *dsw, struct dsw_port *port, + uint8_t schedule_type) { - uint8_t queue_id = port->emigration_target_qf.queue_id; - uint16_t flow_hash = port->emigration_target_qf.flow_hash; + uint8_t i; + struct dsw_queue_flow left_qfs[DSW_MAX_FLOWS_PER_MIGRATION]; + uint8_t left_port_ids[DSW_MAX_FLOWS_PER_MIGRATION]; + uint8_t left_qfs_len = 0; + uint8_t finished; + + for (i = 0; i < port->emigration_targets_len; i++) { + struct dsw_queue_flow *qf = &port->emigration_target_qfs[i]; + uint8_t queue_id = qf->queue_id; + uint8_t queue_schedule_type = + dsw->queues[queue_id].schedule_type; + uint16_t flow_hash = qf->flow_hash; + + if (queue_schedule_type != schedule_type) { + left_port_ids[left_qfs_len] = + port->emigration_target_port_ids[i]; + left_qfs[left_qfs_len] = *qf; + left_qfs_len++; + continue; + } + + DSW_LOG_DP_PORT(DEBUG, port->id, "Migration completed for " + "queue_id %d flow_hash %d.\n", queue_id, + flow_hash); + + if (queue_schedule_type == RTE_SCHED_TYPE_ATOMIC) { + dsw_port_remove_paused_flow(port, qf); + dsw_port_flush_paused_events(dsw, port, qf); + } + } - port->migration_state = DSW_MIGRATION_STATE_IDLE; - port->seen_events_len = 0; + finished = port->emigration_targets_len - left_qfs_len; - dsw_port_emigration_stats(port); + if (finished > 0) + dsw_port_emigration_stats(port, finished); - if (dsw->queues[queue_id].schedule_type != RTE_SCHED_TYPE_PARALLEL) { - dsw_port_remove_paused_flow(port, queue_id, flow_hash); - dsw_port_flush_paused_events(dsw, port, queue_id, flow_hash); + for (i = 0; i < left_qfs_len; i++) { + port->emigration_target_port_ids[i] = left_port_ids[i]; + port->emigration_target_qfs[i] = left_qfs[i]; } + port->emigration_targets_len = left_qfs_len; - DSW_LOG_DP_PORT(DEBUG, port->id, "Emigration completed for queue_id " - "%d flow_hash %d.\n", queue_id, flow_hash); + if (port->emigration_targets_len == 0) { + port->migration_state = DSW_MIGRATION_STATE_IDLE; + port->seen_events_len = 0; + } +} + +static void +dsw_port_move_parallel_flows(struct dsw_evdev *dsw, + struct dsw_port *source_port) +{ + uint8_t i; + + for (i = 0; i < source_port->emigration_targets_len; i++) { + struct dsw_queue_flow *qf = + &source_port->emigration_target_qfs[i]; + uint8_t queue_id = qf->queue_id; + + if (dsw->queues[queue_id].schedule_type == + RTE_SCHED_TYPE_PARALLEL) { + uint8_t dest_port_id = + source_port->emigration_target_port_ids[i]; + uint16_t flow_hash = qf->flow_hash; + + /* Single byte-sized stores are always atomic. */ + dsw->queues[queue_id].flow_to_port_map[flow_hash] = + dest_port_id; + } + } + + rte_smp_wmb(); + + dsw_port_end_emigration(dsw, source_port, RTE_SCHED_TYPE_PARALLEL); } static void @@ -675,9 +836,9 @@ dsw_port_consider_emigration(struct dsw_evdev *dsw, source_port_load = rte_atomic16_read(&source_port->load); if (source_port_load < DSW_MIN_SOURCE_LOAD_FOR_MIGRATION) { DSW_LOG_DP_PORT(DEBUG, source_port->id, - "Load %d is below threshold level %d.\n", - DSW_LOAD_TO_PERCENT(source_port_load), - DSW_LOAD_TO_PERCENT(DSW_MIN_SOURCE_LOAD_FOR_MIGRATION)); + "Load %d is below threshold level %d.\n", + DSW_LOAD_TO_PERCENT(source_port_load), + DSW_LOAD_TO_PERCENT(DSW_MIN_SOURCE_LOAD_FOR_MIGRATION)); return; } @@ -694,16 +855,9 @@ dsw_port_consider_emigration(struct dsw_evdev *dsw, return; } - /* Sort flows into 'bursts' to allow attempting to migrating - * small (but still active) flows first - this it to avoid - * having large flows moving around the worker cores too much - * (to avoid cache misses, among other things). Of course, the - * number of recorded events (queue+flow ids) are limited, and - * provides only a snapshot, so only so many conclusions can - * be drawn from this data. - */ num_bursts = dsw_sort_qfs_to_bursts(seen_events, seen_events_len, bursts); + /* For non-big-little systems, there's no point in moving the * only (known) flow. */ @@ -715,33 +869,11 @@ dsw_port_consider_emigration(struct dsw_evdev *dsw, return; } - /* The strategy is to first try to find a flow to move to a - * port with low load (below the emigration-attempt - * threshold). If that fails, we try to find a port which is - * below the max threshold, and also less loaded than this - * port is. - */ - if (!dsw_select_emigration_target(dsw, source_port, bursts, num_bursts, - port_loads, - DSW_MIN_SOURCE_LOAD_FOR_MIGRATION, - &source_port->emigration_target_qf, - &source_port->emigration_target_port_id) - && - !dsw_select_emigration_target(dsw, source_port, bursts, num_bursts, - port_loads, - DSW_MAX_TARGET_LOAD_FOR_MIGRATION, - &source_port->emigration_target_qf, - &source_port->emigration_target_port_id)) - return; - - DSW_LOG_DP_PORT(DEBUG, source_port->id, "Migrating queue_id %d " - "flow_hash %d from port %d to port %d.\n", - source_port->emigration_target_qf.queue_id, - source_port->emigration_target_qf.flow_hash, - source_port->id, - source_port->emigration_target_port_id); + dsw_select_emigration_targets(dsw, source_port, bursts, num_bursts, + port_loads); - /* We have a winner. */ + if (source_port->emigration_targets_len == 0) + return; source_port->migration_state = DSW_MIGRATION_STATE_PAUSING; source_port->emigration_start = rte_get_timer_cycles(); @@ -750,71 +882,58 @@ dsw_port_consider_emigration(struct dsw_evdev *dsw, * parallel queues, since atomic/ordered semantics need not to * be maintained. */ + dsw_port_move_parallel_flows(dsw, source_port); - if (dsw->queues[source_port->emigration_target_qf.queue_id]. - schedule_type == RTE_SCHED_TYPE_PARALLEL) { - uint8_t queue_id = - source_port->emigration_target_qf.queue_id; - uint16_t flow_hash = - source_port->emigration_target_qf.flow_hash; - uint8_t dest_port_id = - source_port->emigration_target_port_id; - - /* Single byte-sized stores are always atomic. */ - dsw->queues[queue_id].flow_to_port_map[flow_hash] = - dest_port_id; - rte_smp_wmb(); - - dsw_port_end_emigration(dsw, source_port); - + /* All flows were on PARALLEL queues. */ + if (source_port->migration_state == DSW_MIGRATION_STATE_IDLE) return; - } /* There might be 'loopback' events already scheduled in the * output buffers. */ dsw_port_flush_out_buffers(dsw, source_port); - dsw_port_add_paused_flow(source_port, - source_port->emigration_target_qf.queue_id, - source_port->emigration_target_qf.flow_hash); + dsw_port_add_paused_flows(source_port, + source_port->emigration_target_qfs, + source_port->emigration_targets_len); dsw_port_ctl_broadcast(dsw, source_port, DSW_CTL_PAUS_REQ, - source_port->emigration_target_qf.queue_id, - source_port->emigration_target_qf.flow_hash); + source_port->emigration_target_qfs, + source_port->emigration_targets_len); source_port->cfm_cnt = 0; } static void dsw_port_flush_paused_events(struct dsw_evdev *dsw, struct dsw_port *source_port, - uint8_t queue_id, uint16_t paused_flow_hash); + const struct dsw_queue_flow *qf); static void -dsw_port_handle_unpause_flow(struct dsw_evdev *dsw, struct dsw_port *port, - uint8_t originating_port_id, uint8_t queue_id, - uint16_t paused_flow_hash) +dsw_port_handle_unpause_flows(struct dsw_evdev *dsw, struct dsw_port *port, + uint8_t originating_port_id, + struct dsw_queue_flow *paused_qfs, + uint8_t qfs_len) { + uint16_t i; struct dsw_ctl_msg cfm = { .type = DSW_CTL_CFM, - .originating_port_id = port->id, - .queue_id = queue_id, - .flow_hash = paused_flow_hash + .originating_port_id = port->id }; - DSW_LOG_DP_PORT(DEBUG, port->id, "Un-pausing queue_id %d flow_hash %d.\n", - queue_id, paused_flow_hash); - - dsw_port_remove_paused_flow(port, queue_id, paused_flow_hash); + dsw_port_remove_paused_flows(port, paused_qfs, qfs_len); rte_smp_rmb(); - if (dsw_schedule(dsw, queue_id, paused_flow_hash) == port->id) - port->immigrations++; - dsw_port_ctl_enqueue(&dsw->ports[originating_port_id], &cfm); - dsw_port_flush_paused_events(dsw, port, queue_id, paused_flow_hash); + for (i = 0; i < qfs_len; i++) { + struct dsw_queue_flow *qf = &paused_qfs[i]; + + if (dsw_schedule(dsw, qf->queue_id, qf->flow_hash) == port->id) + port->immigrations++; + + dsw_port_flush_paused_events(dsw, port, qf); + } } #define FORWARD_BURST_SIZE (32) @@ -869,31 +988,37 @@ dsw_port_forward_emigrated_flow(struct dsw_port *source_port, } static void -dsw_port_move_migrating_flow(struct dsw_evdev *dsw, - struct dsw_port *source_port) +dsw_port_move_emigrating_flows(struct dsw_evdev *dsw, + struct dsw_port *source_port) { - uint8_t queue_id = source_port->emigration_target_qf.queue_id; - uint16_t flow_hash = source_port->emigration_target_qf.flow_hash; - uint8_t dest_port_id = source_port->emigration_target_port_id; - struct dsw_port *dest_port = &dsw->ports[dest_port_id]; + uint8_t i; dsw_port_flush_out_buffers(dsw, source_port); rte_smp_wmb(); - dsw->queues[queue_id].flow_to_port_map[flow_hash] = - dest_port_id; + for (i = 0; i < source_port->emigration_targets_len; i++) { + struct dsw_queue_flow *qf = + &source_port->emigration_target_qfs[i]; + uint8_t dest_port_id = + source_port->emigration_target_port_ids[i]; + struct dsw_port *dest_port = &dsw->ports[dest_port_id]; + + dsw->queues[qf->queue_id].flow_to_port_map[qf->flow_hash] = + dest_port_id; - dsw_port_forward_emigrated_flow(source_port, dest_port->in_ring, - queue_id, flow_hash); + dsw_port_forward_emigrated_flow(source_port, dest_port->in_ring, + qf->queue_id, qf->flow_hash); + } /* Flow table update and migration destination port's enqueues * must be seen before the control message. */ rte_smp_wmb(); - dsw_port_ctl_broadcast(dsw, source_port, DSW_CTL_UNPAUS_REQ, queue_id, - flow_hash); + dsw_port_ctl_broadcast(dsw, source_port, DSW_CTL_UNPAUS_REQ, + source_port->emigration_target_qfs, + source_port->emigration_targets_len); source_port->cfm_cnt = 0; source_port->migration_state = DSW_MIGRATION_STATE_UNPAUSING; } @@ -911,7 +1036,8 @@ dsw_port_handle_confirm(struct dsw_evdev *dsw, struct dsw_port *port) port->migration_state = DSW_MIGRATION_STATE_FORWARDING; break; case DSW_MIGRATION_STATE_UNPAUSING: - dsw_port_end_emigration(dsw, port); + dsw_port_end_emigration(dsw, port, + RTE_SCHED_TYPE_ATOMIC); break; default: RTE_ASSERT(0); @@ -933,15 +1059,14 @@ dsw_port_ctl_process(struct dsw_evdev *dsw, struct dsw_port *port) if (dsw_port_ctl_dequeue(port, &msg) == 0) { switch (msg.type) { case DSW_CTL_PAUS_REQ: - dsw_port_handle_pause_flow(dsw, port, - msg.originating_port_id, - msg.queue_id, msg.flow_hash); + dsw_port_handle_pause_flows(dsw, port, + msg.originating_port_id, + msg.qfs, msg.qfs_len); break; case DSW_CTL_UNPAUS_REQ: - dsw_port_handle_unpause_flow(dsw, port, - msg.originating_port_id, - msg.queue_id, - msg.flow_hash); + dsw_port_handle_unpause_flows(dsw, port, + msg.originating_port_id, + msg.qfs, msg.qfs_len); break; case DSW_CTL_CFM: dsw_port_handle_confirm(dsw, port); @@ -964,7 +1089,7 @@ dsw_port_bg_process(struct dsw_evdev *dsw, struct dsw_port *port) { if (unlikely(port->migration_state == DSW_MIGRATION_STATE_FORWARDING && port->pending_releases == 0)) - dsw_port_move_migrating_flow(dsw, port); + dsw_port_move_emigrating_flows(dsw, port); /* Polling the control ring is relatively inexpensive, and * polling it often helps bringing down migration latency, so -- 2.20.1