X-Git-Url: http://git.droids-corp.org/?a=blobdiff_plain;f=drivers%2Fevent%2Fdsw%2Fdsw_event.c;h=83239037362d1be99077eda264622c92b1c30b3d;hb=a2c6d3f34f9065a75fbbe70699610388e711ac6c;hp=eae53b2404b8bdd83f4c1cb81d7537854b34e00c;hpb=0c4155c7b5b51a60f676c1d7279bfcd1b14acd38;p=dpdk.git

diff --git a/drivers/event/dsw/dsw_event.c b/drivers/event/dsw/dsw_event.c
index eae53b2404..8323903736 100644
--- a/drivers/event/dsw/dsw_event.c
+++ b/drivers/event/dsw/dsw_event.c
@@ -160,6 +160,11 @@ dsw_port_load_update(struct dsw_port *port, uint64_t now)
 		(DSW_OLD_LOAD_WEIGHT+1);
 
 	rte_atomic16_set(&port->load, new_load);
+
+	/* The load of the recently immigrated flows should hopefully
+	 * be reflected in the load estimate by now.
+	 */
+	rte_atomic32_set(&port->immigration_load, 0);
 }
 
 static void
@@ -176,83 +181,88 @@ dsw_port_consider_load_update(struct dsw_port *port, uint64_t now)
 static void
 dsw_port_ctl_enqueue(struct dsw_port *port, struct dsw_ctl_msg *msg)
 {
-	void *raw_msg;
-
-	memcpy(&raw_msg, msg, sizeof(*msg));
-
 	/* there's always room on the ring */
-	while (rte_ring_enqueue(port->ctl_in_ring, raw_msg) != 0)
+	while (rte_ring_enqueue_elem(port->ctl_in_ring, msg, sizeof(*msg)) != 0)
 		rte_pause();
 }
 
 static int
 dsw_port_ctl_dequeue(struct dsw_port *port, struct dsw_ctl_msg *msg)
 {
-	void *raw_msg;
-	int rc;
-
-	rc = rte_ring_dequeue(port->ctl_in_ring, &raw_msg);
-
-	if (rc == 0)
-		memcpy(msg, &raw_msg, sizeof(*msg));
-
-	return rc;
+	return rte_ring_dequeue_elem(port->ctl_in_ring, msg, sizeof(*msg));
 }
 
 static void
 dsw_port_ctl_broadcast(struct dsw_evdev *dsw, struct dsw_port *source_port,
-		       uint8_t type, uint8_t queue_id, uint16_t flow_hash)
+		       uint8_t type, struct dsw_queue_flow *qfs,
+		       uint8_t qfs_len)
 {
 	uint16_t port_id;
 	struct dsw_ctl_msg msg = {
 		.type = type,
 		.originating_port_id = source_port->id,
-		.queue_id = queue_id,
-		.flow_hash = flow_hash
+		.qfs_len = qfs_len
 	};
 
+	memcpy(msg.qfs, qfs, sizeof(struct dsw_queue_flow) * qfs_len);
+
 	for (port_id = 0; port_id < dsw->num_ports; port_id++)
 		if (port_id != source_port->id)
 			dsw_port_ctl_enqueue(&dsw->ports[port_id], &msg);
 }
 
-static bool
-dsw_port_is_flow_paused(struct dsw_port *port, uint8_t queue_id,
-			uint16_t flow_hash)
+static __rte_always_inline bool
+dsw_is_queue_flow_in_ary(const struct dsw_queue_flow *qfs, uint16_t qfs_len,
+			 uint8_t queue_id, uint16_t flow_hash)
 {
 	uint16_t i;
 
-	for (i = 0; i < port->paused_flows_len; i++) {
-		struct dsw_queue_flow *qf = &port->paused_flows[i];
-		if (qf->queue_id == queue_id &&
-		    qf->flow_hash == flow_hash)
+	for (i = 0; i < qfs_len; i++)
+		if (qfs[i].queue_id == queue_id &&
+		    qfs[i].flow_hash == flow_hash)
 			return true;
-	}
+
 	return false;
 }
 
+static __rte_always_inline bool
+dsw_port_is_flow_paused(struct dsw_port *port, uint8_t queue_id,
+			uint16_t flow_hash)
+{
+	return dsw_is_queue_flow_in_ary(port->paused_flows,
+					port->paused_flows_len,
+					queue_id, flow_hash);
+}
+
 static void
-dsw_port_add_paused_flow(struct dsw_port *port, uint8_t queue_id,
-			 uint16_t paused_flow_hash)
+dsw_port_add_paused_flows(struct dsw_port *port, struct dsw_queue_flow *qfs,
+			  uint8_t qfs_len)
 {
-	port->paused_flows[port->paused_flows_len] = (struct dsw_queue_flow) {
-		.queue_id = queue_id,
-		.flow_hash = paused_flow_hash
+	uint8_t i;
+
+	for (i = 0; i < qfs_len; i++) {
+		struct dsw_queue_flow *qf = &qfs[i];
+
+		DSW_LOG_DP_PORT(DEBUG, port->id,
+				"Pausing queue_id %d flow_hash %d.\n",
+				qf->queue_id, qf->flow_hash);
+
+		port->paused_flows[port->paused_flows_len] = *qf;
+		port->paused_flows_len++;
 	};
-	port->paused_flows_len++;
 }
 
 static void
-dsw_port_remove_paused_flow(struct dsw_port *port, uint8_t queue_id,
-			    uint16_t paused_flow_hash)
+dsw_port_remove_paused_flow(struct dsw_port *port,
+			    struct dsw_queue_flow *target_qf)
 {
 	uint16_t i;
 
 	for (i = 0; i < port->paused_flows_len; i++) {
 		struct dsw_queue_flow *qf = &port->paused_flows[i];
 
-		if (qf->queue_id == queue_id &&
-		    qf->flow_hash == paused_flow_hash) {
+		if (qf->queue_id == target_qf->queue_id &&
+		    qf->flow_hash == target_qf->flow_hash) {
 			uint16_t last_idx = port->paused_flows_len-1;
 			if (i != last_idx)
 				port->paused_flows[i] =
@@ -263,30 +273,37 @@ dsw_port_remove_paused_flow(struct dsw_port *port, uint8_t queue_id,
 	}
 }
 
+static void
+dsw_port_remove_paused_flows(struct dsw_port *port,
+			     struct dsw_queue_flow *qfs, uint8_t qfs_len)
+{
+	uint8_t i;
+
+	for (i = 0; i < qfs_len; i++)
+		dsw_port_remove_paused_flow(port, &qfs[i]);
+
+}
+
 static void
 dsw_port_flush_out_buffers(struct dsw_evdev *dsw, struct dsw_port *source_port);
 
 static void
-dsw_port_handle_pause_flow(struct dsw_evdev *dsw, struct dsw_port *port,
-			   uint8_t originating_port_id, uint8_t queue_id,
-			   uint16_t paused_flow_hash)
+dsw_port_handle_pause_flows(struct dsw_evdev *dsw, struct dsw_port *port,
+			    uint8_t originating_port_id,
+			    struct dsw_queue_flow *paused_qfs,
+			    uint8_t qfs_len)
 {
 	struct dsw_ctl_msg cfm = {
 		.type = DSW_CTL_CFM,
-		.originating_port_id = port->id,
-		.queue_id = queue_id,
-		.flow_hash = paused_flow_hash
+		.originating_port_id = port->id
 	};
 
-	DSW_LOG_DP_PORT(DEBUG, port->id, "Pausing queue_id %d flow_hash %d.\n",
-			queue_id, paused_flow_hash);
-
 	/* There might be already-scheduled events belonging to the
 	 * paused flow in the output buffers.
 	 */
 	dsw_port_flush_out_buffers(dsw, port);
 
-	dsw_port_add_paused_flow(port, queue_id, paused_flow_hash);
+	dsw_port_add_paused_flows(port, paused_qfs, qfs_len);
 
 	/* Make sure any stores to the original port's in_ring is seen
 	 * before the ctl message.
@@ -296,47 +313,11 @@ dsw_port_handle_pause_flow(struct dsw_evdev *dsw, struct dsw_port *port,
 	dsw_port_ctl_enqueue(&dsw->ports[originating_port_id], &cfm);
 }
 
-static void
-dsw_find_lowest_load_port(uint8_t *port_ids, uint16_t num_port_ids,
-			  uint8_t exclude_port_id, int16_t *port_loads,
-			  uint8_t *target_port_id, int16_t *target_load)
-{
-	int16_t candidate_port_id = -1;
-	int16_t candidate_load = DSW_MAX_LOAD;
-	uint16_t i;
-
-	for (i = 0; i < num_port_ids; i++) {
-		uint8_t port_id = port_ids[i];
-		if (port_id != exclude_port_id) {
-			int16_t load = port_loads[port_id];
-			if (candidate_port_id == -1 ||
-			    load < candidate_load) {
-				candidate_port_id = port_id;
-				candidate_load = load;
-			}
-		}
-	}
-	*target_port_id = candidate_port_id;
-	*target_load = candidate_load;
-}
-
 struct dsw_queue_flow_burst {
 	struct dsw_queue_flow queue_flow;
 	uint16_t count;
 };
 
-static inline int
-dsw_cmp_burst(const void *v_burst_a, const void *v_burst_b)
-{
-	const struct dsw_queue_flow_burst *burst_a = v_burst_a;
-	const struct dsw_queue_flow_burst *burst_b = v_burst_b;
-
-	int a_count = burst_a->count;
-	int b_count = burst_b->count;
-
-	return a_count - b_count;
-}
-
 #define DSW_QF_TO_INT(_qf)					\
 	((int)((((_qf)->queue_id)<<16)|((_qf)->flow_hash)))
 
@@ -375,8 +356,6 @@ dsw_sort_qfs_to_bursts(struct dsw_queue_flow *qfs, uint16_t qfs_len,
 			current_burst->count++;
 	}
 
-	qsort(bursts, num_bursts, sizeof(bursts[0]), dsw_cmp_burst);
-
 	return num_bursts;
 }
 
@@ -388,7 +367,13 @@ dsw_retrieve_port_loads(struct dsw_evdev *dsw, int16_t *port_loads,
 	uint16_t i;
 
 	for (i = 0; i < dsw->num_ports; i++) {
-		int16_t load = rte_atomic16_read(&dsw->ports[i].load);
+		int16_t measured_load = rte_atomic16_read(&dsw->ports[i].load);
+		int32_t immigration_load =
+			rte_atomic32_read(&dsw->ports[i].immigration_load);
+		int32_t load = measured_load + immigration_load;
+
+		load = RTE_MIN(load, DSW_MAX_LOAD);
+
 		if (load < load_limit)
 			below_limit = true;
 		port_loads[i] = load;
@@ -396,44 +381,161 @@ dsw_retrieve_port_loads(struct dsw_evdev *dsw, int16_t *port_loads,
 	return below_limit;
 }
 
+static int16_t
+dsw_flow_load(uint16_t num_events, int16_t port_load)
+{
+	return ((int32_t)port_load * (int32_t)num_events) /
+		DSW_MAX_EVENTS_RECORDED;
+}
+
+static int16_t
+dsw_evaluate_migration(int16_t source_load, int16_t target_load,
+		       int16_t flow_load)
+{
+	int32_t res_target_load;
+	int32_t imbalance;
+
+	if (target_load > DSW_MAX_TARGET_LOAD_FOR_MIGRATION)
+		return -1;
+
+	imbalance = source_load - target_load;
+
+	if (imbalance < DSW_REBALANCE_THRESHOLD)
+		return -1;
+
+	res_target_load = target_load + flow_load;
+
+	/* If the estimated load of the target port will be higher
+	 * than the source port's load, it doesn't make sense to move
+	 * the flow.
+	 */
+	if (res_target_load > source_load)
+		return -1;
+
+	/* The more idle the target will be, the better. This will
+	 * make migration prefer moving smaller flows, and flows to
+	 * lightly loaded ports.
+	 */
+	return DSW_MAX_LOAD - res_target_load;
+}
+
+static bool
+dsw_is_serving_port(struct dsw_evdev *dsw, uint8_t port_id, uint8_t queue_id)
+{
+	struct dsw_queue *queue = &dsw->queues[queue_id];
+	uint16_t i;
+
+	for (i = 0; i < queue->num_serving_ports; i++)
+		if (queue->serving_ports[i] == port_id)
+			return true;
+
+	return false;
+}
+
 static bool
-dsw_select_migration_target(struct dsw_evdev *dsw,
-			    struct dsw_port *source_port,
+dsw_select_emigration_target(struct dsw_evdev *dsw,
 			     struct dsw_queue_flow_burst *bursts,
-			    uint16_t num_bursts, int16_t *port_loads,
-			    int16_t max_load, struct dsw_queue_flow *target_qf,
-			    uint8_t *target_port_id)
+			     uint16_t num_bursts, uint8_t source_port_id,
+			     int16_t *port_loads, uint16_t num_ports,
+			     uint8_t *target_port_ids,
+			     struct dsw_queue_flow *target_qfs,
+			     uint8_t *targets_len)
 {
-	uint16_t source_load = port_loads[source_port->id];
+	int16_t source_port_load = port_loads[source_port_id];
+	struct dsw_queue_flow *candidate_qf = NULL;
+	uint8_t candidate_port_id = 0;
+	int16_t candidate_weight = -1;
+	int16_t candidate_flow_load = -1;
 	uint16_t i;
 
+	if (source_port_load < DSW_MIN_SOURCE_LOAD_FOR_MIGRATION)
+		return false;
+
 	for (i = 0; i < num_bursts; i++) {
-		struct dsw_queue_flow *qf = &bursts[i].queue_flow;
+		struct dsw_queue_flow_burst *burst = &bursts[i];
+		struct dsw_queue_flow *qf = &burst->queue_flow;
+		int16_t flow_load;
+		uint16_t port_id;
 
-		if (dsw_port_is_flow_paused(source_port, qf->queue_id,
-					    qf->flow_hash))
+		if (dsw_is_queue_flow_in_ary(target_qfs, *targets_len,
+					     qf->queue_id, qf->flow_hash))
 			continue;
 
-		struct dsw_queue *queue = &dsw->queues[qf->queue_id];
-		int16_t target_load;
+		flow_load = dsw_flow_load(burst->count, source_port_load);
 
-		dsw_find_lowest_load_port(queue->serving_ports,
-					  queue->num_serving_ports,
-					  source_port->id, port_loads,
-					  target_port_id, &target_load);
+		for (port_id = 0; port_id < num_ports; port_id++) {
+			int16_t weight;
 
-		if (target_load < source_load &&
-		    target_load < max_load) {
-			*target_qf = *qf;
-			return true;
+			if (port_id == source_port_id)
+				continue;
+
+			if (!dsw_is_serving_port(dsw, port_id, qf->queue_id))
+				continue;
+
+			weight = dsw_evaluate_migration(source_port_load,
+							port_loads[port_id],
+							flow_load);
+
+			if (weight > candidate_weight) {
+				candidate_qf = qf;
+				candidate_port_id = port_id;
+				candidate_weight = weight;
+				candidate_flow_load = flow_load;
+			}
 		}
 	}
 
-	DSW_LOG_DP_PORT(DEBUG, source_port->id, "For the %d flows considered, "
-			"no target port found with load less than %d.\n",
-			num_bursts, DSW_LOAD_TO_PERCENT(max_load));
+	if (candidate_weight < 0)
+		return false;
 
-	return false;
+	DSW_LOG_DP_PORT(DEBUG, source_port_id, "Selected queue_id %d "
+			"flow_hash %d (with flow load %d) for migration "
+			"to port %d.\n", candidate_qf->queue_id,
+			candidate_qf->flow_hash,
+			DSW_LOAD_TO_PERCENT(candidate_flow_load),
+			candidate_port_id);
+
+	port_loads[candidate_port_id] += candidate_flow_load;
+	port_loads[source_port_id] -= candidate_flow_load;
+
+	target_port_ids[*targets_len] = candidate_port_id;
+	target_qfs[*targets_len] = *candidate_qf;
+	(*targets_len)++;
+
+	rte_atomic32_add(&dsw->ports[candidate_port_id].immigration_load,
+			 candidate_flow_load);
+
+	return true;
+}
+
+static void
+dsw_select_emigration_targets(struct dsw_evdev *dsw,
+			      struct dsw_port *source_port,
+			      struct dsw_queue_flow_burst *bursts,
+			      uint16_t num_bursts, int16_t *port_loads)
+{
+	struct dsw_queue_flow *target_qfs = source_port->emigration_target_qfs;
+	uint8_t *target_port_ids =
+		source_port->emigration_target_port_ids;
+	uint8_t *targets_len = &source_port->emigration_targets_len;
+	uint16_t i;
+
+	for (i = 0; i < DSW_MAX_FLOWS_PER_MIGRATION; i++) {
+		bool found;
+
+		found = dsw_select_emigration_target(dsw, bursts, num_bursts,
+						     source_port->id,
+						     port_loads, dsw->num_ports,
+						     target_port_ids,
+						     target_qfs,
+						     targets_len);
+		if (!found)
+			break;
+	}
+
+	if (*targets_len == 0)
+		DSW_LOG_DP_PORT(DEBUG, source_port->id,
+				"For the %d flows considered, no target port "
+				"was found.\n", num_bursts);
 }
 
 static uint8_t
@@ -574,7 +676,7 @@ dsw_port_buffer_event(struct dsw_evdev *dsw, struct dsw_port *source_port,
 static void
 dsw_port_flush_paused_events(struct dsw_evdev *dsw,
 			     struct dsw_port *source_port,
-			     uint8_t queue_id, uint16_t paused_flow_hash)
+			     const struct dsw_queue_flow *qf)
 {
 	uint16_t paused_events_len = source_port->paused_events_len;
 	struct rte_event paused_events[paused_events_len];
@@ -584,7 +686,7 @@ dsw_port_flush_paused_events(struct dsw_evdev *dsw,
 	if (paused_events_len == 0)
 		return;
 
-	if (dsw_port_is_flow_paused(source_port, queue_id, paused_flow_hash))
+	if (dsw_port_is_flow_paused(source_port, qf->queue_id, qf->flow_hash))
 		return;
 
 	rte_memcpy(paused_events, source_port->paused_events,
@@ -592,7 +694,7 @@ dsw_port_flush_paused_events(struct dsw_evdev *dsw,
 
 	source_port->paused_events_len = 0;
 
-	dest_port_id = dsw_schedule(dsw, queue_id, paused_flow_hash);
+	dest_port_id = dsw_schedule(dsw, qf->queue_id, qf->flow_hash);
 
 	for (i = 0; i < paused_events_len; i++) {
 		struct rte_event *event = &paused_events[i];
@@ -600,8 +702,8 @@ dsw_port_flush_paused_events(struct dsw_evdev *dsw,
 
 		flow_hash = dsw_flow_id_hash(event->flow_id);
 
-		if (event->queue_id == queue_id &&
-		    flow_hash == paused_flow_hash)
+		if (event->queue_id == qf->queue_id &&
+		    flow_hash == qf->flow_hash)
 			dsw_port_buffer_non_paused(dsw, source_port,
 						   dest_port_id, event);
 		else
@@ -610,39 +712,100 @@ dsw_port_flush_paused_events(struct dsw_evdev *dsw,
 }
 
 static void
-dsw_port_migration_stats(struct dsw_port *port)
+dsw_port_emigration_stats(struct dsw_port *port, uint8_t finished)
 {
-	uint64_t migration_latency;
+	uint64_t flow_migration_latency;
 
-	migration_latency = (rte_get_timer_cycles() - port->migration_start);
-	port->migration_latency += migration_latency;
-	port->migrations++;
+	flow_migration_latency =
+		(rte_get_timer_cycles() - port->emigration_start);
+	port->emigration_latency += (flow_migration_latency * finished);
+	port->emigrations += finished;
 }
 
 static void
-dsw_port_end_migration(struct dsw_evdev *dsw, struct dsw_port *port)
+dsw_port_end_emigration(struct dsw_evdev *dsw, struct dsw_port *port,
+			uint8_t schedule_type)
 {
-	uint8_t queue_id = port->migration_target_qf.queue_id;
-	uint16_t flow_hash = port->migration_target_qf.flow_hash;
+	uint8_t i;
+	struct dsw_queue_flow left_qfs[DSW_MAX_FLOWS_PER_MIGRATION];
+	uint8_t left_port_ids[DSW_MAX_FLOWS_PER_MIGRATION];
+	uint8_t left_qfs_len = 0;
+	uint8_t finished;
+
+	for (i = 0; i < port->emigration_targets_len; i++) {
+		struct dsw_queue_flow *qf = &port->emigration_target_qfs[i];
+		uint8_t queue_id = qf->queue_id;
+		uint8_t queue_schedule_type =
+			dsw->queues[queue_id].schedule_type;
+		uint16_t flow_hash = qf->flow_hash;
+
+		if (queue_schedule_type != schedule_type) {
+			left_port_ids[left_qfs_len] =
+				port->emigration_target_port_ids[i];
+			left_qfs[left_qfs_len] = *qf;
+			left_qfs_len++;
+			continue;
+		}
 
-	port->migration_state = DSW_MIGRATION_STATE_IDLE;
-	port->seen_events_len = 0;
+		DSW_LOG_DP_PORT(DEBUG, port->id, "Migration completed for "
+				"queue_id %d flow_hash %d.\n", queue_id,
+				flow_hash);
 
-	dsw_port_migration_stats(port);
+		if (queue_schedule_type == RTE_SCHED_TYPE_ATOMIC) {
+			dsw_port_remove_paused_flow(port, qf);
+			dsw_port_flush_paused_events(dsw, port, qf);
+		}
+	}
+
+	finished = port->emigration_targets_len - left_qfs_len;
+
+	if (finished > 0)
+		dsw_port_emigration_stats(port, finished);
+
+	for (i = 0; i < left_qfs_len; i++) {
+		port->emigration_target_port_ids[i] = left_port_ids[i];
+		port->emigration_target_qfs[i] = left_qfs[i];
+	}
+	port->emigration_targets_len = left_qfs_len;
+
+	if (port->emigration_targets_len == 0) {
+		port->migration_state = DSW_MIGRATION_STATE_IDLE;
+		port->seen_events_len = 0;
+	}
+}
 
-	if (dsw->queues[queue_id].schedule_type != RTE_SCHED_TYPE_PARALLEL) {
-		dsw_port_remove_paused_flow(port, queue_id, flow_hash);
-		dsw_port_flush_paused_events(dsw, port, queue_id, flow_hash);
+static void
+dsw_port_move_parallel_flows(struct dsw_evdev *dsw,
+			     struct dsw_port *source_port)
+{
+	uint8_t i;
+
+	for (i = 0; i < source_port->emigration_targets_len; i++) {
+		struct dsw_queue_flow *qf =
+			&source_port->emigration_target_qfs[i];
+		uint8_t queue_id = qf->queue_id;
+
+		if (dsw->queues[queue_id].schedule_type ==
+		    RTE_SCHED_TYPE_PARALLEL) {
+			uint8_t dest_port_id =
+				source_port->emigration_target_port_ids[i];
+			uint16_t flow_hash = qf->flow_hash;
+
+			/* Single byte-sized stores are always atomic. */
+			dsw->queues[queue_id].flow_to_port_map[flow_hash] =
+				dest_port_id;
+		}
 	}
 
-	DSW_LOG_DP_PORT(DEBUG, port->id, "Migration completed for queue_id "
-			"%d flow_hash %d.\n", queue_id, flow_hash);
+	rte_smp_wmb();
+
+	dsw_port_end_emigration(dsw, source_port, RTE_SCHED_TYPE_PARALLEL);
 }
 
 static void
-dsw_port_consider_migration(struct dsw_evdev *dsw,
-			    struct dsw_port *source_port,
-			    uint64_t now)
+dsw_port_consider_emigration(struct dsw_evdev *dsw,
+			     struct dsw_port *source_port,
+			     uint64_t now)
 {
 	bool any_port_below_limit;
 	struct dsw_queue_flow *seen_events = source_port->seen_events;
@@ -652,31 +815,31 @@ dsw_port_consider_migration(struct dsw_evdev *dsw,
 	int16_t source_port_load;
 	int16_t port_loads[dsw->num_ports];
 
-	if (now < source_port->next_migration)
+	if (now < source_port->next_emigration)
 		return;
 
 	if (dsw->num_ports == 1)
 		return;
 
-	DSW_LOG_DP_PORT(DEBUG, source_port->id, "Considering migration.\n");
+	DSW_LOG_DP_PORT(DEBUG, source_port->id, "Considering emigration.\n");
 
 	/* Randomize interval to avoid having all threads considering
-	 * migration at the same in point in time, which might lead to
-	 * all choosing the same target port.
+	 * emigration at the same point in time, which might lead
+	 * to all choosing the same target port.
 	 */
-	source_port->next_migration = now +
+	source_port->next_emigration = now +
 		source_port->migration_interval / 2 +
 		rte_rand() % source_port->migration_interval;
 
 	if (source_port->migration_state != DSW_MIGRATION_STATE_IDLE) {
 		DSW_LOG_DP_PORT(DEBUG, source_port->id,
-				"Migration already in progress.\n");
+				"Emigration already in progress.\n");
 		return;
 	}
 
 	/* For simplicity, avoid migration in the unlikely case there
 	 * is still events to consume in the in_buffer (from the last
-	 * migration).
+	 * emigration).
 	 */
 	if (source_port->in_buffer_len > 0) {
 		DSW_LOG_DP_PORT(DEBUG, source_port->id, "There are still "
@@ -687,9 +850,9 @@ dsw_port_consider_migration(struct dsw_evdev *dsw,
 	source_port_load = rte_atomic16_read(&source_port->load);
 	if (source_port_load < DSW_MIN_SOURCE_LOAD_FOR_MIGRATION) {
 		DSW_LOG_DP_PORT(DEBUG, source_port->id,
-		      "Load %d is below threshold level %d.\n",
-		      DSW_LOAD_TO_PERCENT(source_port_load),
-		      DSW_LOAD_TO_PERCENT(DSW_MIN_SOURCE_LOAD_FOR_MIGRATION));
+				"Load %d is below threshold level %d.\n",
+				DSW_LOAD_TO_PERCENT(source_port_load),
+				DSW_LOAD_TO_PERCENT(DSW_MIN_SOURCE_LOAD_FOR_MIGRATION));
 		return;
 	}
 
@@ -706,16 +869,9 @@ dsw_port_consider_migration(struct dsw_evdev *dsw,
 		return;
 	}
 
-	/* Sort flows into 'bursts' to allow attempting to migrating
-	 * small (but still active) flows first - this it to avoid
-	 * having large flows moving around the worker cores too much
-	 * (to avoid cache misses, among other things). Of course, the
-	 * number of recorded events (queue+flow ids) are limited, and
-	 * provides only a snapshot, so only so many conclusions can
-	 * be drawn from this data.
-	 */
 	num_bursts = dsw_sort_qfs_to_bursts(seen_events, seen_events_len,
 					    bursts);
+
 	/* For non-big-little systems, there's no point in moving the
 	 * only (known) flow.
 	 */
@@ -727,108 +883,80 @@ dsw_port_consider_migration(struct dsw_evdev *dsw,
 		return;
 	}
 
-	/* The strategy is to first try to find a flow to move to a
-	 * port with low load (below the migration-attempt
-	 * threshold). If that fails, we try to find a port which is
-	 * below the max threshold, and also less loaded than this
-	 * port is.
-	 */
-	if (!dsw_select_migration_target(dsw, source_port, bursts, num_bursts,
-					 port_loads,
-					 DSW_MIN_SOURCE_LOAD_FOR_MIGRATION,
-					 &source_port->migration_target_qf,
-					 &source_port->migration_target_port_id)
-	    &&
-	    !dsw_select_migration_target(dsw, source_port, bursts, num_bursts,
-					 port_loads,
-					 DSW_MAX_TARGET_LOAD_FOR_MIGRATION,
-					 &source_port->migration_target_qf,
-					 &source_port->migration_target_port_id))
-		return;
-
-	DSW_LOG_DP_PORT(DEBUG, source_port->id, "Migrating queue_id %d "
-			"flow_hash %d from port %d to port %d.\n",
-			source_port->migration_target_qf.queue_id,
-			source_port->migration_target_qf.flow_hash,
-			source_port->id, source_port->migration_target_port_id);
+	dsw_select_emigration_targets(dsw, source_port, bursts, num_bursts,
+				      port_loads);
 
-	/* We have a winner. */
+	if (source_port->emigration_targets_len == 0)
+		return;
 
 	source_port->migration_state = DSW_MIGRATION_STATE_PAUSING;
-	source_port->migration_start = rte_get_timer_cycles();
+	source_port->emigration_start = rte_get_timer_cycles();
 
 	/* No need to go through the whole pause procedure for
 	 * parallel queues, since atomic/ordered semantics need not to
 	 * be maintained.
 	 */
+	dsw_port_move_parallel_flows(dsw, source_port);
 
-	if (dsw->queues[source_port->migration_target_qf.queue_id].schedule_type
-	    == RTE_SCHED_TYPE_PARALLEL) {
-		uint8_t queue_id = source_port->migration_target_qf.queue_id;
-		uint16_t flow_hash = source_port->migration_target_qf.flow_hash;
-		uint8_t dest_port_id = source_port->migration_target_port_id;
-
-		/* Single byte-sized stores are always atomic. */
-		dsw->queues[queue_id].flow_to_port_map[flow_hash] =
-			dest_port_id;
-		rte_smp_wmb();
-
-		dsw_port_end_migration(dsw, source_port);
-
+	/* All flows were on PARALLEL queues. */
+	if (source_port->migration_state == DSW_MIGRATION_STATE_IDLE)
 		return;
-	}
 
 	/* There might be 'loopback' events already scheduled in the
 	 * output buffers.
 	 */
 	dsw_port_flush_out_buffers(dsw, source_port);
 
-	dsw_port_add_paused_flow(source_port,
-				 source_port->migration_target_qf.queue_id,
-				 source_port->migration_target_qf.flow_hash);
+	dsw_port_add_paused_flows(source_port,
+				  source_port->emigration_target_qfs,
+				  source_port->emigration_targets_len);
 
 	dsw_port_ctl_broadcast(dsw, source_port, DSW_CTL_PAUS_REQ,
-			       source_port->migration_target_qf.queue_id,
-			       source_port->migration_target_qf.flow_hash);
+			       source_port->emigration_target_qfs,
+			       source_port->emigration_targets_len);
 	source_port->cfm_cnt = 0;
 }
 
 static void
 dsw_port_flush_paused_events(struct dsw_evdev *dsw,
 			     struct dsw_port *source_port,
-			     uint8_t queue_id, uint16_t paused_flow_hash);
+			     const struct dsw_queue_flow *qf);
 
 static void
-dsw_port_handle_unpause_flow(struct dsw_evdev *dsw, struct dsw_port *port,
-			     uint8_t originating_port_id, uint8_t queue_id,
-			     uint16_t paused_flow_hash)
+dsw_port_handle_unpause_flows(struct dsw_evdev *dsw, struct dsw_port *port,
+			      uint8_t originating_port_id,
			      struct dsw_queue_flow *paused_qfs,
+			      uint8_t qfs_len)
 {
+	uint16_t i;
 	struct dsw_ctl_msg cfm = {
 		.type = DSW_CTL_CFM,
-		.originating_port_id = port->id,
-		.queue_id = queue_id,
-		.flow_hash = paused_flow_hash
+		.originating_port_id = port->id
 	};
 
-	DSW_LOG_DP_PORT(DEBUG, port->id, "Un-pausing queue_id %d flow_hash %d.\n",
-			queue_id, paused_flow_hash);
-
-	dsw_port_remove_paused_flow(port, queue_id, paused_flow_hash);
+	dsw_port_remove_paused_flows(port, paused_qfs, qfs_len);
 
 	rte_smp_rmb();
 
 	dsw_port_ctl_enqueue(&dsw->ports[originating_port_id], &cfm);
 
-	dsw_port_flush_paused_events(dsw, port, queue_id, paused_flow_hash);
+	for (i = 0; i < qfs_len; i++) {
+		struct dsw_queue_flow *qf = &paused_qfs[i];
+
+		if (dsw_schedule(dsw, qf->queue_id, qf->flow_hash) == port->id)
+			port->immigrations++;
+
+		dsw_port_flush_paused_events(dsw, port, qf);
+	}
 }
 
 #define FORWARD_BURST_SIZE (32)
 
 static void
-dsw_port_forward_migrated_flow(struct dsw_port *source_port,
-			       struct rte_event_ring *dest_ring,
-			       uint8_t queue_id,
-			       uint16_t flow_hash)
+dsw_port_forward_emigrated_flow(struct dsw_port *source_port,
+				struct rte_event_ring *dest_ring,
+				uint8_t queue_id,
+				uint16_t flow_hash)
 {
 	uint16_t events_left;
 
@@ -874,31 +1002,37 @@ dsw_port_forward_migrated_flow(struct dsw_port *source_port,
 }
 
 static void
-dsw_port_move_migrating_flow(struct dsw_evdev *dsw,
-			     struct dsw_port *source_port)
+dsw_port_move_emigrating_flows(struct dsw_evdev *dsw,
+			       struct dsw_port *source_port)
 {
-	uint8_t queue_id = source_port->migration_target_qf.queue_id;
-	uint16_t flow_hash = source_port->migration_target_qf.flow_hash;
-	uint8_t dest_port_id = source_port->migration_target_port_id;
-	struct dsw_port *dest_port = &dsw->ports[dest_port_id];
+	uint8_t i;
 
 	dsw_port_flush_out_buffers(dsw, source_port);
 
 	rte_smp_wmb();
 
-	dsw->queues[queue_id].flow_to_port_map[flow_hash] =
-		dest_port_id;
+	for (i = 0; i < source_port->emigration_targets_len; i++) {
+		struct dsw_queue_flow *qf =
+			&source_port->emigration_target_qfs[i];
+		uint8_t dest_port_id =
+			source_port->emigration_target_port_ids[i];
+		struct dsw_port *dest_port = &dsw->ports[dest_port_id];
 
-	dsw_port_forward_migrated_flow(source_port, dest_port->in_ring,
-				       queue_id, flow_hash);
+		dsw->queues[qf->queue_id].flow_to_port_map[qf->flow_hash] =
+			dest_port_id;
+
+		dsw_port_forward_emigrated_flow(source_port, dest_port->in_ring,
+						qf->queue_id, qf->flow_hash);
+	}
 
 	/* Flow table update and migration destination port's enqueues
 	 * must be seen before the control message.
 	 */
 	rte_smp_wmb();
 
-	dsw_port_ctl_broadcast(dsw, source_port, DSW_CTL_UNPAUS_REQ, queue_id,
-			       flow_hash);
+	dsw_port_ctl_broadcast(dsw, source_port, DSW_CTL_UNPAUS_REQ,
+			       source_port->emigration_target_qfs,
+			       source_port->emigration_targets_len);
 
 	source_port->cfm_cnt = 0;
 	source_port->migration_state = DSW_MIGRATION_STATE_UNPAUSING;
 }
@@ -916,7 +1050,8 @@ dsw_port_handle_confirm(struct dsw_evdev *dsw, struct dsw_port *port)
 		port->migration_state = DSW_MIGRATION_STATE_FORWARDING;
 		break;
 	case DSW_MIGRATION_STATE_UNPAUSING:
-		dsw_port_end_migration(dsw, port);
+		dsw_port_end_emigration(dsw, port,
+					RTE_SCHED_TYPE_ATOMIC);
 		break;
 	default:
 		RTE_ASSERT(0);
@@ -930,23 +1065,17 @@ dsw_port_ctl_process(struct dsw_evdev *dsw, struct dsw_port *port)
 {
 	struct dsw_ctl_msg msg;
 
-	/* So any table loads happens before the ring dequeue, in the
-	 * case of a 'paus' message.
-	 */
-	rte_smp_rmb();
-
 	if (dsw_port_ctl_dequeue(port, &msg) == 0) {
 		switch (msg.type) {
 		case DSW_CTL_PAUS_REQ:
-			dsw_port_handle_pause_flow(dsw, port,
-						   msg.originating_port_id,
-						   msg.queue_id, msg.flow_hash);
+			dsw_port_handle_pause_flows(dsw, port,
+						    msg.originating_port_id,
+						    msg.qfs, msg.qfs_len);
 			break;
 		case DSW_CTL_UNPAUS_REQ:
-			dsw_port_handle_unpause_flow(dsw, port,
-						     msg.originating_port_id,
-						     msg.queue_id,
-						     msg.flow_hash);
+			dsw_port_handle_unpause_flows(dsw, port,
						      msg.originating_port_id,
+						      msg.qfs, msg.qfs_len);
 			break;
 		case DSW_CTL_CFM:
 			dsw_port_handle_confirm(dsw, port);
@@ -969,7 +1098,7 @@ dsw_port_bg_process(struct dsw_evdev *dsw, struct dsw_port *port)
 {
 	if (unlikely(port->migration_state == DSW_MIGRATION_STATE_FORWARDING &&
 		     port->pending_releases == 0))
-		dsw_port_move_migrating_flow(dsw, port);
+		dsw_port_move_emigrating_flows(dsw, port);
 
 	/* Polling the control ring is relatively inexpensive, and
 	 * polling it often helps bringing down migration latency, so
@@ -996,7 +1125,7 @@ dsw_port_bg_process(struct dsw_evdev *dsw, struct dsw_port *port)
 
 		dsw_port_consider_load_update(port, now);
 
-		dsw_port_consider_migration(dsw, port, now);
+		dsw_port_consider_emigration(dsw, port, now);
 
 		port->ops_since_bg_task = 0;
 	}
@@ -1194,11 +1323,6 @@ static uint16_t
 dsw_port_dequeue_burst(struct dsw_port *port, struct rte_event *events,
 		       uint16_t num)
 {
-	struct dsw_port *source_port = port;
-	struct dsw_evdev *dsw = source_port->dsw;
-
-	dsw_port_ctl_process(dsw, source_port);
-
 	if (unlikely(port->in_buffer_len > 0)) {
 		uint16_t dequeued = RTE_MIN(num, port->in_buffer_len);
 
@@ -1254,11 +1378,11 @@ dsw_event_dequeue_burst(void *port, struct rte_event *events, uint16_t num,
 		 * seem to improve performance.
 		 */
 		dsw_port_record_seen_events(port, events, dequeued);
-	}
-	/* XXX: Assuming the port can't produce any more work,
-	 * consider flushing the output buffer, on dequeued ==
-	 * 0.
-	 */
+	} else /* Zero-size dequeue means a likely idle port, and thus
+		* we can afford trading some efficiency for a slightly
+		* reduced event wall-time latency.
+		*/
+		dsw_port_flush_out_buffers(dsw, port);
 
 #ifdef DSW_SORT_DEQUEUED
 	dsw_stable_sort(events, dequeued, sizeof(events[0]), dsw_cmp_event);
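
Note on the control-ring change above: the message now carries an array of queue/flow pairs, so it no longer fits in a void pointer, and the ring is switched from rte_ring_enqueue()/rte_ring_dequeue() plus memcpy() to the fixed-element-size ring API. The standalone sketch below only illustrates those rte_ring_*_elem() calls; the struct layout, ring name and sizes are invented for illustration and are not the driver's real dsw_ctl_msg or ctl_in_ring setup.

/* Minimal sketch of the fixed-size-element ring API, separate from the
 * patch. Assumes rte_eal_init() has already run; all names here are
 * hypothetical.
 */
#include <stdint.h>
#include <rte_ring.h>
#include <rte_ring_elem.h>
#include <rte_pause.h>

struct demo_ctl_msg {
	uint8_t type;
	uint8_t originating_port_id;
	uint8_t qfs_len;
	uint8_t pad;
	uint32_t qfs[8]; /* placeholder for the queue/flow array */
};

static struct rte_ring *
demo_ctl_ring_create(void)
{
	/* The element size must be a multiple of 4 bytes; whole structs
	 * are copied in and out instead of single pointers.
	 */
	return rte_ring_create_elem("demo_ctl_ring",
				    sizeof(struct demo_ctl_msg), 64,
				    SOCKET_ID_ANY, 0);
}

static void
demo_ctl_ring_roundtrip(struct rte_ring *ring)
{
	struct demo_ctl_msg in = { .type = 1, .qfs_len = 0 };
	struct demo_ctl_msg out;

	/* Spin until there is room, as dsw_port_ctl_enqueue() does. */
	while (rte_ring_enqueue_elem(ring, &in, sizeof(in)) != 0)
		rte_pause();

	/* On success the full struct is copied out; no memcpy() needed. */
	if (rte_ring_dequeue_elem(ring, &out, sizeof(out)) == 0) {
		/* out now holds a complete copy of the message. */
	}
}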
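Worked example of the new emigration weighting, with illustrative numbers only (the code above is the reference, and DSW_MAX_EVENTS_RECORDED is taken as 128 here just for the arithmetic): dsw_flow_load() scales the source port's load by the flow's share of recorded events, so a flow behind 32 of 128 recorded events on a port running at 90% load is costed at 90% * 32/128 = 22.5% of maximum load. For a candidate target currently at 40%, the projected load is 40% + 22.5% = 62.5%, which is below the source's 90% and below the maximum-target-load limit; assuming the 50-point imbalance also exceeds DSW_REBALANCE_THRESHOLD, dsw_evaluate_migration() returns DSW_MAX_LOAD minus the projected 62.5%, i.e. a weight corresponding to 37.5% idle headroom. A smaller flow or a less loaded target yields a larger weight, which is why dsw_select_emigration_target() ends up preferring small flows and lightly loaded ports.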