1 /* SPDX-License-Identifier: BSD-3-Clause
2 * Copyright(c) 2018 Ericsson AB
7 #ifdef DSW_SORT_DEQUEUED
14 #include <rte_atomic.h>
15 #include <rte_cycles.h>
16 #include <rte_memcpy.h>
17 #include <rte_random.h>
20 dsw_port_acquire_credits(struct dsw_evdev *dsw, struct dsw_port *port,
23 int32_t inflight_credits = port->inflight_credits;
24 int32_t missing_credits = credits - inflight_credits;
25 int32_t total_on_loan;
27 int32_t acquired_credits;
28 int32_t new_total_on_loan;
30 if (likely(missing_credits <= 0)) {
31 port->inflight_credits -= credits;
35 total_on_loan = rte_atomic32_read(&dsw->credits_on_loan);
36 available = dsw->max_inflight - total_on_loan;
37 acquired_credits = RTE_MAX(missing_credits, DSW_PORT_MIN_CREDITS);
39 if (available < acquired_credits)
42 /* This is a race: no locks are involved, and thus some other
43 * thread can allocate tokens in between the check and the
46 new_total_on_loan = rte_atomic32_add_return(&dsw->credits_on_loan,
49 if (unlikely(new_total_on_loan > dsw->max_inflight)) {
50 /* Some other port took the last credits */
51 rte_atomic32_sub(&dsw->credits_on_loan, acquired_credits);
55 DSW_LOG_DP_PORT(DEBUG, port->id, "Acquired %d tokens from pool.\n",
58 port->inflight_credits += acquired_credits;
59 port->inflight_credits -= credits;
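/* To keep pressure off the shared credits_on_loan counter, credits are
 * taken from the global pool in chunks of at least DSW_PORT_MIN_CREDITS
 * and cached per port. For illustration (assuming a chunk size of 64): a
 * port enqueueing one new event with an empty cache grabs 64 credits in
 * one atomic add, uses one, and serves the following 63 enqueues from its
 * local cache without touching the shared counter.
 */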
65 dsw_port_return_credits(struct dsw_evdev *dsw, struct dsw_port *port,
68 port->inflight_credits += credits;
70 if (unlikely(port->inflight_credits > DSW_PORT_MAX_CREDITS)) {
71 int32_t leave_credits = DSW_PORT_MIN_CREDITS;
72 int32_t return_credits =
73 port->inflight_credits - leave_credits;
75 port->inflight_credits = leave_credits;
77 rte_atomic32_sub(&dsw->credits_on_loan, return_credits);
79 DSW_LOG_DP_PORT(DEBUG, port->id,
80 "Returned %d tokens to pool.\n",
86 dsw_port_enqueue_stats(struct dsw_port *port, uint16_t num_new,
87 uint16_t num_forward, uint16_t num_release)
89 port->new_enqueued += num_new;
90 port->forward_enqueued += num_forward;
91 port->release_enqueued += num_release;
95 dsw_port_queue_enqueue_stats(struct dsw_port *source_port, uint8_t queue_id)
97 source_port->queue_enqueued[queue_id]++;
101 dsw_port_dequeue_stats(struct dsw_port *port, uint16_t num)
103 port->dequeued += num;
107 dsw_port_queue_dequeued_stats(struct dsw_port *source_port, uint8_t queue_id)
109 source_port->queue_dequeued[queue_id]++;
113 dsw_port_load_record(struct dsw_port *port, unsigned int dequeued)
115 if (dequeued > 0 && port->busy_start == 0)
116 /* work period begins */
117 port->busy_start = rte_get_timer_cycles();
118 else if (dequeued == 0 && port->busy_start > 0) {
119 /* work period ends */
120 uint64_t work_period =
121 rte_get_timer_cycles() - port->busy_start;
122 port->busy_cycles += work_period;
123 port->busy_start = 0;
128 dsw_port_load_close_period(struct dsw_port *port, uint64_t now)
130 uint64_t passed = now - port->measurement_start;
131 uint64_t busy_cycles = port->busy_cycles;
133 if (port->busy_start > 0) {
134 busy_cycles += (now - port->busy_start);
135 port->busy_start = now;
138 int16_t load = (DSW_MAX_LOAD * busy_cycles) / passed;
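/* Example (illustrative figures): if the port was busy for a quarter of
 * the measurement period, busy_cycles/passed is 0.25 and the computed
 * load is roughly DSW_MAX_LOAD/4, which DSW_LOAD_TO_PERCENT() would
 * report as ~25.
 */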
140 port->measurement_start = now;
141 port->busy_cycles = 0;
143 port->total_busy_cycles += busy_cycles;
149 dsw_port_load_update(struct dsw_port *port, uint64_t now)
155 old_load = rte_atomic16_read(&port->load);
157 period_load = dsw_port_load_close_period(port, now);
159 new_load = (period_load + old_load*DSW_OLD_LOAD_WEIGHT) /
160 (DSW_OLD_LOAD_WEIGHT+1);
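/* The port load is thus an exponentially weighted moving average: the
 * just-closed period is blended with the previous average. For example,
 * with DSW_OLD_LOAD_WEIGHT equal to 1 (illustrative), the new load is
 * simply the mean of the old load and the last period's load.
 */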
162 rte_atomic16_set(&port->load, new_load);
166 dsw_port_consider_load_update(struct dsw_port *port, uint64_t now)
168 if (now < port->next_load_update)
171 port->next_load_update = now + port->load_update_interval;
173 dsw_port_load_update(port, now);
177 dsw_port_ctl_enqueue(struct dsw_port *port, struct dsw_ctl_msg *msg)
179 /* there's always room on the ring */
180 while (rte_ring_enqueue_elem(port->ctl_in_ring, msg, sizeof(*msg)) != 0)
185 dsw_port_ctl_dequeue(struct dsw_port *port, struct dsw_ctl_msg *msg)
187 return rte_ring_dequeue_elem(port->ctl_in_ring, msg, sizeof(*msg));
191 dsw_port_ctl_broadcast(struct dsw_evdev *dsw, struct dsw_port *source_port,
192 uint8_t type, struct dsw_queue_flow *qfs,
196 struct dsw_ctl_msg msg = {
198 .originating_port_id = source_port->id,
202 memcpy(msg.qfs, qfs, sizeof(struct dsw_queue_flow) * qfs_len);
204 for (port_id = 0; port_id < dsw->num_ports; port_id++)
205 if (port_id != source_port->id)
206 dsw_port_ctl_enqueue(&dsw->ports[port_id], &msg);
209 static __rte_always_inline bool
210 dsw_is_queue_flow_in_ary(const struct dsw_queue_flow *qfs, uint16_t qfs_len,
211 uint8_t queue_id, uint16_t flow_hash)
215 for (i = 0; i < qfs_len; i++)
216 if (qfs[i].queue_id == queue_id &&
217 qfs[i].flow_hash == flow_hash)
223 static __rte_always_inline bool
224 dsw_port_is_flow_paused(struct dsw_port *port, uint8_t queue_id,
227 return dsw_is_queue_flow_in_ary(port->paused_flows,
228 port->paused_flows_len,
229 queue_id, flow_hash);
233 dsw_port_add_paused_flows(struct dsw_port *port, struct dsw_queue_flow *qfs,
238 for (i = 0; i < qfs_len; i++) {
239 struct dsw_queue_flow *qf = &qfs[i];
241 DSW_LOG_DP_PORT(DEBUG, port->id,
242 "Pausing queue_id %d flow_hash %d.\n",
243 qf->queue_id, qf->flow_hash);
245 port->paused_flows[port->paused_flows_len] = *qf;
246 port->paused_flows_len++;
251 dsw_port_remove_paused_flow(struct dsw_port *port,
252 struct dsw_queue_flow *target_qf)
256 for (i = 0; i < port->paused_flows_len; i++) {
257 struct dsw_queue_flow *qf = &port->paused_flows[i];
259 if (qf->queue_id == target_qf->queue_id &&
260 qf->flow_hash == target_qf->flow_hash) {
261 uint16_t last_idx = port->paused_flows_len-1;
263 port->paused_flows[i] =
264 port->paused_flows[last_idx];
265 port->paused_flows_len--;
272 dsw_port_remove_paused_flows(struct dsw_port *port,
273 struct dsw_queue_flow *qfs, uint8_t qfs_len)
277 for (i = 0; i < qfs_len; i++)
278 dsw_port_remove_paused_flow(port, &qfs[i]);
283 dsw_port_flush_out_buffers(struct dsw_evdev *dsw, struct dsw_port *source_port);
286 dsw_port_handle_pause_flows(struct dsw_evdev *dsw, struct dsw_port *port,
287 uint8_t originating_port_id,
288 struct dsw_queue_flow *paused_qfs,
291 struct dsw_ctl_msg cfm = {
293 .originating_port_id = port->id
296 /* There might be already-scheduled events belonging to the
297 * paused flow in the output buffers.
299 dsw_port_flush_out_buffers(dsw, port);
301 dsw_port_add_paused_flows(port, paused_qfs, qfs_len);
303 /* Make sure any stores to the original port's in_ring are seen
304 * before the ctl message.
308 dsw_port_ctl_enqueue(&dsw->ports[originating_port_id], &cfm);
311 struct dsw_queue_flow_burst {
312 struct dsw_queue_flow queue_flow;
316 #define DSW_QF_TO_INT(_qf) \
317 ((int)((((_qf)->queue_id)<<16)|((_qf)->flow_hash)))
320 dsw_cmp_qf(const void *v_qf_a, const void *v_qf_b)
322 const struct dsw_queue_flow *qf_a = v_qf_a;
323 const struct dsw_queue_flow *qf_b = v_qf_b;
325 return DSW_QF_TO_INT(qf_a) - DSW_QF_TO_INT(qf_b);
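/* DSW_QF_TO_INT() packs the 8-bit queue_id into the high half and the
 * 16-bit flow_hash into the low half of an int, so subtracting the two
 * packed values sorts queue flows by queue_id first and flow_hash second.
 * E.g. (illustrative), {queue_id=1, flow_hash=5} maps to 0x10005 and
 * sorts after any flow on queue 0.
 */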
329 dsw_sort_qfs_to_bursts(struct dsw_queue_flow *qfs, uint16_t qfs_len,
330 struct dsw_queue_flow_burst *bursts)
333 struct dsw_queue_flow_burst *current_burst = NULL;
334 uint16_t num_bursts = 0;
336 /* We don't need the stable property, and the list is likely
337 * large enough for qsort() to outperform dsw_stable_sort(),
338 * so we use qsort() here.
340 qsort(qfs, qfs_len, sizeof(qfs[0]), dsw_cmp_qf);
342 /* arrange the (now-consecutive) events into bursts */
343 for (i = 0; i < qfs_len; i++) {
345 dsw_cmp_qf(&qfs[i], &current_burst->queue_flow) != 0) {
346 current_burst = &bursts[num_bursts];
347 current_burst->queue_flow = qfs[i];
348 current_burst->count = 0;
351 current_burst->count++;
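/* As an illustration: a sorted sample like
 * [{Q0,F7}, {Q0,F7}, {Q0,F7}, {Q2,F1}] collapses into two bursts,
 * {Q0,F7} with count 3 and {Q2,F1} with count 1, giving the migration
 * logic a per-flow event count to weigh.
 */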
358 dsw_retrieve_port_loads(struct dsw_evdev *dsw, int16_t *port_loads,
361 bool below_limit = false;
364 for (i = 0; i < dsw->num_ports; i++) {
365 int16_t load = rte_atomic16_read(&dsw->ports[i].load);
366 if (load < load_limit)
368 port_loads[i] = load;
374 dsw_flow_load(uint16_t num_events, int16_t port_load)
376 return ((int32_t)port_load * (int32_t)num_events) /
377 DSW_MAX_EVENTS_RECORDED;
381 dsw_evaluate_migration(int16_t source_load, int16_t target_load,
384 int32_t res_target_load;
387 if (target_load > DSW_MAX_TARGET_LOAD_FOR_MIGRATION)
390 imbalance = source_load - target_load;
392 if (imbalance < DSW_REBALANCE_THRESHOLD)
395 res_target_load = target_load + flow_load;
397 /* If the estimated load of the target port will be higher
398 * than the source port's load, it doesn't make sense to move
401 if (res_target_load > source_load)
404 /* The more idle the target will be, the better. This will
405 * make migration prefer moving smaller flows, and flows to
406 * lightly loaded ports.
408 return DSW_MAX_LOAD - res_target_load;
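/* A rough worked example (percentages are illustrative): with the source
 * at 80% load, a candidate target at 30% and a flow estimated at 10%,
 * the imbalance (50%) passes the threshold, the resulting target load
 * (40%) stays below the source load, and the returned weight corresponds
 * to the target's remaining 60% headroom. A lighter flow or a less
 * loaded target thus yields a higher weight.
 */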
412 dsw_is_serving_port(struct dsw_evdev *dsw, uint8_t port_id, uint8_t queue_id)
414 struct dsw_queue *queue = &dsw->queues[queue_id];
417 for (i = 0; i < queue->num_serving_ports; i++)
418 if (queue->serving_ports[i] == port_id)
425 dsw_select_emigration_target(struct dsw_evdev *dsw,
426 struct dsw_queue_flow_burst *bursts,
427 uint16_t num_bursts, uint8_t source_port_id,
428 int16_t *port_loads, uint16_t num_ports,
429 uint8_t *target_port_ids,
430 struct dsw_queue_flow *target_qfs,
431 uint8_t *targets_len)
433 int16_t source_port_load = port_loads[source_port_id];
434 struct dsw_queue_flow *candidate_qf;
435 uint8_t candidate_port_id;
436 int16_t candidate_weight = -1;
437 int16_t candidate_flow_load;
440 if (source_port_load < DSW_MIN_SOURCE_LOAD_FOR_MIGRATION)
443 for (i = 0; i < num_bursts; i++) {
444 struct dsw_queue_flow_burst *burst = &bursts[i];
445 struct dsw_queue_flow *qf = &burst->queue_flow;
449 if (dsw_is_queue_flow_in_ary(target_qfs, *targets_len,
450 qf->queue_id, qf->flow_hash))
453 flow_load = dsw_flow_load(burst->count, source_port_load);
455 for (port_id = 0; port_id < num_ports; port_id++) {
458 if (port_id == source_port_id)
461 if (!dsw_is_serving_port(dsw, port_id, qf->queue_id))
464 weight = dsw_evaluate_migration(source_port_load,
468 if (weight > candidate_weight) {
470 candidate_port_id = port_id;
471 candidate_weight = weight;
472 candidate_flow_load = flow_load;
477 if (candidate_weight < 0)
480 DSW_LOG_DP_PORT(DEBUG, source_port_id, "Selected queue_id %d "
481 "flow_hash %d (with flow load %d) for migration "
482 "to port %d.\n", candidate_qf->queue_id,
483 candidate_qf->flow_hash,
484 DSW_LOAD_TO_PERCENT(candidate_flow_load),
487 port_loads[candidate_port_id] += candidate_flow_load;
488 port_loads[source_port_id] -= candidate_flow_load;
490 target_port_ids[*targets_len] = candidate_port_id;
491 target_qfs[*targets_len] = *candidate_qf;
498 dsw_select_emigration_targets(struct dsw_evdev *dsw,
499 struct dsw_port *source_port,
500 struct dsw_queue_flow_burst *bursts,
501 uint16_t num_bursts, int16_t *port_loads)
503 struct dsw_queue_flow *target_qfs = source_port->emigration_target_qfs;
504 uint8_t *target_port_ids = source_port->emigration_target_port_ids;
505 uint8_t *targets_len = &source_port->emigration_targets_len;
508 for (i = 0; i < DSW_MAX_FLOWS_PER_MIGRATION; i++) {
511 found = dsw_select_emigration_target(dsw, bursts, num_bursts,
513 port_loads, dsw->num_ports,
521 if (*targets_len == 0)
522 DSW_LOG_DP_PORT(DEBUG, source_port->id,
523 "For the %d flows considered, no target port "
524 "was found.\n", num_bursts);
528 dsw_schedule(struct dsw_evdev *dsw, uint8_t queue_id, uint16_t flow_hash)
530 struct dsw_queue *queue = &dsw->queues[queue_id];
533 if (queue->num_serving_ports > 1)
534 port_id = queue->flow_to_port_map[flow_hash];
536 /* A single-link queue, or atomic/ordered/parallel but
537 * with just a single serving port.
539 port_id = queue->serving_ports[0];
541 DSW_LOG_DP(DEBUG, "Event with queue_id %d flow_hash %d is scheduled "
542 "to port %d.\n", queue_id, flow_hash, port_id);
548 dsw_port_transmit_buffered(struct dsw_evdev *dsw, struct dsw_port *source_port,
549 uint8_t dest_port_id)
551 struct dsw_port *dest_port = &(dsw->ports[dest_port_id]);
552 uint16_t *buffer_len = &source_port->out_buffer_len[dest_port_id];
553 struct rte_event *buffer = source_port->out_buffer[dest_port_id];
554 uint16_t enqueued = 0;
556 if (*buffer_len == 0)
559 /* The rings are dimensioned to fit all in-flight events (even
560 * on a single ring), so looping will work.
564 rte_event_ring_enqueue_burst(dest_port->in_ring,
566 *buffer_len-enqueued,
568 } while (unlikely(enqueued != *buffer_len));
574 dsw_port_get_parallel_flow_id(struct dsw_port *port)
576 uint16_t flow_id = port->next_parallel_flow_id;
578 port->next_parallel_flow_id =
579 (port->next_parallel_flow_id + 1) % DSW_PARALLEL_FLOWS;
585 dsw_port_buffer_paused(struct dsw_port *port,
586 const struct rte_event *paused_event)
588 port->paused_events[port->paused_events_len] = *paused_event;
589 port->paused_events_len++;
593 dsw_port_buffer_non_paused(struct dsw_evdev *dsw, struct dsw_port *source_port,
594 uint8_t dest_port_id, const struct rte_event *event)
596 struct rte_event *buffer = source_port->out_buffer[dest_port_id];
597 uint16_t *buffer_len = &source_port->out_buffer_len[dest_port_id];
599 if (*buffer_len == DSW_MAX_PORT_OUT_BUFFER)
600 dsw_port_transmit_buffered(dsw, source_port, dest_port_id);
602 buffer[*buffer_len] = *event;
607 #define DSW_FLOW_ID_BITS (24)
609 dsw_flow_id_hash(uint32_t flow_id)
615 hash ^= ((flow_id >> offset) & DSW_MAX_FLOWS_MASK);
616 offset += DSW_MAX_FLOWS_BITS;
617 } while (offset < DSW_FLOW_ID_BITS);
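/* The 24-bit flow_id is folded into a narrower flow hash by XORing it,
 * DSW_MAX_FLOWS_BITS bits at a time, onto itself. As an illustrative
 * example, with a 15-bit hash the result is
 * (flow_id & 0x7fff) ^ (flow_id >> 15), so distinct flow ids spread over
 * the whole hash space while equal flow ids always map to the same hash.
 */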
623 dsw_port_buffer_parallel(struct dsw_evdev *dsw, struct dsw_port *source_port,
624 struct rte_event event)
626 uint8_t dest_port_id;
628 event.flow_id = dsw_port_get_parallel_flow_id(source_port);
630 dest_port_id = dsw_schedule(dsw, event.queue_id,
631 dsw_flow_id_hash(event.flow_id));
633 dsw_port_buffer_non_paused(dsw, source_port, dest_port_id, &event);
637 dsw_port_buffer_event(struct dsw_evdev *dsw, struct dsw_port *source_port,
638 const struct rte_event *event)
641 uint8_t dest_port_id;
643 if (unlikely(dsw->queues[event->queue_id].schedule_type ==
644 RTE_SCHED_TYPE_PARALLEL)) {
645 dsw_port_buffer_parallel(dsw, source_port, *event);
649 flow_hash = dsw_flow_id_hash(event->flow_id);
651 if (unlikely(dsw_port_is_flow_paused(source_port, event->queue_id,
653 dsw_port_buffer_paused(source_port, event);
657 dest_port_id = dsw_schedule(dsw, event->queue_id, flow_hash);
659 dsw_port_buffer_non_paused(dsw, source_port, dest_port_id, event);
663 dsw_port_flush_paused_events(struct dsw_evdev *dsw,
664 struct dsw_port *source_port,
665 const struct dsw_queue_flow *qf)
667 uint16_t paused_events_len = source_port->paused_events_len;
668 struct rte_event paused_events[paused_events_len];
669 uint8_t dest_port_id;
672 if (paused_events_len == 0)
675 if (dsw_port_is_flow_paused(source_port, qf->queue_id, qf->flow_hash))
678 rte_memcpy(paused_events, source_port->paused_events,
679 paused_events_len * sizeof(struct rte_event));
681 source_port->paused_events_len = 0;
683 dest_port_id = dsw_schedule(dsw, qf->queue_id, qf->flow_hash);
685 for (i = 0; i < paused_events_len; i++) {
686 struct rte_event *event = &paused_events[i];
689 flow_hash = dsw_flow_id_hash(event->flow_id);
691 if (event->queue_id == qf->queue_id &&
692 flow_hash == qf->flow_hash)
693 dsw_port_buffer_non_paused(dsw, source_port,
694 dest_port_id, event);
696 dsw_port_buffer_paused(source_port, event);
701 dsw_port_emigration_stats(struct dsw_port *port, uint8_t finished)
703 uint64_t flow_migration_latency;
705 flow_migration_latency =
706 (rte_get_timer_cycles() - port->emigration_start);
707 port->emigration_latency += (flow_migration_latency * finished);
708 port->emigrations += finished;
712 dsw_port_end_emigration(struct dsw_evdev *dsw, struct dsw_port *port,
713 uint8_t schedule_type)
716 struct dsw_queue_flow left_qfs[DSW_MAX_FLOWS_PER_MIGRATION];
717 uint8_t left_port_ids[DSW_MAX_FLOWS_PER_MIGRATION];
718 uint8_t left_qfs_len = 0;
721 for (i = 0; i < port->emigration_targets_len; i++) {
722 struct dsw_queue_flow *qf = &port->emigration_target_qfs[i];
723 uint8_t queue_id = qf->queue_id;
724 uint8_t queue_schedule_type =
725 dsw->queues[queue_id].schedule_type;
726 uint16_t flow_hash = qf->flow_hash;
728 if (queue_schedule_type != schedule_type) {
729 left_port_ids[left_qfs_len] =
730 port->emigration_target_port_ids[i];
731 left_qfs[left_qfs_len] = *qf;
736 DSW_LOG_DP_PORT(DEBUG, port->id, "Migration completed for "
737 "queue_id %d flow_hash %d.\n", queue_id,
740 if (queue_schedule_type == RTE_SCHED_TYPE_ATOMIC) {
741 dsw_port_remove_paused_flow(port, qf);
742 dsw_port_flush_paused_events(dsw, port, qf);
746 finished = port->emigration_targets_len - left_qfs_len;
749 dsw_port_emigration_stats(port, finished);
751 for (i = 0; i < left_qfs_len; i++) {
752 port->emigration_target_port_ids[i] = left_port_ids[i];
753 port->emigration_target_qfs[i] = left_qfs[i];
755 port->emigration_targets_len = left_qfs_len;
757 if (port->emigration_targets_len == 0) {
758 port->migration_state = DSW_MIGRATION_STATE_IDLE;
759 port->seen_events_len = 0;
764 dsw_port_move_parallel_flows(struct dsw_evdev *dsw,
765 struct dsw_port *source_port)
769 for (i = 0; i < source_port->emigration_targets_len; i++) {
770 struct dsw_queue_flow *qf =
771 &source_port->emigration_target_qfs[i];
772 uint8_t queue_id = qf->queue_id;
774 if (dsw->queues[queue_id].schedule_type ==
775 RTE_SCHED_TYPE_PARALLEL) {
776 uint8_t dest_port_id =
777 source_port->emigration_target_port_ids[i];
778 uint16_t flow_hash = qf->flow_hash;
780 /* Single byte-sized stores are always atomic. */
781 dsw->queues[queue_id].flow_to_port_map[flow_hash] =
788 dsw_port_end_emigration(dsw, source_port, RTE_SCHED_TYPE_PARALLEL);
792 dsw_port_consider_emigration(struct dsw_evdev *dsw,
793 struct dsw_port *source_port,
796 bool any_port_below_limit;
797 struct dsw_queue_flow *seen_events = source_port->seen_events;
798 uint16_t seen_events_len = source_port->seen_events_len;
799 struct dsw_queue_flow_burst bursts[DSW_MAX_EVENTS_RECORDED];
801 int16_t source_port_load;
802 int16_t port_loads[dsw->num_ports];
804 if (now < source_port->next_emigration)
807 if (dsw->num_ports == 1)
810 DSW_LOG_DP_PORT(DEBUG, source_port->id, "Considering emigration.\n");
812 /* Randomize interval to avoid having all threads considering
813 * emigration at the same point in time, which might lead
814 * to all choosing the same target port.
816 source_port->next_emigration = now +
817 source_port->migration_interval / 2 +
818 rte_rand() % source_port->migration_interval;
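/* With a migration_interval of I cycles, the next attempt thus lands
 * uniformly in [now + I/2, now + 3I/2), keeping the average interval at
 * I while desynchronizing the ports.
 */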
820 if (source_port->migration_state != DSW_MIGRATION_STATE_IDLE) {
821 DSW_LOG_DP_PORT(DEBUG, source_port->id,
822 "Emigration already in progress.\n");
826 /* For simplicity, avoid migration in the unlikely case there
827 * are still events to consume in the in_buffer (from the last
830 if (source_port->in_buffer_len > 0) {
831 DSW_LOG_DP_PORT(DEBUG, source_port->id, "There are still "
832 "events in the input buffer.\n");
836 source_port_load = rte_atomic16_read(&source_port->load);
837 if (source_port_load < DSW_MIN_SOURCE_LOAD_FOR_MIGRATION) {
838 DSW_LOG_DP_PORT(DEBUG, source_port->id,
839 "Load %d is below threshold level %d.\n",
840 DSW_LOAD_TO_PERCENT(source_port_load),
841 DSW_LOAD_TO_PERCENT(DSW_MIN_SOURCE_LOAD_FOR_MIGRATION));
845 /* Avoid starting any expensive operations (sorting etc), in
846 * case of a scenario with all ports above the load limit.
848 any_port_below_limit =
849 dsw_retrieve_port_loads(dsw, port_loads,
850 DSW_MAX_TARGET_LOAD_FOR_MIGRATION);
851 if (!any_port_below_limit) {
852 DSW_LOG_DP_PORT(DEBUG, source_port->id,
853 "Candidate target ports are all too highly "
858 num_bursts = dsw_sort_qfs_to_bursts(seen_events, seen_events_len,
861 /* For non-big-little systems, there's no point in moving the
864 if (num_bursts < 2) {
865 DSW_LOG_DP_PORT(DEBUG, source_port->id, "Only a single flow "
866 "queue_id %d flow_hash %d has been seen.\n",
867 bursts[0].queue_flow.queue_id,
868 bursts[0].queue_flow.flow_hash);
872 dsw_select_emigration_targets(dsw, source_port, bursts, num_bursts,
875 if (source_port->emigration_targets_len == 0)
878 source_port->migration_state = DSW_MIGRATION_STATE_PAUSING;
879 source_port->emigration_start = rte_get_timer_cycles();
881 /* No need to go through the whole pause procedure for
882 * parallel queues, since atomic/ordered semantics need not
885 dsw_port_move_parallel_flows(dsw, source_port);
887 /* All flows were on PARALLEL queues. */
888 if (source_port->migration_state == DSW_MIGRATION_STATE_IDLE)
891 /* There might be 'loopback' events already scheduled in the
894 dsw_port_flush_out_buffers(dsw, source_port);
896 dsw_port_add_paused_flows(source_port,
897 source_port->emigration_target_qfs,
898 source_port->emigration_targets_len);
900 dsw_port_ctl_broadcast(dsw, source_port, DSW_CTL_PAUS_REQ,
901 source_port->emigration_target_qfs,
902 source_port->emigration_targets_len);
903 source_port->cfm_cnt = 0;
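/* From here on the migration follows a three-step protocol: the source
 * waits for all other ports to confirm the pause request (moving to the
 * forwarding state), then forwards any in-flight events for the
 * migrating flows to their new ports and updates the flow-to-port map,
 * and finally broadcasts an unpause request and waits for the
 * corresponding confirmations before returning to the idle state.
 */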
907 dsw_port_flush_paused_events(struct dsw_evdev *dsw,
908 struct dsw_port *source_port,
909 const struct dsw_queue_flow *qf);
912 dsw_port_handle_unpause_flows(struct dsw_evdev *dsw, struct dsw_port *port,
913 uint8_t originating_port_id,
914 struct dsw_queue_flow *paused_qfs,
918 struct dsw_ctl_msg cfm = {
920 .originating_port_id = port->id
923 dsw_port_remove_paused_flows(port, paused_qfs, qfs_len);
927 dsw_port_ctl_enqueue(&dsw->ports[originating_port_id], &cfm);
929 for (i = 0; i < qfs_len; i++) {
930 struct dsw_queue_flow *qf = &paused_qfs[i];
932 if (dsw_schedule(dsw, qf->queue_id, qf->flow_hash) == port->id)
933 port->immigrations++;
935 dsw_port_flush_paused_events(dsw, port, qf);
939 #define FORWARD_BURST_SIZE (32)
942 dsw_port_forward_emigrated_flow(struct dsw_port *source_port,
943 struct rte_event_ring *dest_ring,
947 uint16_t events_left;
949 /* Control ring message should be seen before the ring count
950 * is read on the port's in_ring.
954 events_left = rte_event_ring_count(source_port->in_ring);
956 while (events_left > 0) {
957 uint16_t in_burst_size =
958 RTE_MIN(FORWARD_BURST_SIZE, events_left);
959 struct rte_event in_burst[in_burst_size];
963 in_len = rte_event_ring_dequeue_burst(source_port->in_ring,
965 in_burst_size, NULL);
966 /* No need to care about bursting forwarded events (to
967 * the destination port's in_ring), since migration
968 * doesn't happen very often, and also the majority of
969 * the dequeued events will likely *not* be forwarded.
971 for (i = 0; i < in_len; i++) {
972 struct rte_event *e = &in_burst[i];
973 if (e->queue_id == queue_id &&
974 dsw_flow_id_hash(e->flow_id) == flow_hash) {
975 while (rte_event_ring_enqueue_burst(dest_ring,
980 uint16_t last_idx = source_port->in_buffer_len;
981 source_port->in_buffer[last_idx] = *e;
982 source_port->in_buffer_len++;
986 events_left -= in_len;
991 dsw_port_move_emigrating_flows(struct dsw_evdev *dsw,
992 struct dsw_port *source_port)
996 dsw_port_flush_out_buffers(dsw, source_port);
1000 for (i = 0; i < source_port->emigration_targets_len; i++) {
1001 struct dsw_queue_flow *qf =
1002 &source_port->emigration_target_qfs[i];
1003 uint8_t dest_port_id =
1004 source_port->emigration_target_port_ids[i];
1005 struct dsw_port *dest_port = &dsw->ports[dest_port_id];
1007 dsw->queues[qf->queue_id].flow_to_port_map[qf->flow_hash] =
1010 dsw_port_forward_emigrated_flow(source_port, dest_port->in_ring,
1011 qf->queue_id, qf->flow_hash);
1014 /* Flow table update and migration destination port's enqueues
1015 * must be seen before the control message.
1019 dsw_port_ctl_broadcast(dsw, source_port, DSW_CTL_UNPAUS_REQ,
1020 source_port->emigration_target_qfs,
1021 source_port->emigration_targets_len);
1022 source_port->cfm_cnt = 0;
1023 source_port->migration_state = DSW_MIGRATION_STATE_UNPAUSING;
1027 dsw_port_handle_confirm(struct dsw_evdev *dsw, struct dsw_port *port)
1031 if (port->cfm_cnt == (dsw->num_ports-1)) {
1032 switch (port->migration_state) {
1033 case DSW_MIGRATION_STATE_PAUSING:
1034 DSW_LOG_DP_PORT(DEBUG, port->id, "Going into forwarding "
1035 "migration state.\n");
1036 port->migration_state = DSW_MIGRATION_STATE_FORWARDING;
1038 case DSW_MIGRATION_STATE_UNPAUSING:
1039 dsw_port_end_emigration(dsw, port,
1040 RTE_SCHED_TYPE_ATOMIC);
1050 dsw_port_ctl_process(struct dsw_evdev *dsw, struct dsw_port *port)
1052 struct dsw_ctl_msg msg;
1054 /* So any table loads happen before the ring dequeue, in the
1055 * case of a 'paus' message.
1059 if (dsw_port_ctl_dequeue(port, &msg) == 0) {
1061 case DSW_CTL_PAUS_REQ:
1062 dsw_port_handle_pause_flows(dsw, port,
1063 msg.originating_port_id,
1064 msg.qfs, msg.qfs_len);
1066 case DSW_CTL_UNPAUS_REQ:
1067 dsw_port_handle_unpause_flows(dsw, port,
1068 msg.originating_port_id,
1069 msg.qfs, msg.qfs_len);
1072 dsw_port_handle_confirm(dsw, port);
1079 dsw_port_note_op(struct dsw_port *port, uint16_t num_events)
1081 /* To pull the control ring reasonably often on busy ports,
1082 * each dequeued/enqueued event is considered an 'op' too.
1084 port->ops_since_bg_task += (num_events+1);
1088 dsw_port_bg_process(struct dsw_evdev *dsw, struct dsw_port *port)
1090 if (unlikely(port->migration_state == DSW_MIGRATION_STATE_FORWARDING &&
1091 port->pending_releases == 0))
1092 dsw_port_move_emigrating_flows(dsw, port);
1094 /* Polling the control ring is relatively inexpensive, and
1095 * polling it often helps bringing down migration latency, so
1096 * do this for every iteration.
1098 dsw_port_ctl_process(dsw, port);
1100 /* To avoid considering migration and flushing output buffers
1101 * on every dequeue/enqueue call, the scheduler only performs
1102 * such 'background' tasks every nth
1103 * (i.e. DSW_MAX_PORT_OPS_PER_BG_TASK) operation.
1105 if (unlikely(port->ops_since_bg_task >= DSW_MAX_PORT_OPS_PER_BG_TASK)) {
1108 now = rte_get_timer_cycles();
1110 port->last_bg = now;
1112 /* Logic to avoid having events linger in the output
1115 dsw_port_flush_out_buffers(dsw, port);
1117 dsw_port_consider_load_update(port, now);
1119 dsw_port_consider_emigration(dsw, port, now);
1121 port->ops_since_bg_task = 0;
1126 dsw_port_flush_out_buffers(struct dsw_evdev *dsw, struct dsw_port *source_port)
1128 uint16_t dest_port_id;
1130 for (dest_port_id = 0; dest_port_id < dsw->num_ports; dest_port_id++)
1131 dsw_port_transmit_buffered(dsw, source_port, dest_port_id);
1135 dsw_event_enqueue(void *port, const struct rte_event *ev)
1137 return dsw_event_enqueue_burst(port, ev, unlikely(ev == NULL) ? 0 : 1);
1140 static __rte_always_inline uint16_t
1141 dsw_event_enqueue_burst_generic(struct dsw_port *source_port,
1142 const struct rte_event events[],
1143 uint16_t events_len, bool op_types_known,
1144 uint16_t num_new, uint16_t num_release,
1145 uint16_t num_non_release)
1147 struct dsw_evdev *dsw = source_port->dsw;
1148 bool enough_credits;
1151 DSW_LOG_DP_PORT(DEBUG, source_port->id, "Attempting to enqueue %d "
1152 "events to port %d.\n", events_len, source_port->id);
1154 dsw_port_bg_process(dsw, source_port);
1156 /* XXX: For performance (=ring efficiency) reasons, the
1157 * scheduler relies on internal non-ring buffers instead of
1158 * immediately sending the event to the destination ring. For
1159 * a producer that doesn't intend to produce or consume any
1160 * more events, the scheduler provides a way to flush the
1161 * buffer, by means of doing an enqueue of zero events. In
1162 * addition, a port cannot be left "unattended" (e.g. unused)
1163 * for long periods of time, since that would stall
1164 * migration. Eventdev API extensions to provide a cleaner way
1165 * to achieve both of these functions should be
1168 if (unlikely(events_len == 0)) {
1169 dsw_port_note_op(source_port, DSW_MAX_PORT_OPS_PER_BG_TASK);
1170 dsw_port_flush_out_buffers(dsw, source_port);
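/* Illustrative application-side usage (identifiers are hypothetical): a
 * producer that is done for now can force out its buffered events with a
 * zero-sized enqueue, e.g.
 *
 *   rte_event_enqueue_burst(dev_id, port_id, NULL, 0);
 */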
1174 dsw_port_note_op(source_port, events_len);
1176 if (!op_types_known)
1177 for (i = 0; i < events_len; i++) {
1178 switch (events[i].op) {
1179 case RTE_EVENT_OP_RELEASE:
1182 case RTE_EVENT_OP_NEW:
1184 /* Falls through. */
1191 /* Technically, we could allow the non-new events up to the
1192 * first new event in the array into the system, but for
1193 * simplicity reasons, we deny the whole burst if the port is
1194 * above the water mark.
1196 if (unlikely(num_new > 0 && rte_atomic32_read(&dsw->credits_on_loan) >
1197 source_port->new_event_threshold))
1200 enough_credits = dsw_port_acquire_credits(dsw, source_port,
1202 if (unlikely(!enough_credits))
1205 source_port->pending_releases -= num_release;
1207 dsw_port_enqueue_stats(source_port, num_new,
1208 num_non_release-num_new, num_release);
1210 for (i = 0; i < events_len; i++) {
1211 const struct rte_event *event = &events[i];
1213 if (likely(num_release == 0 ||
1214 event->op != RTE_EVENT_OP_RELEASE))
1215 dsw_port_buffer_event(dsw, source_port, event);
1216 dsw_port_queue_enqueue_stats(source_port, event->queue_id);
1219 DSW_LOG_DP_PORT(DEBUG, source_port->id, "%d non-release events "
1220 "accepted.\n", num_non_release);
1222 return num_non_release;
1226 dsw_event_enqueue_burst(void *port, const struct rte_event events[],
1227 uint16_t events_len)
1229 struct dsw_port *source_port = port;
1231 if (unlikely(events_len > source_port->enqueue_depth))
1232 events_len = source_port->enqueue_depth;
1234 return dsw_event_enqueue_burst_generic(source_port, events,
1235 events_len, false, 0, 0, 0);
1239 dsw_event_enqueue_new_burst(void *port, const struct rte_event events[],
1240 uint16_t events_len)
1242 struct dsw_port *source_port = port;
1244 if (unlikely(events_len > source_port->enqueue_depth))
1245 events_len = source_port->enqueue_depth;
1247 return dsw_event_enqueue_burst_generic(source_port, events,
1248 events_len, true, events_len,
1253 dsw_event_enqueue_forward_burst(void *port, const struct rte_event events[],
1254 uint16_t events_len)
1256 struct dsw_port *source_port = port;
1258 if (unlikely(events_len > source_port->enqueue_depth))
1259 events_len = source_port->enqueue_depth;
1261 return dsw_event_enqueue_burst_generic(source_port, events,
1262 events_len, true, 0, 0,
1267 dsw_event_dequeue(void *port, struct rte_event *events, uint64_t wait)
1269 return dsw_event_dequeue_burst(port, events, 1, wait);
1273 dsw_port_record_seen_events(struct dsw_port *port, struct rte_event *events,
1278 dsw_port_dequeue_stats(port, num);
1280 for (i = 0; i < num; i++) {
1281 uint16_t l_idx = port->seen_events_idx;
1282 struct dsw_queue_flow *qf = &port->seen_events[l_idx];
1283 struct rte_event *event = &events[i];
1284 qf->queue_id = event->queue_id;
1285 qf->flow_hash = dsw_flow_id_hash(event->flow_id);
1287 port->seen_events_idx = (l_idx+1) % DSW_MAX_EVENTS_RECORDED;
1289 dsw_port_queue_dequeued_stats(port, event->queue_id);
1292 if (unlikely(port->seen_events_len != DSW_MAX_EVENTS_RECORDED))
1293 port->seen_events_len =
1294 RTE_MIN(port->seen_events_len + num,
1295 DSW_MAX_EVENTS_RECORDED);
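/* seen_events is thus a fixed-size circular log of the
 * (queue_id, flow_hash) pairs most recently dequeued on this port; it is
 * the sample that dsw_port_consider_emigration() later sorts into bursts
 * when picking flows to move.
 */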
1298 #ifdef DSW_SORT_DEQUEUED
1300 #define DSW_EVENT_TO_INT(_event) \
1301 ((int)((((_event)->queue_id)<<16)|((_event)->flow_id)))
1304 dsw_cmp_event(const void *v_event_a, const void *v_event_b)
1306 const struct rte_event *event_a = v_event_a;
1307 const struct rte_event *event_b = v_event_b;
1309 return DSW_EVENT_TO_INT(event_a) - DSW_EVENT_TO_INT(event_b);
1314 dsw_port_dequeue_burst(struct dsw_port *port, struct rte_event *events,
1317 struct dsw_port *source_port = port;
1318 struct dsw_evdev *dsw = source_port->dsw;
1320 dsw_port_ctl_process(dsw, source_port);
1322 if (unlikely(port->in_buffer_len > 0)) {
1323 uint16_t dequeued = RTE_MIN(num, port->in_buffer_len);
1325 rte_memcpy(events, &port->in_buffer[port->in_buffer_start],
1326 dequeued * sizeof(struct rte_event));
1328 port->in_buffer_start += dequeued;
1329 port->in_buffer_len -= dequeued;
1331 if (port->in_buffer_len == 0)
1332 port->in_buffer_start = 0;
1337 return rte_event_ring_dequeue_burst(port->in_ring, events, num, NULL);
1341 dsw_event_dequeue_burst(void *port, struct rte_event *events, uint16_t num,
1342 uint64_t wait __rte_unused)
1344 struct dsw_port *source_port = port;
1345 struct dsw_evdev *dsw = source_port->dsw;
1348 source_port->pending_releases = 0;
1350 dsw_port_bg_process(dsw, source_port);
1352 if (unlikely(num > source_port->dequeue_depth))
1353 num = source_port->dequeue_depth;
1355 dequeued = dsw_port_dequeue_burst(source_port, events, num);
1357 source_port->pending_releases = dequeued;
1359 dsw_port_load_record(source_port, dequeued);
1361 dsw_port_note_op(source_port, dequeued);
1364 DSW_LOG_DP_PORT(DEBUG, source_port->id, "Dequeued %d events.\n",
1367 dsw_port_return_credits(dsw, source_port, dequeued);
1369 /* One potential optimization would be to
1370 * add a migration state (prior to 'pausing'), and
1371 * only record seen events when the port is in this
1372 * state (and transition to 'pausing' when enough events
1373 * have been gathered). However, that scheme doesn't
1374 * seem to improve performance.
1376 dsw_port_record_seen_events(port, events, dequeued);
1377 } else /* Zero-size dequeue means a likely idle port, and thus
1378 * we can afford trading some efficiency for a slightly
1379 * reduced event wall-time latency.
1381 dsw_port_flush_out_buffers(dsw, port);
1383 #ifdef DSW_SORT_DEQUEUED
1384 dsw_stable_sort(events, dequeued, sizeof(events[0]), dsw_cmp_event);