net/dpaa2: support UDP destination port based muxing
[dpdk.git] / examples / vm_power_manager / channel_monitor.c
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2010-2014 Intel Corporation
3  */
4
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <signal.h>
#include <errno.h>
#include <string.h>
#include <ctype.h>
#include <limits.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/epoll.h>
#include <sys/queue.h>
#include <sys/time.h>
#include <sys/socket.h>
#include <sys/select.h>
19 #ifdef USE_JANSSON
20 #include <jansson.h>
21 #else
22 #pragma message "Jansson dev libs unavailable, not including JSON parsing"
23 #endif
24 #include <rte_string_fns.h>
25 #include <rte_log.h>
26 #include <rte_memory.h>
27 #include <rte_malloc.h>
28 #include <rte_atomic.h>
29 #include <rte_cycles.h>
30 #include <rte_ethdev.h>
31 #ifdef RTE_LIBRTE_I40E_PMD
32 #include <rte_pmd_i40e.h>
33 #endif
34 #include <rte_power.h>
35
36 #include <libvirt/libvirt.h>
37 #include "channel_monitor.h"
38 #include "channel_commands.h"
39 #include "channel_manager.h"
40 #include "power_manager.h"
41 #include "oob_monitor.h"
42
/* Log type used by the RTE_LOG(..., CHANNEL_MONITOR, ...) calls in this file. */
#define RTE_LOGTYPE_CHANNEL_MONITOR RTE_LOGTYPE_USER1

/* NOTE(review): not referenced in this part of the file; presumably the epoll event batch size — confirm. */
#define MAX_EVENTS 256

/*
 * Per-VF sampling state for the TRAFFIC policy, indexed by the VF id stored
 * in pol->pfid[] (see get_pkt_diff()): the last rx packet count read and the
 * TSC value at that sample. NOTE(review): 384 is an unexplained bound on the
 * VF id and nothing in this file checks pfid against it.
 */
uint64_t vsi_pkt_count_prev[384];
uint64_t rdtsc_prev[384];
/* NOTE(review): json_data is not used in this part of the file; presumably the receive buffer for JSON requests — confirm. */
#define MAX_JSON_STRING_LEN 1024
char json_data[MAX_JSON_STRING_LEN];

/* NOTE(review): not referenced in the code shown here; confirm its users. */
double time_period_ms = 1;
/* Cleared by channel_monitor_exit(); presumably polled by the monitor loop. */
static volatile unsigned run_loop = 1;
static int global_event_fd;
static unsigned int policy_is_set;
/* Event list released by channel_monitor_exit(). */
static struct epoll_event *global_events_list;
/* Policy table; slots are matched by VM name in update_policy()/remove_policy(). */
static struct policy policies[RTE_MAX_LCORE];
58
59 #ifdef USE_JANSSON
60
/*
 * Overlay used to carry a MAC address in a 64-bit vfid slot: the address is
 * written through .addr and read back through .pfid (see set_policy_mac()).
 * NOTE(review): .addr presumably covers only RTE_ETHER_ADDR_LEN (6) bytes,
 * so the remaining bytes of .pfid are not set by writing the address.
 */
union PFID {
	struct rte_ether_addr addr;
	uint64_t pfid;
};
65
66 static int
67 str_to_ether_addr(const char *a, struct rte_ether_addr *ether_addr)
68 {
69         int i;
70         char *end;
71         unsigned long o[RTE_ETHER_ADDR_LEN];
72
73         i = 0;
74         do {
75                 errno = 0;
76                 o[i] = strtoul(a, &end, 16);
77                 if (errno != 0 || end == a || (end[0] != ':' && end[0] != 0))
78                         return -1;
79                 a = end + 1;
80         } while (++i != RTE_DIM(o) / sizeof(o[0]) && end[0] != 0);
81
82         /* Junk at the end of line */
83         if (end[0] != 0)
84                 return -1;
85
86         /* Support the format XX:XX:XX:XX:XX:XX */
87         if (i == RTE_ETHER_ADDR_LEN) {
88                 while (i-- != 0) {
89                         if (o[i] > UINT8_MAX)
90                                 return -1;
91                         ether_addr->addr_bytes[i] = (uint8_t)o[i];
92                 }
93         /* Support the format XXXX:XXXX:XXXX */
94         } else if (i == RTE_ETHER_ADDR_LEN / 2) {
95                 while (i-- != 0) {
96                         if (o[i] > UINT16_MAX)
97                                 return -1;
98                         ether_addr->addr_bytes[i * 2] =
99                                         (uint8_t)(o[i] >> 8);
100                         ether_addr->addr_bytes[i * 2 + 1] =
101                                         (uint8_t)(o[i] & 0xff);
102                 }
103         /* unknown format */
104         } else
105                 return -1;
106
107         return 0;
108 }
109
110 static int
111 set_policy_mac(struct channel_packet *pkt, int idx, char *mac)
112 {
113         union PFID pfid;
114         int ret;
115
116         /* Use port MAC address as the vfid */
117         ret = str_to_ether_addr(mac, &pfid.addr);
118
119         if (ret != 0) {
120                 RTE_LOG(ERR, CHANNEL_MONITOR,
121                         "Invalid mac address received in JSON\n");
122                 pkt->vfid[idx] = 0;
123                 return -1;
124         }
125
126         printf("Received MAC Address: %02" PRIx8 ":%02" PRIx8 ":%02" PRIx8 ":"
127                         "%02" PRIx8 ":%02" PRIx8 ":%02" PRIx8 "\n",
128                         pfid.addr.addr_bytes[0], pfid.addr.addr_bytes[1],
129                         pfid.addr.addr_bytes[2], pfid.addr.addr_bytes[3],
130                         pfid.addr.addr_bytes[4], pfid.addr.addr_bytes[5]);
131
132         pkt->vfid[idx] = pfid.pfid;
133         return 0;
134 }
135
136 static char*
137 get_resource_name_from_chn_path(const char *channel_path)
138 {
139         char *substr = NULL;
140
141         substr = strstr(channel_path, CHANNEL_MGR_FIFO_PATTERN_NAME);
142
143         return substr;
144 }
145
/*
 * Extract the numeric resource id embedded in a VM name: the decimal number
 * that starts at the first digit of @vm_name (e.g. "ubuntu2" -> 2).
 *
 * Returns the id (>= 0), or -1 if @vm_name is NULL, contains no digit, or
 * the number does not fit in an int.
 */
static int
get_resource_id_from_vmname(const char *vm_name)
{
	const char *digits;
	char *end;
	long val;

	if (vm_name == NULL)
		return -1;

	/* Cast to unsigned char: passing a negative char to isdigit() is UB. */
	digits = vm_name;
	while (*digits != '\0' && !isdigit((unsigned char)*digits))
		digits++;

	if (*digits == '\0')
		return -1;

	/* strtol reports overflow, unlike atoi whose overflow is UB. */
	errno = 0;
	val = strtol(digits, &end, 10);
	if (errno == ERANGE || val > INT_MAX)
		return -1;

	return (int)val;
}
166
167 static int
168 parse_json_to_pkt(json_t *element, struct channel_packet *pkt,
169                                         const char *vm_name)
170 {
171         const char *key;
172         json_t *value;
173         int ret;
174         int resource_id;
175
176         memset(pkt, 0, sizeof(struct channel_packet));
177
178         pkt->nb_mac_to_monitor = 0;
179         pkt->t_boost_status.tbEnabled = false;
180         pkt->workload = LOW;
181         pkt->policy_to_use = TIME;
182         pkt->command = PKT_POLICY;
183         pkt->core_type = CORE_TYPE_PHYSICAL;
184
185         if (vm_name == NULL) {
186                 RTE_LOG(ERR, CHANNEL_MONITOR,
187                         "vm_name is NULL, request rejected !\n");
188                 return -1;
189         }
190
191         json_object_foreach(element, key, value) {
192                 if (!strcmp(key, "policy")) {
193                         /* Recurse in to get the contents of profile */
194                         ret = parse_json_to_pkt(value, pkt, vm_name);
195                         if (ret)
196                                 return ret;
197                 } else if (!strcmp(key, "instruction")) {
198                         /* Recurse in to get the contents of instruction */
199                         ret = parse_json_to_pkt(value, pkt, vm_name);
200                         if (ret)
201                                 return ret;
202                 } else if (!strcmp(key, "command")) {
203                         char command[32];
204                         strlcpy(command, json_string_value(value), 32);
205                         if (!strcmp(command, "power")) {
206                                 pkt->command = CPU_POWER;
207                         } else if (!strcmp(command, "create")) {
208                                 pkt->command = PKT_POLICY;
209                         } else if (!strcmp(command, "destroy")) {
210                                 pkt->command = PKT_POLICY_REMOVE;
211                         } else {
212                                 RTE_LOG(ERR, CHANNEL_MONITOR,
213                                         "Invalid command received in JSON\n");
214                                 return -1;
215                         }
216                 } else if (!strcmp(key, "policy_type")) {
217                         char command[32];
218                         strlcpy(command, json_string_value(value), 32);
219                         if (!strcmp(command, "TIME")) {
220                                 pkt->policy_to_use = TIME;
221                         } else if (!strcmp(command, "TRAFFIC")) {
222                                 pkt->policy_to_use = TRAFFIC;
223                         } else if (!strcmp(command, "WORKLOAD")) {
224                                 pkt->policy_to_use = WORKLOAD;
225                         } else if (!strcmp(command, "BRANCH_RATIO")) {
226                                 pkt->policy_to_use = BRANCH_RATIO;
227                         } else {
228                                 RTE_LOG(ERR, CHANNEL_MONITOR,
229                                         "Wrong policy_type received in JSON\n");
230                                 return -1;
231                         }
232                 } else if (!strcmp(key, "workload")) {
233                         char command[32];
234                         strlcpy(command, json_string_value(value), 32);
235                         if (!strcmp(command, "HIGH")) {
236                                 pkt->workload = HIGH;
237                         } else if (!strcmp(command, "MEDIUM")) {
238                                 pkt->workload = MEDIUM;
239                         } else if (!strcmp(command, "LOW")) {
240                                 pkt->workload = LOW;
241                         } else {
242                                 RTE_LOG(ERR, CHANNEL_MONITOR,
243                                         "Wrong workload received in JSON\n");
244                                 return -1;
245                         }
246                 } else if (!strcmp(key, "busy_hours")) {
247                         unsigned int i;
248                         size_t size = json_array_size(value);
249
250                         for (i = 0; i < size; i++) {
251                                 int hour = (int)json_integer_value(
252                                                 json_array_get(value, i));
253                                 pkt->timer_policy.busy_hours[i] = hour;
254                         }
255                 } else if (!strcmp(key, "quiet_hours")) {
256                         unsigned int i;
257                         size_t size = json_array_size(value);
258
259                         for (i = 0; i < size; i++) {
260                                 int hour = (int)json_integer_value(
261                                                 json_array_get(value, i));
262                                 pkt->timer_policy.quiet_hours[i] = hour;
263                         }
264                 } else if (!strcmp(key, "mac_list")) {
265                         unsigned int i;
266                         size_t size = json_array_size(value);
267
268                         for (i = 0; i < size; i++) {
269                                 char mac[32];
270                                 strlcpy(mac,
271                                         json_string_value(json_array_get(value, i)),
272                                         32);
273                                 set_policy_mac(pkt, i, mac);
274                         }
275                         pkt->nb_mac_to_monitor = size;
276                 } else if (!strcmp(key, "avg_packet_thresh")) {
277                         pkt->traffic_policy.avg_max_packet_thresh =
278                                         (uint32_t)json_integer_value(value);
279                 } else if (!strcmp(key, "max_packet_thresh")) {
280                         pkt->traffic_policy.max_max_packet_thresh =
281                                         (uint32_t)json_integer_value(value);
282                 } else if (!strcmp(key, "unit")) {
283                         char unit[32];
284                         strlcpy(unit, json_string_value(value), 32);
285                         if (!strcmp(unit, "SCALE_UP")) {
286                                 pkt->unit = CPU_POWER_SCALE_UP;
287                         } else if (!strcmp(unit, "SCALE_DOWN")) {
288                                 pkt->unit = CPU_POWER_SCALE_DOWN;
289                         } else if (!strcmp(unit, "SCALE_MAX")) {
290                                 pkt->unit = CPU_POWER_SCALE_MAX;
291                         } else if (!strcmp(unit, "SCALE_MIN")) {
292                                 pkt->unit = CPU_POWER_SCALE_MIN;
293                         } else if (!strcmp(unit, "ENABLE_TURBO")) {
294                                 pkt->unit = CPU_POWER_ENABLE_TURBO;
295                         } else if (!strcmp(unit, "DISABLE_TURBO")) {
296                                 pkt->unit = CPU_POWER_DISABLE_TURBO;
297                         } else {
298                                 RTE_LOG(ERR, CHANNEL_MONITOR,
299                                         "Invalid command received in JSON\n");
300                                 return -1;
301                         }
302                 } else {
303                         RTE_LOG(ERR, CHANNEL_MONITOR,
304                                 "Unknown key received in JSON string: %s\n",
305                                 key);
306                 }
307
308                 resource_id = get_resource_id_from_vmname(vm_name);
309                 if (resource_id < 0) {
310                         RTE_LOG(ERR, CHANNEL_MONITOR,
311                                 "Could not get resource_id from vm_name:%s\n",
312                                 vm_name);
313                         return -1;
314                 }
315                 strlcpy(pkt->vm_name, vm_name, VM_MAX_NAME_SZ);
316                 pkt->resource_id = resource_id;
317         }
318         return 0;
319 }
320 #endif
321
322 void channel_monitor_exit(void)
323 {
324         run_loop = 0;
325         rte_free(global_events_list);
326 }
327
328 static void
329 core_share(int pNo, int z, int x, int t)
330 {
331         if (policies[pNo].core_share[z].pcpu == lvm_info[x].pcpus[t]) {
332                 if (strcmp(policies[pNo].pkt.vm_name,
333                                 lvm_info[x].vm_name) != 0) {
334                         policies[pNo].core_share[z].status = 1;
335                         power_manager_scale_core_max(
336                                         policies[pNo].core_share[z].pcpu);
337                 }
338         }
339 }
340
341 static void
342 core_share_status(int pNo)
343 {
344
345         int noVms = 0, noVcpus = 0, z, x, t;
346
347         get_all_vm(&noVms, &noVcpus);
348
349         /* Reset Core Share Status. */
350         for (z = 0; z < noVcpus; z++)
351                 policies[pNo].core_share[z].status = 0;
352
353         /* Foreach vcpu in a policy. */
354         for (z = 0; z < policies[pNo].pkt.num_vcpu; z++) {
355                 /* Foreach VM on the platform. */
356                 for (x = 0; x < noVms; x++) {
357                         /* Foreach vcpu of VMs on platform. */
358                         for (t = 0; t < lvm_info[x].num_cpus; t++)
359                                 core_share(pNo, z, x, t);
360                 }
361         }
362 }
363
364
365 static int
366 pcpu_monitor(struct policy *pol, struct core_info *ci, int pcpu, int count)
367 {
368         int ret = 0;
369
370         if (pol->pkt.policy_to_use == BRANCH_RATIO) {
371                 ci->cd[pcpu].oob_enabled = 1;
372                 ret = add_core_to_monitor(pcpu);
373                 if (ret == 0)
374                         RTE_LOG(INFO, CHANNEL_MONITOR,
375                                         "Monitoring pcpu %d OOB for %s\n",
376                                         pcpu, pol->pkt.vm_name);
377                 else
378                         RTE_LOG(ERR, CHANNEL_MONITOR,
379                                         "Error monitoring pcpu %d OOB for %s\n",
380                                         pcpu, pol->pkt.vm_name);
381
382         } else {
383                 pol->core_share[count].pcpu = pcpu;
384                 RTE_LOG(INFO, CHANNEL_MONITOR,
385                                 "Monitoring pcpu %d for %s\n",
386                                 pcpu, pol->pkt.vm_name);
387         }
388         return ret;
389 }
390
391 static void
392 get_pcpu_to_control(struct policy *pol)
393 {
394
395         /* Convert vcpu to pcpu. */
396         struct vm_info info;
397         int pcpu, count;
398         struct core_info *ci;
399
400         ci = get_core_info();
401
402         RTE_LOG(DEBUG, CHANNEL_MONITOR,
403                         "Looking for pcpu for %s\n", pol->pkt.vm_name);
404
405         /*
406          * So now that we're handling virtual and physical cores, we need to
407          * differenciate between them when adding them to the branch monitor.
408          * Virtual cores need to be converted to physical cores.
409          */
410         if (pol->pkt.core_type == CORE_TYPE_VIRTUAL) {
411                 /*
412                  * If the cores in the policy are virtual, we need to map them
413                  * to physical core. We look up the vm info and use that for
414                  * the mapping.
415                  */
416                 get_info_vm(pol->pkt.vm_name, &info);
417                 for (count = 0; count < pol->pkt.num_vcpu; count++) {
418                         pcpu = info.pcpu_map[pol->pkt.vcpu_to_control[count]];
419                         pcpu_monitor(pol, ci, pcpu, count);
420                 }
421         } else {
422                 /*
423                  * If the cores in the policy are physical, we just use
424                  * those core id's directly.
425                  */
426                 for (count = 0; count < pol->pkt.num_vcpu; count++) {
427                         pcpu = pol->pkt.vcpu_to_control[count];
428                         pcpu_monitor(pol, ci, pcpu, count);
429                 }
430         }
431 }
432
433 static int
434 get_pfid(struct policy *pol)
435 {
436
437         int i, x, ret = 0;
438
439         for (i = 0; i < pol->pkt.nb_mac_to_monitor; i++) {
440
441                 RTE_ETH_FOREACH_DEV(x) {
442 #ifdef RTE_LIBRTE_I40E_PMD
443                         ret = rte_pmd_i40e_query_vfid_by_mac(x,
444                                 (struct rte_ether_addr *)&(pol->pkt.vfid[i]));
445 #else
446                         ret = -ENOTSUP;
447 #endif
448                         if (ret != -EINVAL) {
449                                 pol->port[i] = x;
450                                 break;
451                         }
452                 }
453                 if (ret == -EINVAL || ret == -ENOTSUP || ret == ENODEV) {
454                         RTE_LOG(INFO, CHANNEL_MONITOR,
455                                 "Error with Policy. MAC not found on "
456                                 "attached ports ");
457                         pol->enabled = 0;
458                         return ret;
459                 }
460                 pol->pfid[i] = ret;
461         }
462         return 1;
463 }
464
465 static int
466 update_policy(struct channel_packet *pkt)
467 {
468
469         unsigned int updated = 0;
470         unsigned int i;
471
472
473         RTE_LOG(INFO, CHANNEL_MONITOR,
474                         "Applying policy for %s\n", pkt->vm_name);
475
476         for (i = 0; i < RTE_DIM(policies); i++) {
477                 if (strcmp(policies[i].pkt.vm_name, pkt->vm_name) == 0) {
478                         /* Copy the contents of *pkt into the policy.pkt */
479                         policies[i].pkt = *pkt;
480                         get_pcpu_to_control(&policies[i]);
481                         /* Check Eth dev only for Traffic policy */
482                         if (policies[i].pkt.policy_to_use == TRAFFIC) {
483                                 if (get_pfid(&policies[i]) < 0) {
484                                         updated = 1;
485                                         break;
486                                 }
487                         }
488                         core_share_status(i);
489                         policies[i].enabled = 1;
490                         updated = 1;
491                 }
492         }
493         if (!updated) {
494                 for (i = 0; i < RTE_DIM(policies); i++) {
495                         if (policies[i].enabled == 0) {
496                                 policies[i].pkt = *pkt;
497                                 get_pcpu_to_control(&policies[i]);
498                                 /* Check Eth dev only for Traffic policy */
499                                 if (policies[i].pkt.policy_to_use == TRAFFIC) {
500                                         if (get_pfid(&policies[i]) < 0) {
501                                                 updated = 1;
502                                                 break;
503                                         }
504                                 }
505                                 core_share_status(i);
506                                 policies[i].enabled = 1;
507                                 break;
508                         }
509                 }
510         }
511         return 0;
512 }
513
514 static int
515 remove_policy(struct channel_packet *pkt __rte_unused)
516 {
517         unsigned int i;
518
519         /*
520          * Disabling the policy is simply a case of setting
521          * enabled to 0
522          */
523         for (i = 0; i < RTE_DIM(policies); i++) {
524                 if (strcmp(policies[i].pkt.vm_name, pkt->vm_name) == 0) {
525                         policies[i].enabled = 0;
526                         return 0;
527                 }
528         }
529         return -1;
530 }
531
532 static uint64_t
533 get_pkt_diff(struct policy *pol)
534 {
535
536         uint64_t vsi_pkt_count,
537                 vsi_pkt_total = 0,
538                 vsi_pkt_count_prev_total = 0;
539         double rdtsc_curr, rdtsc_diff, diff;
540         int x;
541 #ifdef RTE_LIBRTE_I40E_PMD
542         struct rte_eth_stats vf_stats;
543 #endif
544
545         for (x = 0; x < pol->pkt.nb_mac_to_monitor; x++) {
546
547 #ifdef RTE_LIBRTE_I40E_PMD
548                 /*Read vsi stats*/
549                 if (rte_pmd_i40e_get_vf_stats(x, pol->pfid[x], &vf_stats) == 0)
550                         vsi_pkt_count = vf_stats.ipackets;
551                 else
552                         vsi_pkt_count = -1;
553 #else
554                 vsi_pkt_count = -1;
555 #endif
556
557                 vsi_pkt_total += vsi_pkt_count;
558
559                 vsi_pkt_count_prev_total += vsi_pkt_count_prev[pol->pfid[x]];
560                 vsi_pkt_count_prev[pol->pfid[x]] = vsi_pkt_count;
561         }
562
563         rdtsc_curr = rte_rdtsc_precise();
564         rdtsc_diff = rdtsc_curr - rdtsc_prev[pol->pfid[x-1]];
565         rdtsc_prev[pol->pfid[x-1]] = rdtsc_curr;
566
567         diff = (vsi_pkt_total - vsi_pkt_count_prev_total) *
568                         ((double)rte_get_tsc_hz() / rdtsc_diff);
569
570         return diff;
571 }
572
573 static void
574 apply_traffic_profile(struct policy *pol)
575 {
576
577         int count;
578         uint64_t diff = 0;
579
580         diff = get_pkt_diff(pol);
581
582         if (diff >= (pol->pkt.traffic_policy.max_max_packet_thresh)) {
583                 for (count = 0; count < pol->pkt.num_vcpu; count++) {
584                         if (pol->core_share[count].status != 1)
585                                 power_manager_scale_core_max(
586                                                 pol->core_share[count].pcpu);
587                 }
588         } else if (diff >= (pol->pkt.traffic_policy.avg_max_packet_thresh)) {
589                 for (count = 0; count < pol->pkt.num_vcpu; count++) {
590                         if (pol->core_share[count].status != 1)
591                                 power_manager_scale_core_med(
592                                                 pol->core_share[count].pcpu);
593                 }
594         } else if (diff < (pol->pkt.traffic_policy.avg_max_packet_thresh)) {
595                 for (count = 0; count < pol->pkt.num_vcpu; count++) {
596                         if (pol->core_share[count].status != 1)
597                                 power_manager_scale_core_min(
598                                                 pol->core_share[count].pcpu);
599                 }
600         }
601 }
602
603 static void
604 apply_time_profile(struct policy *pol)
605 {
606
607         int count, x;
608         struct timeval tv;
609         struct tm *ptm;
610         char time_string[40];
611
612         /* Obtain the time of day, and convert it to a tm struct. */
613         gettimeofday(&tv, NULL);
614         ptm = localtime(&tv.tv_sec);
615         /* Format the date and time, down to a single second. */
616         strftime(time_string, sizeof(time_string), "%Y-%m-%d %H:%M:%S", ptm);
617
618         for (x = 0; x < HOURS; x++) {
619
620                 if (ptm->tm_hour == pol->pkt.timer_policy.busy_hours[x]) {
621                         for (count = 0; count < pol->pkt.num_vcpu; count++) {
622                                 if (pol->core_share[count].status != 1) {
623                                         power_manager_scale_core_max(
624                                                 pol->core_share[count].pcpu);
625                                 }
626                         }
627                         break;
628                 } else if (ptm->tm_hour ==
629                                 pol->pkt.timer_policy.quiet_hours[x]) {
630                         for (count = 0; count < pol->pkt.num_vcpu; count++) {
631                                 if (pol->core_share[count].status != 1) {
632                                         power_manager_scale_core_min(
633                                                 pol->core_share[count].pcpu);
634                         }
635                 }
636                         break;
637                 } else if (ptm->tm_hour ==
638                         pol->pkt.timer_policy.hours_to_use_traffic_profile[x]) {
639                         apply_traffic_profile(pol);
640                         break;
641                 }
642         }
643 }
644
645 static void
646 apply_workload_profile(struct policy *pol)
647 {
648
649         int count;
650
651         if (pol->pkt.workload == HIGH) {
652                 for (count = 0; count < pol->pkt.num_vcpu; count++) {
653                         if (pol->core_share[count].status != 1)
654                                 power_manager_scale_core_max(
655                                                 pol->core_share[count].pcpu);
656                 }
657         } else if (pol->pkt.workload == MEDIUM) {
658                 for (count = 0; count < pol->pkt.num_vcpu; count++) {
659                         if (pol->core_share[count].status != 1)
660                                 power_manager_scale_core_med(
661                                                 pol->core_share[count].pcpu);
662                 }
663         } else if (pol->pkt.workload == LOW) {
664                 for (count = 0; count < pol->pkt.num_vcpu; count++) {
665                         if (pol->core_share[count].status != 1)
666                                 power_manager_scale_core_min(
667                                                 pol->core_share[count].pcpu);
668                 }
669         }
670 }
671
672 static void
673 apply_policy(struct policy *pol)
674 {
675
676         struct channel_packet *pkt = &pol->pkt;
677
678         /*Check policy to use*/
679         if (pkt->policy_to_use == TRAFFIC)
680                 apply_traffic_profile(pol);
681         else if (pkt->policy_to_use == TIME)
682                 apply_time_profile(pol);
683         else if (pkt->policy_to_use == WORKLOAD)
684                 apply_workload_profile(pol);
685 }
686
687 static int
688 write_binary_packet(void *buffer,
689                 size_t buffer_len,
690                 struct channel_info *chan_info)
691 {
692         int ret;
693
694         if (buffer_len == 0 || buffer == NULL)
695                 return -1;
696
697         if (chan_info->fd < 0) {
698                 RTE_LOG(ERR, CHANNEL_MONITOR, "Channel is not connected\n");
699                 return -1;
700         }
701
702         while (buffer_len > 0) {
703                 ret = write(chan_info->fd, buffer, buffer_len);
704                 if (ret == -1) {
705                         if (errno == EINTR)
706                                 continue;
707                         RTE_LOG(ERR, CHANNEL_MONITOR, "Write function failed due to %s.\n",
708                                         strerror(errno));
709                         return -1;
710                 }
711                 buffer = (char *)buffer + ret;
712                 buffer_len -= ret;
713         }
714         return 0;
715 }
716
717 static int
718 send_freq(struct channel_packet *pkt,
719                 struct channel_info *chan_info,
720                 bool freq_list)
721 {
722         unsigned int vcore_id = pkt->resource_id;
723         struct channel_packet_freq_list channel_pkt_freq_list;
724         struct vm_info info;
725
726         if (get_info_vm(pkt->vm_name, &info) != 0)
727                 return -1;
728
729         if (!freq_list && vcore_id >= MAX_VCPU_PER_VM)
730                 return -1;
731
732         if (!info.allow_query)
733                 return -1;
734
735         channel_pkt_freq_list.command = CPU_POWER_FREQ_LIST;
736         channel_pkt_freq_list.num_vcpu = info.num_vcpus;
737
738         if (freq_list) {
739                 unsigned int i;
740                 for (i = 0; i < info.num_vcpus; i++)
741                         channel_pkt_freq_list.freq_list[i] =
742                           power_manager_get_current_frequency(info.pcpu_map[i]);
743         } else {
744                 channel_pkt_freq_list.freq_list[vcore_id] =
745                   power_manager_get_current_frequency(info.pcpu_map[vcore_id]);
746         }
747
748         return write_binary_packet(&channel_pkt_freq_list,
749                         sizeof(channel_pkt_freq_list),
750                         chan_info);
751 }
752
753 static int
754 send_capabilities(struct channel_packet *pkt,
755                 struct channel_info *chan_info,
756                 bool list_requested)
757 {
758         unsigned int vcore_id = pkt->resource_id;
759         struct channel_packet_caps_list channel_pkt_caps_list;
760         struct vm_info info;
761         struct rte_power_core_capabilities caps;
762         int ret;
763
764         if (get_info_vm(pkt->vm_name, &info) != 0)
765                 return -1;
766
767         if (!list_requested && vcore_id >= MAX_VCPU_PER_VM)
768                 return -1;
769
770         if (!info.allow_query)
771                 return -1;
772
773         channel_pkt_caps_list.command = CPU_POWER_CAPS_LIST;
774         channel_pkt_caps_list.num_vcpu = info.num_vcpus;
775
776         if (list_requested) {
777                 unsigned int i;
778                 for (i = 0; i < info.num_vcpus; i++) {
779                         ret = rte_power_get_capabilities(info.pcpu_map[i],
780                                         &caps);
781                         if (ret == 0) {
782                                 channel_pkt_caps_list.turbo[i] =
783                                                 caps.turbo;
784                                 channel_pkt_caps_list.priority[i] =
785                                                 caps.priority;
786                         } else
787                                 return -1;
788
789                 }
790         } else {
791                 ret = rte_power_get_capabilities(info.pcpu_map[vcore_id],
792                                 &caps);
793                 if (ret == 0) {
794                         channel_pkt_caps_list.turbo[vcore_id] =
795                                         caps.turbo;
796                         channel_pkt_caps_list.priority[vcore_id] =
797                                         caps.priority;
798                 } else
799                         return -1;
800         }
801
802         return write_binary_packet(&channel_pkt_caps_list,
803                         sizeof(channel_pkt_caps_list),
804                         chan_info);
805 }
806
807 static int
808 send_ack_for_received_cmd(struct channel_packet *pkt,
809                 struct channel_info *chan_info,
810                 uint32_t command)
811 {
812         pkt->command = command;
813         return write_binary_packet(pkt,
814                         sizeof(struct channel_packet),
815                         chan_info);
816 }
817
818 static int
819 process_request(struct channel_packet *pkt, struct channel_info *chan_info)
820 {
821         int ret;
822
823         if (chan_info == NULL)
824                 return -1;
825
826         if (rte_atomic32_cmpset(&(chan_info->status), CHANNEL_MGR_CHANNEL_CONNECTED,
827                         CHANNEL_MGR_CHANNEL_PROCESSING) == 0)
828                 return -1;
829
830         if (pkt->command == CPU_POWER) {
831                 unsigned int core_num;
832
833                 if (pkt->core_type == CORE_TYPE_VIRTUAL)
834                         core_num = get_pcpu(chan_info, pkt->resource_id);
835                 else
836                         core_num = pkt->resource_id;
837
838                 RTE_LOG(DEBUG, CHANNEL_MONITOR, "Processing requested cmd for cpu:%d\n",
839                         core_num);
840
841                 int scale_res;
842                 bool valid_unit = true;
843
844                 switch (pkt->unit) {
845                 case(CPU_POWER_SCALE_MIN):
846                         scale_res = power_manager_scale_core_min(core_num);
847                         break;
848                 case(CPU_POWER_SCALE_MAX):
849                         scale_res = power_manager_scale_core_max(core_num);
850                         break;
851                 case(CPU_POWER_SCALE_DOWN):
852                         scale_res = power_manager_scale_core_down(core_num);
853                         break;
854                 case(CPU_POWER_SCALE_UP):
855                         scale_res = power_manager_scale_core_up(core_num);
856                         break;
857                 case(CPU_POWER_ENABLE_TURBO):
858                         scale_res = power_manager_enable_turbo_core(core_num);
859                         break;
860                 case(CPU_POWER_DISABLE_TURBO):
861                         scale_res = power_manager_disable_turbo_core(core_num);
862                         break;
863                 default:
864                         valid_unit = false;
865                         break;
866                 }
867
868                 if (valid_unit) {
869                         ret = send_ack_for_received_cmd(pkt,
870                                         chan_info,
871                                         scale_res >= 0 ?
872                                                 CPU_POWER_CMD_ACK :
873                                                 CPU_POWER_CMD_NACK);
874                         if (ret < 0)
875                                 RTE_LOG(ERR, CHANNEL_MONITOR, "Error during sending ack command.\n");
876                 } else
877                         RTE_LOG(ERR, CHANNEL_MONITOR, "Unexpected unit type.\n");
878
879         }
880
881         if (pkt->command == PKT_POLICY) {
882                 RTE_LOG(INFO, CHANNEL_MONITOR, "Processing policy request %s\n",
883                                 pkt->vm_name);
884                 int ret = send_ack_for_received_cmd(pkt,
885                                 chan_info,
886                                 CPU_POWER_CMD_ACK);
887                 if (ret < 0)
888                         RTE_LOG(ERR, CHANNEL_MONITOR, "Error during sending ack command.\n");
889                 update_policy(pkt);
890                 policy_is_set = 1;
891         }
892
893         if (pkt->command == PKT_POLICY_REMOVE) {
894                 ret = remove_policy(pkt);
895                 if (ret == 0)
896                         RTE_LOG(INFO, CHANNEL_MONITOR,
897                                  "Removed policy %s\n", pkt->vm_name);
898                 else
899                         RTE_LOG(INFO, CHANNEL_MONITOR,
900                                  "Policy %s does not exist\n", pkt->vm_name);
901         }
902
903         if (pkt->command == CPU_POWER_QUERY_FREQ_LIST ||
904                 pkt->command == CPU_POWER_QUERY_FREQ) {
905
906                 RTE_LOG(INFO, CHANNEL_MONITOR,
907                         "Frequency for %s requested.\n", pkt->vm_name);
908                 int ret = send_freq(pkt,
909                                 chan_info,
910                                 pkt->command == CPU_POWER_QUERY_FREQ_LIST);
911                 if (ret < 0)
912                         RTE_LOG(ERR, CHANNEL_MONITOR, "Error during frequency sending.\n");
913         }
914
915         if (pkt->command == CPU_POWER_QUERY_CAPS_LIST ||
916                 pkt->command == CPU_POWER_QUERY_CAPS) {
917
918                 RTE_LOG(INFO, CHANNEL_MONITOR,
919                         "Capabilities for %s requested.\n", pkt->vm_name);
920                 int ret = send_capabilities(pkt,
921                                 chan_info,
922                                 pkt->command == CPU_POWER_QUERY_CAPS_LIST);
923                 if (ret < 0)
924                         RTE_LOG(ERR, CHANNEL_MONITOR, "Error during sending capabilities.\n");
925         }
926
927         /*
928          * Return is not checked as channel status may have been set to DISABLED
929          * from management thread
930          */
931         rte_atomic32_cmpset(&(chan_info->status), CHANNEL_MGR_CHANNEL_PROCESSING,
932                         CHANNEL_MGR_CHANNEL_CONNECTED);
933         return 0;
934
935 }
936
937 int
938 add_channel_to_monitor(struct channel_info **chan_info)
939 {
940         struct channel_info *info = *chan_info;
941         struct epoll_event event;
942
943         event.events = EPOLLIN;
944         event.data.ptr = info;
945         if (epoll_ctl(global_event_fd, EPOLL_CTL_ADD, info->fd, &event) < 0) {
946                 RTE_LOG(ERR, CHANNEL_MONITOR, "Unable to add channel '%s' "
947                                 "to epoll\n", info->channel_path);
948                 return -1;
949         }
950         RTE_LOG(ERR, CHANNEL_MONITOR, "Added channel '%s' "
951                         "to monitor\n", info->channel_path);
952         return 0;
953 }
954
955 int
956 remove_channel_from_monitor(struct channel_info *chan_info)
957 {
958         if (epoll_ctl(global_event_fd, EPOLL_CTL_DEL,
959                         chan_info->fd, NULL) < 0) {
960                 RTE_LOG(ERR, CHANNEL_MONITOR, "Unable to remove channel '%s' "
961                                 "from epoll\n", chan_info->channel_path);
962                 return -1;
963         }
964         return 0;
965 }
966
967 int
968 channel_monitor_init(void)
969 {
970         global_event_fd = epoll_create1(0);
971         if (global_event_fd == 0) {
972                 RTE_LOG(ERR, CHANNEL_MONITOR,
973                                 "Error creating epoll context with error %s\n",
974                                 strerror(errno));
975                 return -1;
976         }
977         global_events_list = rte_malloc("epoll_events",
978                         sizeof(*global_events_list)
979                         * MAX_EVENTS, RTE_CACHE_LINE_SIZE);
980         if (global_events_list == NULL) {
981                 RTE_LOG(ERR, CHANNEL_MONITOR, "Unable to rte_malloc for "
982                                 "epoll events\n");
983                 return -1;
984         }
985         return 0;
986 }
987
988 static void
989 read_binary_packet(struct channel_info *chan_info)
990 {
991         struct channel_packet pkt;
992         void *buffer = &pkt;
993         int buffer_len = sizeof(pkt);
994         int n_bytes, err = 0;
995
996         while (buffer_len > 0) {
997                 n_bytes = read(chan_info->fd,
998                                 buffer, buffer_len);
999                 if (n_bytes == buffer_len)
1000                         break;
1001                 if (n_bytes < 0) {
1002                         err = errno;
1003                         RTE_LOG(DEBUG, CHANNEL_MONITOR,
1004                                 "Received error on "
1005                                 "channel '%s' read: %s\n",
1006                                 chan_info->channel_path,
1007                                 strerror(err));
1008                         remove_channel(&chan_info);
1009                         break;
1010                 }
1011                 buffer = (char *)buffer + n_bytes;
1012                 buffer_len -= n_bytes;
1013         }
1014         if (!err)
1015                 process_request(&pkt, chan_info);
1016 }
1017
#ifdef USE_JANSSON
/*
 * Read and process JSON requests from a JSON-type channel.
 *
 * Bytes are read one at a time into the global json_data buffer. Capture
 * starts at the first '{' and ends when the matching '}' is seen (brace
 * depth is tracked in `indent`), or when the buffer is full.
 *
 * Each captured object is parsed with json_loads(), converted to a
 * struct channel_packet by parse_json_to_pkt(), and passed to
 * process_request(). If the packet's vm_name contains
 * CHANNEL_MGR_FIFO_PATTERN_NAME followed only by a decimal number, that
 * number becomes the single entry of pkt.vcpu_to_control.
 *
 * JSON syntax errors are logged and reading continues. A validation failure
 * in parse_json_to_pkt() stops the loop. The loop otherwise runs until read()
 * reports end-of-file or an error.
 */
static void
read_json_packet(struct channel_info *chan_info)
{
	struct channel_packet pkt;
	int n_bytes, ret;
	json_t *root;
	json_error_t error;
	const char *resource_name;
	char *start, *end;
	uint32_t n;


	/* read opening brace to closing brace */
	do {
		int idx = 0;
		int indent = 0;
		do {
			n_bytes = read(chan_info->fd, &json_data[idx], 1);
			/*
			 * Stop on EOF (0) and on error (-1). On error nothing
			 * was stored at json_data[idx], so continuing would
			 * treat stale buffer contents as input.
			 */
			if (n_bytes <= 0)
				break;
			if (json_data[idx] == '{')
				indent++;
			if (json_data[idx] == '}')
				indent--;
			/* only advance once inside an object */
			if ((indent > 0) || (idx > 0))
				idx++;
			if (indent <= 0)
				json_data[idx] = 0;
			/* leave room for the terminating NUL */
			if (idx >= MAX_JSON_STRING_LEN-1)
				break;
		} while (indent > 0);

		json_data[idx] = '\0';

		if (strlen(json_data) == 0)
			continue;

		printf("got [%s]\n", json_data);

		root = json_loads(json_data, 0, &error);

		if (root) {
			resource_name = get_resource_name_from_chn_path(
				chan_info->channel_path);
			/*
			 * Because our data is now in the json
			 * object, we can overwrite the pkt
			 * with a channel_packet struct, using
			 * parse_json_to_pkt()
			 */
			ret = parse_json_to_pkt(root, &pkt, resource_name);
			json_decref(root);
			if (ret) {
				RTE_LOG(ERR, CHANNEL_MONITOR,
					"Error validating JSON profile data\n");
				break;
			}
			start = strstr(pkt.vm_name,
					CHANNEL_MGR_FIFO_PATTERN_NAME);
			if (start != NULL) {
				/* move past pattern to start of fifo id */
				start += strlen(CHANNEL_MGR_FIFO_PATTERN_NAME);

				end = start;
				n = (uint32_t)strtoul(start, &end, 10);

				if (end[0] == '\0') {
					/* Add core id to core list */
					pkt.num_vcpu = 1;
					pkt.vcpu_to_control[0] = n;
					process_request(&pkt, chan_info);
				} else {
					RTE_LOG(ERR, CHANNEL_MONITOR,
						"Cannot extract core id from fifo name\n");
				}
			} else {
				process_request(&pkt, chan_info);
			}
		} else {
			RTE_LOG(ERR, CHANNEL_MONITOR,
					"JSON error on line %d: %s\n",
					error.line, error.text);
		}
	} while (n_bytes > 0);
}
#endif
1105
1106 void
1107 run_channel_monitor(void)
1108 {
1109         while (run_loop) {
1110                 int n_events, i;
1111
1112                 n_events = epoll_wait(global_event_fd, global_events_list,
1113                                 MAX_EVENTS, 1);
1114                 if (!run_loop)
1115                         break;
1116                 for (i = 0; i < n_events; i++) {
1117                         struct channel_info *chan_info = (struct channel_info *)
1118                                         global_events_list[i].data.ptr;
1119                         if ((global_events_list[i].events & EPOLLERR) ||
1120                                 (global_events_list[i].events & EPOLLHUP)) {
1121                                 RTE_LOG(INFO, CHANNEL_MONITOR,
1122                                                 "Remote closed connection for "
1123                                                 "channel '%s'\n",
1124                                                 chan_info->channel_path);
1125                                 remove_channel(&chan_info);
1126                                 continue;
1127                         }
1128                         if (global_events_list[i].events & EPOLLIN) {
1129
1130                                 switch (chan_info->type) {
1131                                 case CHANNEL_TYPE_BINARY:
1132                                         read_binary_packet(chan_info);
1133                                         break;
1134 #ifdef USE_JANSSON
1135                                 case CHANNEL_TYPE_JSON:
1136                                         read_json_packet(chan_info);
1137                                         break;
1138 #endif
1139                                 default:
1140                                         break;
1141                                 }
1142                         }
1143                 }
1144                 rte_delay_us(time_period_ms*1000);
1145                 if (policy_is_set) {
1146                         unsigned int j;
1147
1148                         for (j = 0; j < RTE_DIM(policies); j++) {
1149                                 if (policies[j].enabled == 1)
1150                                         apply_policy(&policies[j]);
1151                         }
1152                 }
1153         }
1154 }