(DSW_OLD_LOAD_WEIGHT+1);
rte_atomic16_set(&port->load, new_load);
+
+ /* The load of the recently immigrated flows should hopefully
+ * be reflected in the load estimate by now.
+ */
+ rte_atomic32_set(&port->immigration_load, 0);
}
static void
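A minimal sketch of the new immigration_load life cycle, assuming (as the
hunks above and below suggest) an rte_atomic32_t field in struct dsw_port;
the sketch_* names are hypothetical:

    #include <rte_atomic.h>
    #include <rte_common.h>

    /* Hypothetical, reduced port struct; the real struct dsw_port has
     * many more fields.
     */
    struct sketch_port {
            rte_atomic16_t load;             /* moving-average load estimate */
            rte_atomic32_t immigration_load; /* migrated-in load not yet
                                              * visible in the estimate */
    };

    /* Emigration source: charge the chosen target up front. */
    static void
    sketch_note_immigration(struct sketch_port *target, int16_t flow_load)
    {
            rte_atomic32_add(&target->immigration_load, flow_load);
    }

    /* Any port: the load figure to use when ranking migration targets. */
    static int16_t
    sketch_effective_load(struct sketch_port *port, int16_t max_load)
    {
            int32_t load = rte_atomic16_read(&port->load) +
                    rte_atomic32_read(&port->immigration_load);

            return RTE_MIN(load, max_load);
    }

    /* Target: once its own estimate has caught up, drop the charge. */
    static void
    sketch_load_updated(struct sketch_port *port)
    {
            rte_atomic32_set(&port->immigration_load, 0);
    }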
uint16_t i;
for (i = 0; i < dsw->num_ports; i++) {
- int16_t load = rte_atomic16_read(&dsw->ports[i].load);
+ int16_t measured_load = rte_atomic16_read(&dsw->ports[i].load);
+ int32_t immigration_load =
+ rte_atomic32_read(&dsw->ports[i].immigration_load);
+ int32_t load = measured_load + immigration_load;
+
+ load = RTE_MIN(load, DSW_MAX_LOAD);
+
if (load < load_limit)
below_limit = true;
port_loads[i] = load;
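A worked example of the clamp, assuming the dsw convention that
DSW_MAX_LOAD is the fixed-point encoding of a fully (100%) loaded port:

    /* A port measured at 90% with pending immigrations worth another
     * 25% would otherwise be recorded as 115%: meaningless as a load
     * figure, and potentially out of range for the int16_t
     * port_loads[] slot it is stored in.
     */
    int16_t measured_load = (DSW_MAX_LOAD / 100) * 90;
    int32_t immigration_load = (DSW_MAX_LOAD / 100) * 25;
    int32_t load = measured_load + immigration_load;

    load = RTE_MIN(load, DSW_MAX_LOAD); /* recorded as 100% */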
uint8_t *targets_len)
{
int16_t source_port_load = port_loads[source_port_id];
- struct dsw_queue_flow *candidate_qf;
- uint8_t candidate_port_id;
+ struct dsw_queue_flow *candidate_qf = NULL;
+ uint8_t candidate_port_id = 0;
int16_t candidate_weight = -1;
- int16_t candidate_flow_load;
+ int16_t candidate_flow_load = -1;
uint16_t i;
if (source_port_load < DSW_MIN_SOURCE_LOAD_FOR_MIGRATION)
target_qfs[*targets_len] = *candidate_qf;
(*targets_len)++;
+ rte_atomic32_add(&dsw->ports[candidate_port_id].immigration_load,
+ candidate_flow_load);
+
return true;
}
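Charging the target at selection time also closes a race between source
ports: on a shared, unmodified load snapshot, every port migrating at
about the same time would pick the same least-loaded target. A
hypothetical two-port, two-target timeline:

    /* Without the charge, A and B race on the same loads and both
     * pick T, creating a migration wave onto one port:
     *
     *     loads: T=20%, U=60%   ->  A picks T, B picks T
     *
     * With the charge, B's re-read of the loads (through the load
     * retrieval loop above) already includes A's claim:
     *
     *     loads: T=20%+flow_a, U=60%   ->  B may now prefer U
     */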
struct dsw_queue_flow *target_qfs = source_port->emigration_target_qfs;
uint8_t *target_port_ids = source_port->emigration_target_port_ids;
uint8_t *targets_len = &source_port->emigration_targets_len;
- uint8_t i;
+ uint16_t i;
for (i = 0; i < DSW_MAX_FLOWS_PER_MIGRATION; i++) {
bool found;
if (dsw->num_ports == 1)
return;
+ if (seen_events_len < DSW_MAX_EVENTS_RECORDED)
+ return;
+
DSW_LOG_DP_PORT(DEBUG, source_port->id, "Considering emigration.\n");
/* Randomize interval to avoid having all threads considering
{
struct dsw_ctl_msg msg;
- /* So any table loads happens before the ring dequeue, in the
- * case of a 'paus' message.
- */
- rte_smp_rmb();
-
if (dsw_port_ctl_dequeue(port, &msg) == 0) {
switch (msg.type) {
case DSW_CTL_PAUS_REQ:
DSW_LOG_DP_PORT(DEBUG, source_port->id, "%d non-release events "
"accepted.\n", num_non_release);
- return num_non_release;
+ return (num_non_release + num_release);
}
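The corrected return value matters to callers: rte_event_enqueue_burst()
reports how many events were consumed from the supplied array, so release
events must be counted too. A minimal retry loop (error handling elided),
with caller-supplied dev_id, port_id, events and nb_events:

    /* If RTE_EVENT_OP_RELEASE events were not counted in the return
     * value, this standard loop would re-submit them forever.
     */
    uint16_t sent = 0;

    while (sent < nb_events)
            sent += rte_event_enqueue_burst(dev_id, port_id,
                                            &events[sent],
                                            nb_events - sent);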
uint16_t
dsw_port_dequeue_burst(struct dsw_port *port, struct rte_event *events,
uint16_t num)
{
- struct dsw_port *source_port = port;
- struct dsw_evdev *dsw = source_port->dsw;
-
- dsw_port_ctl_process(dsw, source_port);
-
if (unlikely(port->in_buffer_len > 0)) {
uint16_t dequeued = RTE_MIN(num, port->in_buffer_len);