/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2018 Ericsson AB
 */

#include "dsw_evdev.h"

#ifdef DSW_SORT_DEQUEUED
#include "dsw_sort.h"
#endif

#include <stdbool.h>
#include <stdlib.h>

#include <rte_atomic.h>
#include <rte_cycles.h>
#include <rte_memcpy.h>
#include <rte_random.h>

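/* Credit handling: each port keeps a local cache of credits
 * (port->inflight_credits) and only touches the device-global
 * credits_on_loan counter when the cache runs dry, in which case it
 * acquires at least DSW_PORT_MIN_CREDITS at a time. This keeps the
 * shared atomic counter off the per-event fast path.
 */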
static bool
dsw_port_acquire_credits(struct dsw_evdev *dsw, struct dsw_port *port,
			 int32_t credits)
{
	int32_t inflight_credits = port->inflight_credits;
	int32_t missing_credits = credits - inflight_credits;
	int32_t total_on_loan;
	int32_t available;
	int32_t acquired_credits;
	int32_t new_total_on_loan;

	if (likely(missing_credits <= 0)) {
		port->inflight_credits -= credits;
		return true;
	}

	total_on_loan = rte_atomic32_read(&dsw->credits_on_loan);
	available = dsw->max_inflight - total_on_loan;
	acquired_credits = RTE_MAX(missing_credits, DSW_PORT_MIN_CREDITS);

	if (available < acquired_credits)
		return false;

	/* This is a race, no locks are involved, and thus some other
	 * thread can allocate tokens in between the check and the
	 * allocation.
	 */
	new_total_on_loan = rte_atomic32_add_return(&dsw->credits_on_loan,
						    acquired_credits);

	if (unlikely(new_total_on_loan > dsw->max_inflight)) {
		/* Some other port took the last credits. */
		rte_atomic32_sub(&dsw->credits_on_loan, acquired_credits);
		return false;
	}

	DSW_LOG_DP_PORT(DEBUG, port->id, "Acquired %d tokens from pool.\n",
			acquired_credits);

	port->inflight_credits += acquired_credits;
	port->inflight_credits -= credits;

	return true;
}

static void
dsw_port_return_credits(struct dsw_evdev *dsw, struct dsw_port *port,
			int32_t credits)
{
	port->inflight_credits += credits;

	if (unlikely(port->inflight_credits > DSW_PORT_MAX_CREDITS)) {
		int32_t leave_credits = DSW_PORT_MIN_CREDITS;
		int32_t return_credits =
			port->inflight_credits - leave_credits;

		port->inflight_credits = leave_credits;

		rte_atomic32_sub(&dsw->credits_on_loan, return_credits);

		DSW_LOG_DP_PORT(DEBUG, port->id,
				"Returned %d tokens to pool.\n",
				return_credits);
	}
}

static void
dsw_port_enqueue_stats(struct dsw_port *port, uint16_t num_new,
		       uint16_t num_forward, uint16_t num_release)
{
	port->new_enqueued += num_new;
	port->forward_enqueued += num_forward;
	port->release_enqueued += num_release;
}

static void
dsw_port_queue_enqueue_stats(struct dsw_port *source_port, uint8_t queue_id)
{
	source_port->queue_enqueued[queue_id]++;
}

static void
dsw_port_dequeue_stats(struct dsw_port *port, uint16_t num)
{
	port->dequeued += num;
}

static void
dsw_port_queue_dequeued_stats(struct dsw_port *source_port, uint8_t queue_id)
{
	source_port->queue_dequeued[queue_id]++;
}

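/* Load measurement: a port is considered busy from the first
 * non-empty dequeue until the next empty one. The accumulated busy
 * cycles are turned into a load figure in the range [0, DSW_MAX_LOAD]
 * each time a measurement period is closed.
 */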
static void
dsw_port_load_record(struct dsw_port *port, unsigned int dequeued)
{
	if (dequeued > 0 && port->busy_start == 0)
		/* work period begins */
		port->busy_start = rte_get_timer_cycles();
	else if (dequeued == 0 && port->busy_start > 0) {
		/* work period ends */
		uint64_t work_period =
			rte_get_timer_cycles() - port->busy_start;
		port->busy_cycles += work_period;
		port->busy_start = 0;
	}
}

static int16_t
dsw_port_load_close_period(struct dsw_port *port, uint64_t now)
{
	uint64_t passed = now - port->measurement_start;
	uint64_t busy_cycles = port->busy_cycles;

	if (port->busy_start > 0) {
		busy_cycles += (now - port->busy_start);
		port->busy_start = now;
	}

	int16_t load = (DSW_MAX_LOAD * busy_cycles) / passed;

	port->measurement_start = now;
	port->busy_cycles = 0;

	port->total_busy_cycles += busy_cycles;

	return load;
}

static void
dsw_port_load_update(struct dsw_port *port, uint64_t now)
{
	int16_t old_load;
	int16_t period_load;
	int16_t new_load;

	old_load = rte_atomic16_read(&port->load);
	period_load = dsw_port_load_close_period(port, now);

	new_load = (period_load + old_load*DSW_OLD_LOAD_WEIGHT) /
		(DSW_OLD_LOAD_WEIGHT+1);

	rte_atomic16_set(&port->load, new_load);
}

static void
dsw_port_consider_load_update(struct dsw_port *port, uint64_t now)
{
	if (now < port->next_load_update)
		return;

	port->next_load_update = now + port->load_update_interval;
	dsw_port_load_update(port, now);
}

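/* Ports coordinate flow migration by exchanging control messages
 * (pause request, un-pause request and confirmation) over each
 * port's ctl_in_ring.
 */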
static void
dsw_port_ctl_enqueue(struct dsw_port *port, struct dsw_ctl_msg *msg)
{
	/* there's always room on the ring */
	while (rte_ring_enqueue_elem(port->ctl_in_ring, msg, sizeof(*msg)) != 0)
		rte_pause();
}

static int
dsw_port_ctl_dequeue(struct dsw_port *port, struct dsw_ctl_msg *msg)
{
	return rte_ring_dequeue_elem(port->ctl_in_ring, msg, sizeof(*msg));
}

static void
dsw_port_ctl_broadcast(struct dsw_evdev *dsw, struct dsw_port *source_port,
		       uint8_t type, uint8_t queue_id, uint16_t flow_hash)
{
	uint16_t port_id;
	struct dsw_ctl_msg msg = {
		.type = type,
		.originating_port_id = source_port->id,
		.queue_id = queue_id,
		.flow_hash = flow_hash
	};

	for (port_id = 0; port_id < dsw->num_ports; port_id++)
		if (port_id != source_port->id)
			dsw_port_ctl_enqueue(&dsw->ports[port_id], &msg);
}

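/* The paused_flows array only holds flows currently being migrated,
 * so it stays short and a linear scan is adequate on the fast path.
 */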
static bool
dsw_port_is_flow_paused(struct dsw_port *port, uint8_t queue_id,
			uint16_t flow_hash)
{
	uint16_t i;

	for (i = 0; i < port->paused_flows_len; i++) {
		struct dsw_queue_flow *qf = &port->paused_flows[i];
		if (qf->queue_id == queue_id &&
		    qf->flow_hash == flow_hash)
			return true;
	}
	return false;
}

static void
dsw_port_add_paused_flow(struct dsw_port *port, uint8_t queue_id,
			 uint16_t paused_flow_hash)
{
	port->paused_flows[port->paused_flows_len] = (struct dsw_queue_flow) {
		.queue_id = queue_id,
		.flow_hash = paused_flow_hash
	};
	port->paused_flows_len++;
}

static void
dsw_port_remove_paused_flow(struct dsw_port *port, uint8_t queue_id,
			    uint16_t paused_flow_hash)
{
	uint16_t i;

	for (i = 0; i < port->paused_flows_len; i++) {
		struct dsw_queue_flow *qf = &port->paused_flows[i];

		if (qf->queue_id == queue_id &&
		    qf->flow_hash == paused_flow_hash) {
			uint16_t last_idx = port->paused_flows_len-1;
			if (i != last_idx)
				port->paused_flows[i] =
					port->paused_flows[last_idx];
			port->paused_flows_len--;
			break;
		}
	}
}

static void
dsw_port_flush_out_buffers(struct dsw_evdev *dsw, struct dsw_port *source_port);

static void
dsw_port_handle_pause_flow(struct dsw_evdev *dsw, struct dsw_port *port,
			   uint8_t originating_port_id, uint8_t queue_id,
			   uint16_t paused_flow_hash)
{
	struct dsw_ctl_msg cfm = {
		.type = DSW_CTL_CFM,
		.originating_port_id = port->id,
		.queue_id = queue_id,
		.flow_hash = paused_flow_hash
	};

	DSW_LOG_DP_PORT(DEBUG, port->id, "Pausing queue_id %d flow_hash %d.\n",
			queue_id, paused_flow_hash);

	/* There might be already-scheduled events belonging to the
	 * paused flow in the output buffers.
	 */
	dsw_port_flush_out_buffers(dsw, port);

	dsw_port_add_paused_flow(port, queue_id, paused_flow_hash);

	/* Make sure any stores to the original port's in_ring are seen
	 * before the ctl message.
	 */
	rte_smp_wmb();

	dsw_port_ctl_enqueue(&dsw->ports[originating_port_id], &cfm);
}

static void
dsw_find_lowest_load_port(uint8_t *port_ids, uint16_t num_port_ids,
			  uint8_t exclude_port_id, int16_t *port_loads,
			  uint8_t *target_port_id, int16_t *target_load)
{
	int16_t candidate_port_id = -1;
	int16_t candidate_load = DSW_MAX_LOAD;
	uint16_t i;

	for (i = 0; i < num_port_ids; i++) {
		uint8_t port_id = port_ids[i];
		if (port_id != exclude_port_id) {
			int16_t load = port_loads[port_id];
			if (candidate_port_id == -1 ||
			    load < candidate_load) {
				candidate_port_id = port_id;
				candidate_load = load;
			}
		}
	}

	*target_port_id = candidate_port_id;
	*target_load = candidate_load;
}

struct dsw_queue_flow_burst {
	struct dsw_queue_flow queue_flow;
	uint16_t count;
};

static inline int
dsw_cmp_burst(const void *v_burst_a, const void *v_burst_b)
{
	const struct dsw_queue_flow_burst *burst_a = v_burst_a;
	const struct dsw_queue_flow_burst *burst_b = v_burst_b;

	int a_count = burst_a->count;
	int b_count = burst_b->count;

	return a_count - b_count;
}

#define DSW_QF_TO_INT(_qf)					\
	((int)((((_qf)->queue_id)<<16)|((_qf)->flow_hash)))

static inline int
dsw_cmp_qf(const void *v_qf_a, const void *v_qf_b)
{
	const struct dsw_queue_flow *qf_a = v_qf_a;
	const struct dsw_queue_flow *qf_b = v_qf_b;

	return DSW_QF_TO_INT(qf_a) - DSW_QF_TO_INT(qf_b);
}

static uint16_t
dsw_sort_qfs_to_bursts(struct dsw_queue_flow *qfs, uint16_t qfs_len,
		       struct dsw_queue_flow_burst *bursts)
{
	uint16_t i;
	struct dsw_queue_flow_burst *current_burst = NULL;
	uint16_t num_bursts = 0;

	/* We don't need the stable property, and the list is likely
	 * large enough for qsort() to outperform dsw_stable_sort(),
	 * so we use qsort() here.
	 */
	qsort(qfs, qfs_len, sizeof(qfs[0]), dsw_cmp_qf);

	/* arrange the (now-consecutive) events into bursts */
	for (i = 0; i < qfs_len; i++) {
		if (i == 0 ||
		    dsw_cmp_qf(&qfs[i], &current_burst->queue_flow) != 0) {
			current_burst = &bursts[num_bursts];
			current_burst->queue_flow = qfs[i];
			current_burst->count = 0;
			num_bursts++;
		}
		current_burst->count++;
	}

	qsort(bursts, num_bursts, sizeof(bursts[0]), dsw_cmp_burst);

	return num_bursts;
}

static bool
dsw_retrieve_port_loads(struct dsw_evdev *dsw, int16_t *port_loads,
			int16_t load_limit)
{
	bool below_limit = false;
	uint16_t i;

	for (i = 0; i < dsw->num_ports; i++) {
		int16_t load = rte_atomic16_read(&dsw->ports[i].load);
		if (load < load_limit)
			below_limit = true;
		port_loads[i] = load;
	}
	return below_limit;
}

static bool
dsw_select_migration_target(struct dsw_evdev *dsw,
			    struct dsw_port *source_port,
			    struct dsw_queue_flow_burst *bursts,
			    uint16_t num_bursts, int16_t *port_loads,
			    int16_t max_load, struct dsw_queue_flow *target_qf,
			    uint8_t *target_port_id)
{
	uint16_t source_load = port_loads[source_port->id];
	uint16_t i;

	for (i = 0; i < num_bursts; i++) {
		struct dsw_queue_flow *qf = &bursts[i].queue_flow;

		if (dsw_port_is_flow_paused(source_port, qf->queue_id,
					    qf->flow_hash))
			continue;

		struct dsw_queue *queue = &dsw->queues[qf->queue_id];
		int16_t target_load;

		dsw_find_lowest_load_port(queue->serving_ports,
					  queue->num_serving_ports,
					  source_port->id, port_loads,
					  target_port_id, &target_load);

		if (target_load < source_load &&
		    target_load < max_load) {
			*target_qf = *qf;
			return true;
		}
	}

	DSW_LOG_DP_PORT(DEBUG, source_port->id, "For the %d flows considered, "
			"no target port found with load less than %d.\n",
			num_bursts, DSW_LOAD_TO_PERCENT(max_load));

	return false;
}

static uint8_t
dsw_schedule(struct dsw_evdev *dsw, uint8_t queue_id, uint16_t flow_hash)
{
	struct dsw_queue *queue = &dsw->queues[queue_id];
	uint8_t port_id;

	if (queue->num_serving_ports > 1)
		port_id = queue->flow_to_port_map[flow_hash];
	else
		/* A single-link queue, or atomic/ordered/parallel but
		 * with just a single serving port.
		 */
		port_id = queue->serving_ports[0];

	DSW_LOG_DP(DEBUG, "Event with queue_id %d flow_hash %d is scheduled "
		   "to port %d.\n", queue_id, flow_hash, port_id);

	return port_id;
}

static void
dsw_port_transmit_buffered(struct dsw_evdev *dsw, struct dsw_port *source_port,
			   uint8_t dest_port_id)
{
	struct dsw_port *dest_port = &(dsw->ports[dest_port_id]);
	uint16_t *buffer_len = &source_port->out_buffer_len[dest_port_id];
	struct rte_event *buffer = source_port->out_buffer[dest_port_id];
	uint16_t enqueued = 0;

	if (*buffer_len == 0)
		return;

	/* The rings are dimensioned to fit all in-flight events (even
	 * on a single ring), so looping will work.
	 */
	do {
		enqueued +=
			rte_event_ring_enqueue_burst(dest_port->in_ring,
						     buffer+enqueued,
						     *buffer_len-enqueued,
						     NULL);
	} while (unlikely(enqueued != *buffer_len));

	(*buffer_len) = 0;
}

static uint16_t
dsw_port_get_parallel_flow_id(struct dsw_port *port)
{
	uint16_t flow_id = port->next_parallel_flow_id;

	port->next_parallel_flow_id =
		(port->next_parallel_flow_id + 1) % DSW_PARALLEL_FLOWS;

	return flow_id;
}

static void
dsw_port_buffer_paused(struct dsw_port *port,
		       const struct rte_event *paused_event)
{
	port->paused_events[port->paused_events_len] = *paused_event;
	port->paused_events_len++;
}

static void
dsw_port_buffer_non_paused(struct dsw_evdev *dsw, struct dsw_port *source_port,
			   uint8_t dest_port_id, const struct rte_event *event)
{
	struct rte_event *buffer = source_port->out_buffer[dest_port_id];
	uint16_t *buffer_len = &source_port->out_buffer_len[dest_port_id];

	if (*buffer_len == DSW_MAX_PORT_OUT_BUFFER)
		dsw_port_transmit_buffered(dsw, source_port, dest_port_id);

	buffer[*buffer_len] = *event;

	(*buffer_len)++;
}

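/* The 24-bit eventdev flow_id is folded down to a flow_hash by
 * XOR:ing DSW_MAX_FLOWS_BITS-sized chunks, so all events carrying the
 * same flow_id always map to the same flow_hash, and thus the same
 * serving port.
 */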
#define DSW_FLOW_ID_BITS (24)
static uint16_t
dsw_flow_id_hash(uint32_t flow_id)
{
	uint16_t hash = 0;
	uint16_t offset = 0;

	do {
		hash ^= ((flow_id >> offset) & DSW_MAX_FLOWS_MASK);
		offset += DSW_MAX_FLOWS_BITS;
	} while (offset < DSW_FLOW_ID_BITS);

	return hash;
}

static void
dsw_port_buffer_parallel(struct dsw_evdev *dsw, struct dsw_port *source_port,
			 struct rte_event event)
{
	uint8_t dest_port_id;

	event.flow_id = dsw_port_get_parallel_flow_id(source_port);

	dest_port_id = dsw_schedule(dsw, event.queue_id,
				    dsw_flow_id_hash(event.flow_id));

	dsw_port_buffer_non_paused(dsw, source_port, dest_port_id, &event);
}

static void
dsw_port_buffer_event(struct dsw_evdev *dsw, struct dsw_port *source_port,
		      const struct rte_event *event)
{
	uint16_t flow_hash;
	uint8_t dest_port_id;

	if (unlikely(dsw->queues[event->queue_id].schedule_type ==
		     RTE_SCHED_TYPE_PARALLEL)) {
		dsw_port_buffer_parallel(dsw, source_port, *event);
		return;
	}

	flow_hash = dsw_flow_id_hash(event->flow_id);

	if (unlikely(dsw_port_is_flow_paused(source_port, event->queue_id,
					     flow_hash))) {
		dsw_port_buffer_paused(source_port, event);
		return;
	}

	dest_port_id = dsw_schedule(dsw, event->queue_id, flow_hash);

	dsw_port_buffer_non_paused(dsw, source_port, dest_port_id, event);
}

static void
dsw_port_flush_paused_events(struct dsw_evdev *dsw,
			     struct dsw_port *source_port,
			     uint8_t queue_id, uint16_t paused_flow_hash)
{
	uint16_t paused_events_len = source_port->paused_events_len;
	struct rte_event paused_events[paused_events_len];
	uint8_t dest_port_id;
	uint16_t i;

	if (paused_events_len == 0)
		return;

	if (dsw_port_is_flow_paused(source_port, queue_id, paused_flow_hash))
		return;

	rte_memcpy(paused_events, source_port->paused_events,
		   paused_events_len * sizeof(struct rte_event));

	source_port->paused_events_len = 0;

	dest_port_id = dsw_schedule(dsw, queue_id, paused_flow_hash);

	for (i = 0; i < paused_events_len; i++) {
		struct rte_event *event = &paused_events[i];
		uint16_t flow_hash;

		flow_hash = dsw_flow_id_hash(event->flow_id);

		if (event->queue_id == queue_id &&
		    flow_hash == paused_flow_hash)
			dsw_port_buffer_non_paused(dsw, source_port,
						   dest_port_id, event);
		else
			dsw_port_buffer_paused(source_port, event);
	}
}

static void
dsw_port_migration_stats(struct dsw_port *port)
{
	uint64_t migration_latency;

	migration_latency = (rte_get_timer_cycles() - port->migration_start);
	port->migration_latency += migration_latency;
	port->migrations++;
}

static void
dsw_port_end_migration(struct dsw_evdev *dsw, struct dsw_port *port)
{
	uint8_t queue_id = port->migration_target_qf.queue_id;
	uint16_t flow_hash = port->migration_target_qf.flow_hash;

	port->migration_state = DSW_MIGRATION_STATE_IDLE;
	port->seen_events_len = 0;

	dsw_port_migration_stats(port);

	if (dsw->queues[queue_id].schedule_type != RTE_SCHED_TYPE_PARALLEL) {
		dsw_port_remove_paused_flow(port, queue_id, flow_hash);
		dsw_port_flush_paused_events(dsw, port, queue_id, flow_hash);
	}

	DSW_LOG_DP_PORT(DEBUG, port->id, "Migration completed for queue_id "
			"%d flow_hash %d.\n", queue_id, flow_hash);
}

static void
dsw_port_consider_migration(struct dsw_evdev *dsw,
			    struct dsw_port *source_port,
			    uint64_t now)
{
	bool any_port_below_limit;
	struct dsw_queue_flow *seen_events = source_port->seen_events;
	uint16_t seen_events_len = source_port->seen_events_len;
	struct dsw_queue_flow_burst bursts[DSW_MAX_EVENTS_RECORDED];
	uint16_t num_bursts;
	int16_t source_port_load;
	int16_t port_loads[dsw->num_ports];

	if (now < source_port->next_migration)
		return;

	if (dsw->num_ports == 1)
		return;

	DSW_LOG_DP_PORT(DEBUG, source_port->id, "Considering migration.\n");

	/* Randomize interval to avoid having all threads considering
	 * migration at the same point in time, which might lead to
	 * all choosing the same target port.
	 */
	source_port->next_migration = now +
		source_port->migration_interval / 2 +
		rte_rand() % source_port->migration_interval;

	if (source_port->migration_state != DSW_MIGRATION_STATE_IDLE) {
		DSW_LOG_DP_PORT(DEBUG, source_port->id,
				"Migration already in progress.\n");
		return;
	}

	/* For simplicity, avoid migration in the unlikely case there
	 * are still events to consume in the in_buffer (from the last
	 * migration).
	 */
	if (source_port->in_buffer_len > 0) {
		DSW_LOG_DP_PORT(DEBUG, source_port->id, "There are still "
				"events in the input buffer.\n");
		return;
	}

	source_port_load = rte_atomic16_read(&source_port->load);
	if (source_port_load < DSW_MIN_SOURCE_LOAD_FOR_MIGRATION) {
		DSW_LOG_DP_PORT(DEBUG, source_port->id,
			"Load %d is below threshold level %d.\n",
			DSW_LOAD_TO_PERCENT(source_port_load),
			DSW_LOAD_TO_PERCENT(DSW_MIN_SOURCE_LOAD_FOR_MIGRATION));
		return;
	}

	/* Avoid starting any expensive operations (sorting etc), in
	 * case of a scenario with all ports above the load limit.
	 */
	any_port_below_limit =
		dsw_retrieve_port_loads(dsw, port_loads,
					DSW_MAX_TARGET_LOAD_FOR_MIGRATION);
	if (!any_port_below_limit) {
		DSW_LOG_DP_PORT(DEBUG, source_port->id,
				"Candidate target ports are all too highly "
				"loaded.\n");
		return;
	}

	/* Sort flows into 'bursts' to allow attempting to migrate
	 * small (but still active) flows first - this is to avoid
	 * having large flows moving around the worker cores too much
	 * (to avoid cache misses, among other things). Of course, the
	 * number of recorded events (queue+flow ids) is limited, and
	 * provides only a snapshot, so only so many conclusions can
	 * be drawn from this data.
	 */
	num_bursts = dsw_sort_qfs_to_bursts(seen_events, seen_events_len,
					    bursts);
	/* For non-big-little systems, there's no point in moving the
	 * only (known) flow.
	 */
	if (num_bursts < 2) {
		DSW_LOG_DP_PORT(DEBUG, source_port->id, "Only a single flow "
				"queue_id %d flow_hash %d has been seen.\n",
				bursts[0].queue_flow.queue_id,
				bursts[0].queue_flow.flow_hash);
		return;
	}

	/* The strategy is to first try to find a flow to move to a
	 * port with low load (below the migration-attempt
	 * threshold). If that fails, we try to find a port which is
	 * below the max threshold, and also less loaded than this
	 * port is.
	 */
	if (!dsw_select_migration_target(dsw, source_port, bursts, num_bursts,
					 port_loads,
					 DSW_MIN_SOURCE_LOAD_FOR_MIGRATION,
					 &source_port->migration_target_qf,
					 &source_port->migration_target_port_id)
	    &&
	    !dsw_select_migration_target(dsw, source_port, bursts, num_bursts,
					 port_loads,
					 DSW_MAX_TARGET_LOAD_FOR_MIGRATION,
					 &source_port->migration_target_qf,
					 &source_port->migration_target_port_id))
		return;

	DSW_LOG_DP_PORT(DEBUG, source_port->id, "Migrating queue_id %d "
			"flow_hash %d from port %d to port %d.\n",
			source_port->migration_target_qf.queue_id,
			source_port->migration_target_qf.flow_hash,
			source_port->id, source_port->migration_target_port_id);

	/* We have a winner. */

	source_port->migration_state = DSW_MIGRATION_STATE_PAUSING;
	source_port->migration_start = rte_get_timer_cycles();

	/* No need to go through the whole pause procedure for
	 * parallel queues, since atomic/ordered semantics need not be
	 * maintained.
	 */
	if (dsw->queues[source_port->migration_target_qf.queue_id].schedule_type
	    == RTE_SCHED_TYPE_PARALLEL) {
		uint8_t queue_id = source_port->migration_target_qf.queue_id;
		uint16_t flow_hash = source_port->migration_target_qf.flow_hash;
		uint8_t dest_port_id = source_port->migration_target_port_id;

		/* Single byte-sized stores are always atomic. */
		dsw->queues[queue_id].flow_to_port_map[flow_hash] =
			dest_port_id;
		rte_smp_wmb();

		dsw_port_end_migration(dsw, source_port);

		return;
	}

	/* There might be 'loopback' events already scheduled in the
	 * output buffers.
	 */
	dsw_port_flush_out_buffers(dsw, source_port);

	dsw_port_add_paused_flow(source_port,
				 source_port->migration_target_qf.queue_id,
				 source_port->migration_target_qf.flow_hash);

	dsw_port_ctl_broadcast(dsw, source_port, DSW_CTL_PAUS_REQ,
			       source_port->migration_target_qf.queue_id,
			       source_port->migration_target_qf.flow_hash);
	source_port->cfm_cnt = 0;
}

static void
dsw_port_flush_paused_events(struct dsw_evdev *dsw,
			     struct dsw_port *source_port,
			     uint8_t queue_id, uint16_t paused_flow_hash);

static void
dsw_port_handle_unpause_flow(struct dsw_evdev *dsw, struct dsw_port *port,
			     uint8_t originating_port_id, uint8_t queue_id,
			     uint16_t paused_flow_hash)
{
	struct dsw_ctl_msg cfm = {
		.type = DSW_CTL_CFM,
		.originating_port_id = port->id,
		.queue_id = queue_id,
		.flow_hash = paused_flow_hash
	};

	DSW_LOG_DP_PORT(DEBUG, port->id, "Un-pausing queue_id %d flow_hash %d.\n",
			queue_id, paused_flow_hash);

	dsw_port_remove_paused_flow(port, queue_id, paused_flow_hash);

	rte_smp_rmb();

	dsw_port_ctl_enqueue(&dsw->ports[originating_port_id], &cfm);

	dsw_port_flush_paused_events(dsw, port, queue_id, paused_flow_hash);
}

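/* In the forwarding step, the source port drains its own in_ring and
 * re-enqueues events belonging to the migrating flow on the
 * destination port's in_ring. Unrelated events are kept in the
 * port's in_buffer and handed out on later dequeues.
 */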
#define FORWARD_BURST_SIZE (32)

static void
dsw_port_forward_migrated_flow(struct dsw_port *source_port,
			       struct rte_event_ring *dest_ring,
			       uint8_t queue_id,
			       uint16_t flow_hash)
{
	uint16_t events_left;

	/* Control ring message should have been seen before the ring
	 * count is read on the port's in_ring.
	 */
	rte_smp_rmb();

	events_left = rte_event_ring_count(source_port->in_ring);

	while (events_left > 0) {
		uint16_t in_burst_size =
			RTE_MIN(FORWARD_BURST_SIZE, events_left);
		struct rte_event in_burst[in_burst_size];
		uint16_t in_len;
		uint16_t i;

		in_len = rte_event_ring_dequeue_burst(source_port->in_ring,
						      in_burst,
						      in_burst_size, NULL);
		/* No need to care about bursting forwarded events (to
		 * the destination port's in_ring), since migration
		 * doesn't happen very often, and also the majority of
		 * the dequeued events will likely *not* be forwarded.
		 */
		for (i = 0; i < in_len; i++) {
			struct rte_event *e = &in_burst[i];
			if (e->queue_id == queue_id &&
			    dsw_flow_id_hash(e->flow_id) == flow_hash) {
				while (rte_event_ring_enqueue_burst(dest_ring,
								    e, 1,
								    NULL) != 1)
					rte_pause();
			} else {
				uint16_t last_idx = source_port->in_buffer_len;
				source_port->in_buffer[last_idx] = *e;
				source_port->in_buffer_len++;
			}
		}

		events_left -= in_len;
	}
}

static void
dsw_port_move_migrating_flow(struct dsw_evdev *dsw,
			     struct dsw_port *source_port)
{
	uint8_t queue_id = source_port->migration_target_qf.queue_id;
	uint16_t flow_hash = source_port->migration_target_qf.flow_hash;
	uint8_t dest_port_id = source_port->migration_target_port_id;
	struct dsw_port *dest_port = &dsw->ports[dest_port_id];

	dsw_port_flush_out_buffers(dsw, source_port);

	rte_smp_wmb();

	dsw->queues[queue_id].flow_to_port_map[flow_hash] =
		dest_port_id;

	dsw_port_forward_migrated_flow(source_port, dest_port->in_ring,
				       queue_id, flow_hash);

	/* Flow table update and migration destination port's enqueues
	 * must be seen before the control message.
	 */
	rte_smp_wmb();

	dsw_port_ctl_broadcast(dsw, source_port, DSW_CTL_UNPAUS_REQ, queue_id,
			       flow_hash);
	source_port->cfm_cnt = 0;
	source_port->migration_state = DSW_MIGRATION_STATE_UNPAUSING;
}

static void
dsw_port_handle_confirm(struct dsw_evdev *dsw, struct dsw_port *port)
{
	port->cfm_cnt++;

	if (port->cfm_cnt == (dsw->num_ports-1)) {
		switch (port->migration_state) {
		case DSW_MIGRATION_STATE_PAUSING:
			DSW_LOG_DP_PORT(DEBUG, port->id, "Going into forwarding "
					"migration state.\n");
			port->migration_state = DSW_MIGRATION_STATE_FORWARDING;
			break;
		case DSW_MIGRATION_STATE_UNPAUSING:
			dsw_port_end_migration(dsw, port);
			break;
		default:
			RTE_ASSERT(0);
			break;
		}
	}
}

static void
dsw_port_ctl_process(struct dsw_evdev *dsw, struct dsw_port *port)
{
	struct dsw_ctl_msg msg;

	/* So any table loads happen before the ring dequeue, in the
	 * case of a 'paus' message.
	 */
	rte_smp_rmb();

	if (dsw_port_ctl_dequeue(port, &msg) == 0) {
		switch (msg.type) {
		case DSW_CTL_PAUS_REQ:
			dsw_port_handle_pause_flow(dsw, port,
						   msg.originating_port_id,
						   msg.queue_id, msg.flow_hash);
			break;
		case DSW_CTL_UNPAUS_REQ:
			dsw_port_handle_unpause_flow(dsw, port,
						     msg.originating_port_id,
						     msg.queue_id,
						     msg.flow_hash);
			break;
		case DSW_CTL_CFM:
			dsw_port_handle_confirm(dsw, port);
			break;
		}
	}
}

static void
dsw_port_note_op(struct dsw_port *port, uint16_t num_events)
{
	/* To pull the control ring reasonably often on busy ports,
	 * each dequeued/enqueued event is considered an 'op' too.
	 */
	port->ops_since_bg_task += (num_events+1);
}

static void
dsw_port_bg_process(struct dsw_evdev *dsw, struct dsw_port *port)
{
	if (unlikely(port->migration_state == DSW_MIGRATION_STATE_FORWARDING &&
		     port->pending_releases == 0))
		dsw_port_move_migrating_flow(dsw, port);

	/* Polling the control ring is relatively inexpensive, and
	 * polling it often helps bring down migration latency, so
	 * do this for every iteration.
	 */
	dsw_port_ctl_process(dsw, port);

	/* To avoid considering migration and flushing output buffers
	 * on every dequeue/enqueue call, the scheduler only performs
	 * such 'background' tasks every nth
	 * (i.e. DSW_MAX_PORT_OPS_PER_BG_TASK) operation.
	 */
	if (unlikely(port->ops_since_bg_task >= DSW_MAX_PORT_OPS_PER_BG_TASK)) {
		uint64_t now;

		now = rte_get_timer_cycles();

		/* Logic to avoid having events linger in the output
		 * buffer.
		 */
		dsw_port_flush_out_buffers(dsw, port);

		dsw_port_consider_load_update(port, now);

		dsw_port_consider_migration(dsw, port, now);

		port->ops_since_bg_task = 0;
	}
}

static void
dsw_port_flush_out_buffers(struct dsw_evdev *dsw, struct dsw_port *source_port)
{
	uint16_t dest_port_id;

	for (dest_port_id = 0; dest_port_id < dsw->num_ports; dest_port_id++)
		dsw_port_transmit_buffered(dsw, source_port, dest_port_id);
}

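/* All three public enqueue variants funnel into
 * dsw_event_enqueue_burst_generic(). The new- and forward-only
 * variants pass op_types_known=true, which lets the generic function
 * skip the per-event op classification loop.
 */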
uint16_t
dsw_event_enqueue(void *port, const struct rte_event *ev)
{
	return dsw_event_enqueue_burst(port, ev, unlikely(ev == NULL) ? 0 : 1);
}

static __rte_always_inline uint16_t
dsw_event_enqueue_burst_generic(struct dsw_port *source_port,
				const struct rte_event events[],
				uint16_t events_len, bool op_types_known,
				uint16_t num_new, uint16_t num_release,
				uint16_t num_non_release)
{
	struct dsw_evdev *dsw = source_port->dsw;
	bool enough_credits;
	uint16_t i;

	DSW_LOG_DP_PORT(DEBUG, source_port->id, "Attempting to enqueue %d "
			"events to port %d.\n", events_len, source_port->id);

	dsw_port_bg_process(dsw, source_port);

	/* XXX: For performance (=ring efficiency) reasons, the
	 * scheduler relies on internal non-ring buffers instead of
	 * immediately sending the event to the destination ring. For
	 * a producer that doesn't intend to produce or consume any
	 * more events, the scheduler provides a way to flush the
	 * buffer, by means of doing an enqueue of zero events. In
	 * addition, a port cannot be left "unattended" (e.g. unused)
	 * for long periods of time, since that would stall
	 * migration. Eventdev API extensions to provide a cleaner way
	 * to achieve both of these functions should be
	 * considered.
	 */
	if (unlikely(events_len == 0)) {
		dsw_port_note_op(source_port, DSW_MAX_PORT_OPS_PER_BG_TASK);
		dsw_port_flush_out_buffers(dsw, source_port);
		return 0;
	}

	dsw_port_note_op(source_port, events_len);

	if (!op_types_known)
		for (i = 0; i < events_len; i++) {
			switch (events[i].op) {
			case RTE_EVENT_OP_RELEASE:
				num_release++;
				break;
			case RTE_EVENT_OP_NEW:
				num_new++;
				/* Falls through. */
			default:
				num_non_release++;
				break;
			}
		}

	/* Technically, we could allow the non-new events up to the
	 * first new event in the array into the system, but for
	 * simplicity reasons, we deny the whole burst if the port is
	 * above the water mark.
	 */
	if (unlikely(num_new > 0 && rte_atomic32_read(&dsw->credits_on_loan) >
		     source_port->new_event_threshold))
		return 0;

	enough_credits = dsw_port_acquire_credits(dsw, source_port,
						  num_non_release);
	if (unlikely(!enough_credits))
		return 0;

	source_port->pending_releases -= num_release;

	dsw_port_enqueue_stats(source_port, num_new,
			       num_non_release-num_new, num_release);

	for (i = 0; i < events_len; i++) {
		const struct rte_event *event = &events[i];

		if (likely(num_release == 0 ||
			   event->op != RTE_EVENT_OP_RELEASE))
			dsw_port_buffer_event(dsw, source_port, event);
		dsw_port_queue_enqueue_stats(source_port, event->queue_id);
	}

	DSW_LOG_DP_PORT(DEBUG, source_port->id, "%d non-release events "
			"accepted.\n", num_non_release);

	return num_non_release;
}

uint16_t
dsw_event_enqueue_burst(void *port, const struct rte_event events[],
			uint16_t events_len)
{
	struct dsw_port *source_port = port;

	if (unlikely(events_len > source_port->enqueue_depth))
		events_len = source_port->enqueue_depth;

	return dsw_event_enqueue_burst_generic(source_port, events,
					       events_len, false, 0, 0, 0);
}

uint16_t
dsw_event_enqueue_new_burst(void *port, const struct rte_event events[],
			    uint16_t events_len)
{
	struct dsw_port *source_port = port;

	if (unlikely(events_len > source_port->enqueue_depth))
		events_len = source_port->enqueue_depth;

	return dsw_event_enqueue_burst_generic(source_port, events,
					       events_len, true, events_len,
					       0, events_len);
}

uint16_t
dsw_event_enqueue_forward_burst(void *port, const struct rte_event events[],
				uint16_t events_len)
{
	struct dsw_port *source_port = port;

	if (unlikely(events_len > source_port->enqueue_depth))
		events_len = source_port->enqueue_depth;

	return dsw_event_enqueue_burst_generic(source_port, events,
					       events_len, true, 0, 0,
					       events_len);
}

uint16_t
dsw_event_dequeue(void *port, struct rte_event *events, uint64_t wait)
{
	return dsw_event_dequeue_burst(port, events, 1, wait);
}

static void
dsw_port_record_seen_events(struct dsw_port *port, struct rte_event *events,
			    uint16_t num)
{
	uint16_t i;

	dsw_port_dequeue_stats(port, num);

	for (i = 0; i < num; i++) {
		uint16_t l_idx = port->seen_events_idx;
		struct dsw_queue_flow *qf = &port->seen_events[l_idx];
		struct rte_event *event = &events[i];
		qf->queue_id = event->queue_id;
		qf->flow_hash = dsw_flow_id_hash(event->flow_id);

		port->seen_events_idx = (l_idx+1) % DSW_MAX_EVENTS_RECORDED;

		dsw_port_queue_dequeued_stats(port, event->queue_id);
	}

	if (unlikely(port->seen_events_len != DSW_MAX_EVENTS_RECORDED))
		port->seen_events_len =
			RTE_MIN(port->seen_events_len + num,
				DSW_MAX_EVENTS_RECORDED);
}

#ifdef DSW_SORT_DEQUEUED

#define DSW_EVENT_TO_INT(_event)				\
	((int)((((_event)->queue_id)<<16)|((_event)->flow_id)))

static inline int
dsw_cmp_event(const void *v_event_a, const void *v_event_b)
{
	const struct rte_event *event_a = v_event_a;
	const struct rte_event *event_b = v_event_b;

	return DSW_EVENT_TO_INT(event_a) - DSW_EVENT_TO_INT(event_b);
}

#endif

static uint16_t
dsw_port_dequeue_burst(struct dsw_port *port, struct rte_event *events,
		       uint16_t num)
{
	struct dsw_port *source_port = port;
	struct dsw_evdev *dsw = source_port->dsw;

	dsw_port_ctl_process(dsw, source_port);

	if (unlikely(port->in_buffer_len > 0)) {
		uint16_t dequeued = RTE_MIN(num, port->in_buffer_len);

		rte_memcpy(events, &port->in_buffer[port->in_buffer_start],
			   dequeued * sizeof(struct rte_event));

		port->in_buffer_start += dequeued;
		port->in_buffer_len -= dequeued;

		if (port->in_buffer_len == 0)
			port->in_buffer_start = 0;

		return dequeued;
	}

	return rte_event_ring_dequeue_burst(port->in_ring, events, num, NULL);
}

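/* pending_releases tracks how many dequeued events have not yet been
 * released (or forwarded) back into the scheduler. A migrating flow
 * is only moved once pending_releases reaches zero, since the
 * application may still issue RTE_EVENT_OP_FORWARD/RELEASE operations
 * pertaining to those events.
 */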
uint16_t
dsw_event_dequeue_burst(void *port, struct rte_event *events, uint16_t num,
			uint64_t wait __rte_unused)
{
	struct dsw_port *source_port = port;
	struct dsw_evdev *dsw = source_port->dsw;
	uint16_t dequeued;

	source_port->pending_releases = 0;

	dsw_port_bg_process(dsw, source_port);

	if (unlikely(num > source_port->dequeue_depth))
		num = source_port->dequeue_depth;

	dequeued = dsw_port_dequeue_burst(source_port, events, num);

	source_port->pending_releases = dequeued;

	dsw_port_load_record(source_port, dequeued);

	dsw_port_note_op(source_port, dequeued);

	if (dequeued > 0) {
		DSW_LOG_DP_PORT(DEBUG, source_port->id, "Dequeued %d events.\n",
				dequeued);

		dsw_port_return_credits(dsw, source_port, dequeued);

		/* One potential optimization one might think of is to
		 * add a migration state (prior to 'pausing'), and
		 * only record seen events when the port is in this
		 * state (and transit to 'pausing' when enough events
		 * have been gathered). However, that scheme doesn't
		 * seem to improve performance.
		 */
		dsw_port_record_seen_events(port, events, dequeued);
	}
	/* XXX: Assuming the port can't produce any more work,
	 *	consider flushing the output buffer, on dequeued ==
	 *	dequeue_depth.
	 */

#ifdef DSW_SORT_DEQUEUED
	dsw_stable_sort(events, dequeued, sizeof(events[0]), dsw_cmp_event);
#endif

	return dequeued;
}