examples/vm_power: add mechanism to disable queries
[dpdk.git] / examples / vm_power_manager / channel_monitor.c
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2010-2014 Intel Corporation
3  */
4
5 #include <unistd.h>
6 #include <stdio.h>
7 #include <stdlib.h>
8 #include <stdint.h>
9 #include <signal.h>
10 #include <errno.h>
11 #include <string.h>
12 #include <fcntl.h>
13 #include <sys/types.h>
14 #include <sys/epoll.h>
15 #include <sys/queue.h>
16 #include <sys/time.h>
17 #include <sys/socket.h>
18 #include <sys/select.h>
19 #ifdef USE_JANSSON
20 #include <jansson.h>
21 #else
22 #pragma message "Jansson dev libs unavailable, not including JSON parsing"
23 #endif
24 #include <rte_string_fns.h>
25 #include <rte_log.h>
26 #include <rte_memory.h>
27 #include <rte_malloc.h>
28 #include <rte_atomic.h>
29 #include <rte_cycles.h>
30 #include <rte_ethdev.h>
31 #ifdef RTE_LIBRTE_I40E_PMD
32 #include <rte_pmd_i40e.h>
33 #endif
34
35 #include <libvirt/libvirt.h>
36 #include "channel_monitor.h"
37 #include "channel_commands.h"
38 #include "channel_manager.h"
39 #include "power_manager.h"
40 #include "oob_monitor.h"
41
42 #define RTE_LOGTYPE_CHANNEL_MONITOR RTE_LOGTYPE_USER1
43
44 #define MAX_EVENTS 256
45
46 uint64_t vsi_pkt_count_prev[384];
47 uint64_t rdtsc_prev[384];
48 #define MAX_JSON_STRING_LEN 1024
49 char json_data[MAX_JSON_STRING_LEN];
50
51 double time_period_ms = 1;
52 static volatile unsigned run_loop = 1;
53 static int global_event_fd;
54 static unsigned int policy_is_set;
55 static struct epoll_event *global_events_list;
56 static struct policy policies[RTE_MAX_LCORE];
57
58 #ifdef USE_JANSSON
59
60 union PFID {
61         struct rte_ether_addr addr;
62         uint64_t pfid;
63 };
64
65 static int
66 str_to_ether_addr(const char *a, struct rte_ether_addr *ether_addr)
67 {
68         int i;
69         char *end;
70         unsigned long o[RTE_ETHER_ADDR_LEN];
71
72         i = 0;
73         do {
74                 errno = 0;
75                 o[i] = strtoul(a, &end, 16);
76                 if (errno != 0 || end == a || (end[0] != ':' && end[0] != 0))
77                         return -1;
78                 a = end + 1;
79         } while (++i != RTE_DIM(o) / sizeof(o[0]) && end[0] != 0);
80
81         /* Junk at the end of line */
82         if (end[0] != 0)
83                 return -1;
84
85         /* Support the format XX:XX:XX:XX:XX:XX */
86         if (i == RTE_ETHER_ADDR_LEN) {
87                 while (i-- != 0) {
88                         if (o[i] > UINT8_MAX)
89                                 return -1;
90                         ether_addr->addr_bytes[i] = (uint8_t)o[i];
91                 }
92         /* Support the format XXXX:XXXX:XXXX */
93         } else if (i == RTE_ETHER_ADDR_LEN / 2) {
94                 while (i-- != 0) {
95                         if (o[i] > UINT16_MAX)
96                                 return -1;
97                         ether_addr->addr_bytes[i * 2] =
98                                         (uint8_t)(o[i] >> 8);
99                         ether_addr->addr_bytes[i * 2 + 1] =
100                                         (uint8_t)(o[i] & 0xff);
101                 }
102         /* unknown format */
103         } else
104                 return -1;
105
106         return 0;
107 }
108
109 static int
110 set_policy_mac(struct channel_packet *pkt, int idx, char *mac)
111 {
112         union PFID pfid;
113         int ret;
114
115         /* Use port MAC address as the vfid */
116         ret = str_to_ether_addr(mac, &pfid.addr);
117
118         if (ret != 0) {
119                 RTE_LOG(ERR, CHANNEL_MONITOR,
120                         "Invalid mac address received in JSON\n");
121                 pkt->vfid[idx] = 0;
122                 return -1;
123         }
124
125         printf("Received MAC Address: %02" PRIx8 ":%02" PRIx8 ":%02" PRIx8 ":"
126                         "%02" PRIx8 ":%02" PRIx8 ":%02" PRIx8 "\n",
127                         pfid.addr.addr_bytes[0], pfid.addr.addr_bytes[1],
128                         pfid.addr.addr_bytes[2], pfid.addr.addr_bytes[3],
129                         pfid.addr.addr_bytes[4], pfid.addr.addr_bytes[5]);
130
131         pkt->vfid[idx] = pfid.pfid;
132         return 0;
133 }
134
135 static char*
136 get_resource_name_from_chn_path(const char *channel_path)
137 {
138         char *substr = NULL;
139
140         substr = strstr(channel_path, CHANNEL_MGR_FIFO_PATTERN_NAME);
141
142         return substr;
143 }
144
145 static int
146 get_resource_id_from_vmname(const char *vm_name)
147 {
148         int result = -1;
149         int off = 0;
150
151         if (vm_name == NULL)
152                 return -1;
153
154         while (vm_name[off] != '\0') {
155                 if (isdigit(vm_name[off]))
156                         break;
157                 off++;
158         }
159         result = atoi(&vm_name[off]);
160         if ((result == 0) && (vm_name[off] != '0'))
161                 return -1;
162
163         return result;
164 }
165
166 static int
167 parse_json_to_pkt(json_t *element, struct channel_packet *pkt,
168                                         const char *vm_name)
169 {
170         const char *key;
171         json_t *value;
172         int ret;
173         int resource_id;
174
175         memset(pkt, 0, sizeof(struct channel_packet));
176
177         pkt->nb_mac_to_monitor = 0;
178         pkt->t_boost_status.tbEnabled = false;
179         pkt->workload = LOW;
180         pkt->policy_to_use = TIME;
181         pkt->command = PKT_POLICY;
182         pkt->core_type = CORE_TYPE_PHYSICAL;
183
184         if (vm_name == NULL) {
185                 RTE_LOG(ERR, CHANNEL_MONITOR,
186                         "vm_name is NULL, request rejected !\n");
187                 return -1;
188         }
189
190         json_object_foreach(element, key, value) {
191                 if (!strcmp(key, "policy")) {
192                         /* Recurse in to get the contents of profile */
193                         ret = parse_json_to_pkt(value, pkt, vm_name);
194                         if (ret)
195                                 return ret;
196                 } else if (!strcmp(key, "instruction")) {
197                         /* Recurse in to get the contents of instruction */
198                         ret = parse_json_to_pkt(value, pkt, vm_name);
199                         if (ret)
200                                 return ret;
201                 } else if (!strcmp(key, "command")) {
202                         char command[32];
203                         strlcpy(command, json_string_value(value), 32);
204                         if (!strcmp(command, "power")) {
205                                 pkt->command = CPU_POWER;
206                         } else if (!strcmp(command, "create")) {
207                                 pkt->command = PKT_POLICY;
208                         } else if (!strcmp(command, "destroy")) {
209                                 pkt->command = PKT_POLICY_REMOVE;
210                         } else {
211                                 RTE_LOG(ERR, CHANNEL_MONITOR,
212                                         "Invalid command received in JSON\n");
213                                 return -1;
214                         }
215                 } else if (!strcmp(key, "policy_type")) {
216                         char command[32];
217                         strlcpy(command, json_string_value(value), 32);
218                         if (!strcmp(command, "TIME")) {
219                                 pkt->policy_to_use = TIME;
220                         } else if (!strcmp(command, "TRAFFIC")) {
221                                 pkt->policy_to_use = TRAFFIC;
222                         } else if (!strcmp(command, "WORKLOAD")) {
223                                 pkt->policy_to_use = WORKLOAD;
224                         } else if (!strcmp(command, "BRANCH_RATIO")) {
225                                 pkt->policy_to_use = BRANCH_RATIO;
226                         } else {
227                                 RTE_LOG(ERR, CHANNEL_MONITOR,
228                                         "Wrong policy_type received in JSON\n");
229                                 return -1;
230                         }
231                 } else if (!strcmp(key, "workload")) {
232                         char command[32];
233                         strlcpy(command, json_string_value(value), 32);
234                         if (!strcmp(command, "HIGH")) {
235                                 pkt->workload = HIGH;
236                         } else if (!strcmp(command, "MEDIUM")) {
237                                 pkt->workload = MEDIUM;
238                         } else if (!strcmp(command, "LOW")) {
239                                 pkt->workload = LOW;
240                         } else {
241                                 RTE_LOG(ERR, CHANNEL_MONITOR,
242                                         "Wrong workload received in JSON\n");
243                                 return -1;
244                         }
245                 } else if (!strcmp(key, "busy_hours")) {
246                         unsigned int i;
247                         size_t size = json_array_size(value);
248
249                         for (i = 0; i < size; i++) {
250                                 int hour = (int)json_integer_value(
251                                                 json_array_get(value, i));
252                                 pkt->timer_policy.busy_hours[i] = hour;
253                         }
254                 } else if (!strcmp(key, "quiet_hours")) {
255                         unsigned int i;
256                         size_t size = json_array_size(value);
257
258                         for (i = 0; i < size; i++) {
259                                 int hour = (int)json_integer_value(
260                                                 json_array_get(value, i));
261                                 pkt->timer_policy.quiet_hours[i] = hour;
262                         }
263                 } else if (!strcmp(key, "mac_list")) {
264                         unsigned int i;
265                         size_t size = json_array_size(value);
266
267                         for (i = 0; i < size; i++) {
268                                 char mac[32];
269                                 strlcpy(mac,
270                                         json_string_value(json_array_get(value, i)),
271                                         32);
272                                 set_policy_mac(pkt, i, mac);
273                         }
274                         pkt->nb_mac_to_monitor = size;
275                 } else if (!strcmp(key, "avg_packet_thresh")) {
276                         pkt->traffic_policy.avg_max_packet_thresh =
277                                         (uint32_t)json_integer_value(value);
278                 } else if (!strcmp(key, "max_packet_thresh")) {
279                         pkt->traffic_policy.max_max_packet_thresh =
280                                         (uint32_t)json_integer_value(value);
281                 } else if (!strcmp(key, "unit")) {
282                         char unit[32];
283                         strlcpy(unit, json_string_value(value), 32);
284                         if (!strcmp(unit, "SCALE_UP")) {
285                                 pkt->unit = CPU_POWER_SCALE_UP;
286                         } else if (!strcmp(unit, "SCALE_DOWN")) {
287                                 pkt->unit = CPU_POWER_SCALE_DOWN;
288                         } else if (!strcmp(unit, "SCALE_MAX")) {
289                                 pkt->unit = CPU_POWER_SCALE_MAX;
290                         } else if (!strcmp(unit, "SCALE_MIN")) {
291                                 pkt->unit = CPU_POWER_SCALE_MIN;
292                         } else if (!strcmp(unit, "ENABLE_TURBO")) {
293                                 pkt->unit = CPU_POWER_ENABLE_TURBO;
294                         } else if (!strcmp(unit, "DISABLE_TURBO")) {
295                                 pkt->unit = CPU_POWER_DISABLE_TURBO;
296                         } else {
297                                 RTE_LOG(ERR, CHANNEL_MONITOR,
298                                         "Invalid command received in JSON\n");
299                                 return -1;
300                         }
301                 } else {
302                         RTE_LOG(ERR, CHANNEL_MONITOR,
303                                 "Unknown key received in JSON string: %s\n",
304                                 key);
305                 }
306
307                 resource_id = get_resource_id_from_vmname(vm_name);
308                 if (resource_id < 0) {
309                         RTE_LOG(ERR, CHANNEL_MONITOR,
310                                 "Could not get resource_id from vm_name:%s\n",
311                                 vm_name);
312                         return -1;
313                 }
314                 strlcpy(pkt->vm_name, vm_name, VM_MAX_NAME_SZ);
315                 pkt->resource_id = resource_id;
316         }
317         return 0;
318 }
319 #endif
320
321 void channel_monitor_exit(void)
322 {
323         run_loop = 0;
324         rte_free(global_events_list);
325 }
326
327 static void
328 core_share(int pNo, int z, int x, int t)
329 {
330         if (policies[pNo].core_share[z].pcpu == lvm_info[x].pcpus[t]) {
331                 if (strcmp(policies[pNo].pkt.vm_name,
332                                 lvm_info[x].vm_name) != 0) {
333                         policies[pNo].core_share[z].status = 1;
334                         power_manager_scale_core_max(
335                                         policies[pNo].core_share[z].pcpu);
336                 }
337         }
338 }
339
340 static void
341 core_share_status(int pNo)
342 {
343
344         int noVms = 0, noVcpus = 0, z, x, t;
345
346         get_all_vm(&noVms, &noVcpus);
347
348         /* Reset Core Share Status. */
349         for (z = 0; z < noVcpus; z++)
350                 policies[pNo].core_share[z].status = 0;
351
352         /* Foreach vcpu in a policy. */
353         for (z = 0; z < policies[pNo].pkt.num_vcpu; z++) {
354                 /* Foreach VM on the platform. */
355                 for (x = 0; x < noVms; x++) {
356                         /* Foreach vcpu of VMs on platform. */
357                         for (t = 0; t < lvm_info[x].num_cpus; t++)
358                                 core_share(pNo, z, x, t);
359                 }
360         }
361 }
362
363
364 static int
365 pcpu_monitor(struct policy *pol, struct core_info *ci, int pcpu, int count)
366 {
367         int ret = 0;
368
369         if (pol->pkt.policy_to_use == BRANCH_RATIO) {
370                 ci->cd[pcpu].oob_enabled = 1;
371                 ret = add_core_to_monitor(pcpu);
372                 if (ret == 0)
373                         RTE_LOG(INFO, CHANNEL_MONITOR,
374                                         "Monitoring pcpu %d OOB for %s\n",
375                                         pcpu, pol->pkt.vm_name);
376                 else
377                         RTE_LOG(ERR, CHANNEL_MONITOR,
378                                         "Error monitoring pcpu %d OOB for %s\n",
379                                         pcpu, pol->pkt.vm_name);
380
381         } else {
382                 pol->core_share[count].pcpu = pcpu;
383                 RTE_LOG(INFO, CHANNEL_MONITOR,
384                                 "Monitoring pcpu %d for %s\n",
385                                 pcpu, pol->pkt.vm_name);
386         }
387         return ret;
388 }
389
390 static void
391 get_pcpu_to_control(struct policy *pol)
392 {
393
394         /* Convert vcpu to pcpu. */
395         struct vm_info info;
396         int pcpu, count;
397         struct core_info *ci;
398
399         ci = get_core_info();
400
401         RTE_LOG(DEBUG, CHANNEL_MONITOR,
402                         "Looking for pcpu for %s\n", pol->pkt.vm_name);
403
404         /*
405          * So now that we're handling virtual and physical cores, we need to
406          * differenciate between them when adding them to the branch monitor.
407          * Virtual cores need to be converted to physical cores.
408          */
409         if (pol->pkt.core_type == CORE_TYPE_VIRTUAL) {
410                 /*
411                  * If the cores in the policy are virtual, we need to map them
412                  * to physical core. We look up the vm info and use that for
413                  * the mapping.
414                  */
415                 get_info_vm(pol->pkt.vm_name, &info);
416                 for (count = 0; count < pol->pkt.num_vcpu; count++) {
417                         pcpu = info.pcpu_map[pol->pkt.vcpu_to_control[count]];
418                         pcpu_monitor(pol, ci, pcpu, count);
419                 }
420         } else {
421                 /*
422                  * If the cores in the policy are physical, we just use
423                  * those core id's directly.
424                  */
425                 for (count = 0; count < pol->pkt.num_vcpu; count++) {
426                         pcpu = pol->pkt.vcpu_to_control[count];
427                         pcpu_monitor(pol, ci, pcpu, count);
428                 }
429         }
430 }
431
432 static int
433 get_pfid(struct policy *pol)
434 {
435
436         int i, x, ret = 0;
437
438         for (i = 0; i < pol->pkt.nb_mac_to_monitor; i++) {
439
440                 RTE_ETH_FOREACH_DEV(x) {
441 #ifdef RTE_LIBRTE_I40E_PMD
442                         ret = rte_pmd_i40e_query_vfid_by_mac(x,
443                                 (struct rte_ether_addr *)&(pol->pkt.vfid[i]));
444 #else
445                         ret = -ENOTSUP;
446 #endif
447                         if (ret != -EINVAL) {
448                                 pol->port[i] = x;
449                                 break;
450                         }
451                 }
452                 if (ret == -EINVAL || ret == -ENOTSUP || ret == ENODEV) {
453                         RTE_LOG(INFO, CHANNEL_MONITOR,
454                                 "Error with Policy. MAC not found on "
455                                 "attached ports ");
456                         pol->enabled = 0;
457                         return ret;
458                 }
459                 pol->pfid[i] = ret;
460         }
461         return 1;
462 }
463
464 static int
465 update_policy(struct channel_packet *pkt)
466 {
467
468         unsigned int updated = 0;
469         unsigned int i;
470
471
472         RTE_LOG(INFO, CHANNEL_MONITOR,
473                         "Applying policy for %s\n", pkt->vm_name);
474
475         for (i = 0; i < RTE_DIM(policies); i++) {
476                 if (strcmp(policies[i].pkt.vm_name, pkt->vm_name) == 0) {
477                         /* Copy the contents of *pkt into the policy.pkt */
478                         policies[i].pkt = *pkt;
479                         get_pcpu_to_control(&policies[i]);
480                         /* Check Eth dev only for Traffic policy */
481                         if (policies[i].pkt.policy_to_use == TRAFFIC) {
482                                 if (get_pfid(&policies[i]) < 0) {
483                                         updated = 1;
484                                         break;
485                                 }
486                         }
487                         core_share_status(i);
488                         policies[i].enabled = 1;
489                         updated = 1;
490                 }
491         }
492         if (!updated) {
493                 for (i = 0; i < RTE_DIM(policies); i++) {
494                         if (policies[i].enabled == 0) {
495                                 policies[i].pkt = *pkt;
496                                 get_pcpu_to_control(&policies[i]);
497                                 /* Check Eth dev only for Traffic policy */
498                                 if (policies[i].pkt.policy_to_use == TRAFFIC) {
499                                         if (get_pfid(&policies[i]) < 0) {
500                                                 updated = 1;
501                                                 break;
502                                         }
503                                 }
504                                 core_share_status(i);
505                                 policies[i].enabled = 1;
506                                 break;
507                         }
508                 }
509         }
510         return 0;
511 }
512
513 static int
514 remove_policy(struct channel_packet *pkt __rte_unused)
515 {
516         unsigned int i;
517
518         /*
519          * Disabling the policy is simply a case of setting
520          * enabled to 0
521          */
522         for (i = 0; i < RTE_DIM(policies); i++) {
523                 if (strcmp(policies[i].pkt.vm_name, pkt->vm_name) == 0) {
524                         policies[i].enabled = 0;
525                         return 0;
526                 }
527         }
528         return -1;
529 }
530
531 static uint64_t
532 get_pkt_diff(struct policy *pol)
533 {
534
535         uint64_t vsi_pkt_count,
536                 vsi_pkt_total = 0,
537                 vsi_pkt_count_prev_total = 0;
538         double rdtsc_curr, rdtsc_diff, diff;
539         int x;
540 #ifdef RTE_LIBRTE_I40E_PMD
541         struct rte_eth_stats vf_stats;
542 #endif
543
544         for (x = 0; x < pol->pkt.nb_mac_to_monitor; x++) {
545
546 #ifdef RTE_LIBRTE_I40E_PMD
547                 /*Read vsi stats*/
548                 if (rte_pmd_i40e_get_vf_stats(x, pol->pfid[x], &vf_stats) == 0)
549                         vsi_pkt_count = vf_stats.ipackets;
550                 else
551                         vsi_pkt_count = -1;
552 #else
553                 vsi_pkt_count = -1;
554 #endif
555
556                 vsi_pkt_total += vsi_pkt_count;
557
558                 vsi_pkt_count_prev_total += vsi_pkt_count_prev[pol->pfid[x]];
559                 vsi_pkt_count_prev[pol->pfid[x]] = vsi_pkt_count;
560         }
561
562         rdtsc_curr = rte_rdtsc_precise();
563         rdtsc_diff = rdtsc_curr - rdtsc_prev[pol->pfid[x-1]];
564         rdtsc_prev[pol->pfid[x-1]] = rdtsc_curr;
565
566         diff = (vsi_pkt_total - vsi_pkt_count_prev_total) *
567                         ((double)rte_get_tsc_hz() / rdtsc_diff);
568
569         return diff;
570 }
571
572 static void
573 apply_traffic_profile(struct policy *pol)
574 {
575
576         int count;
577         uint64_t diff = 0;
578
579         diff = get_pkt_diff(pol);
580
581         if (diff >= (pol->pkt.traffic_policy.max_max_packet_thresh)) {
582                 for (count = 0; count < pol->pkt.num_vcpu; count++) {
583                         if (pol->core_share[count].status != 1)
584                                 power_manager_scale_core_max(
585                                                 pol->core_share[count].pcpu);
586                 }
587         } else if (diff >= (pol->pkt.traffic_policy.avg_max_packet_thresh)) {
588                 for (count = 0; count < pol->pkt.num_vcpu; count++) {
589                         if (pol->core_share[count].status != 1)
590                                 power_manager_scale_core_med(
591                                                 pol->core_share[count].pcpu);
592                 }
593         } else if (diff < (pol->pkt.traffic_policy.avg_max_packet_thresh)) {
594                 for (count = 0; count < pol->pkt.num_vcpu; count++) {
595                         if (pol->core_share[count].status != 1)
596                                 power_manager_scale_core_min(
597                                                 pol->core_share[count].pcpu);
598                 }
599         }
600 }
601
602 static void
603 apply_time_profile(struct policy *pol)
604 {
605
606         int count, x;
607         struct timeval tv;
608         struct tm *ptm;
609         char time_string[40];
610
611         /* Obtain the time of day, and convert it to a tm struct. */
612         gettimeofday(&tv, NULL);
613         ptm = localtime(&tv.tv_sec);
614         /* Format the date and time, down to a single second. */
615         strftime(time_string, sizeof(time_string), "%Y-%m-%d %H:%M:%S", ptm);
616
617         for (x = 0; x < HOURS; x++) {
618
619                 if (ptm->tm_hour == pol->pkt.timer_policy.busy_hours[x]) {
620                         for (count = 0; count < pol->pkt.num_vcpu; count++) {
621                                 if (pol->core_share[count].status != 1) {
622                                         power_manager_scale_core_max(
623                                                 pol->core_share[count].pcpu);
624                                 }
625                         }
626                         break;
627                 } else if (ptm->tm_hour ==
628                                 pol->pkt.timer_policy.quiet_hours[x]) {
629                         for (count = 0; count < pol->pkt.num_vcpu; count++) {
630                                 if (pol->core_share[count].status != 1) {
631                                         power_manager_scale_core_min(
632                                                 pol->core_share[count].pcpu);
633                         }
634                 }
635                         break;
636                 } else if (ptm->tm_hour ==
637                         pol->pkt.timer_policy.hours_to_use_traffic_profile[x]) {
638                         apply_traffic_profile(pol);
639                         break;
640                 }
641         }
642 }
643
644 static void
645 apply_workload_profile(struct policy *pol)
646 {
647
648         int count;
649
650         if (pol->pkt.workload == HIGH) {
651                 for (count = 0; count < pol->pkt.num_vcpu; count++) {
652                         if (pol->core_share[count].status != 1)
653                                 power_manager_scale_core_max(
654                                                 pol->core_share[count].pcpu);
655                 }
656         } else if (pol->pkt.workload == MEDIUM) {
657                 for (count = 0; count < pol->pkt.num_vcpu; count++) {
658                         if (pol->core_share[count].status != 1)
659                                 power_manager_scale_core_med(
660                                                 pol->core_share[count].pcpu);
661                 }
662         } else if (pol->pkt.workload == LOW) {
663                 for (count = 0; count < pol->pkt.num_vcpu; count++) {
664                         if (pol->core_share[count].status != 1)
665                                 power_manager_scale_core_min(
666                                                 pol->core_share[count].pcpu);
667                 }
668         }
669 }
670
671 static void
672 apply_policy(struct policy *pol)
673 {
674
675         struct channel_packet *pkt = &pol->pkt;
676
677         /*Check policy to use*/
678         if (pkt->policy_to_use == TRAFFIC)
679                 apply_traffic_profile(pol);
680         else if (pkt->policy_to_use == TIME)
681                 apply_time_profile(pol);
682         else if (pkt->policy_to_use == WORKLOAD)
683                 apply_workload_profile(pol);
684 }
685
686 static int
687 write_binary_packet(void *buffer,
688                 size_t buffer_len,
689                 struct channel_info *chan_info)
690 {
691         int ret;
692
693         if (buffer_len == 0 || buffer == NULL)
694                 return -1;
695
696         if (chan_info->fd < 0) {
697                 RTE_LOG(ERR, CHANNEL_MONITOR, "Channel is not connected\n");
698                 return -1;
699         }
700
701         while (buffer_len > 0) {
702                 ret = write(chan_info->fd, buffer, buffer_len);
703                 if (ret == -1) {
704                         if (errno == EINTR)
705                                 continue;
706                         RTE_LOG(ERR, CHANNEL_MONITOR, "Write function failed due to %s.\n",
707                                         strerror(errno));
708                         return -1;
709                 }
710                 buffer = (char *)buffer + ret;
711                 buffer_len -= ret;
712         }
713         return 0;
714 }
715
716 static int
717 send_freq(struct channel_packet *pkt,
718                 struct channel_info *chan_info,
719                 bool freq_list)
720 {
721         unsigned int vcore_id = pkt->resource_id;
722         struct channel_packet_freq_list channel_pkt_freq_list;
723         struct vm_info info;
724
725         if (get_info_vm(pkt->vm_name, &info) != 0)
726                 return -1;
727
728         if (!freq_list && vcore_id >= MAX_VCPU_PER_VM)
729                 return -1;
730
731         if (!info.allow_query)
732                 return -1;
733
734         channel_pkt_freq_list.command = CPU_POWER_FREQ_LIST;
735         channel_pkt_freq_list.num_vcpu = info.num_vcpus;
736
737         if (freq_list) {
738                 unsigned int i;
739                 for (i = 0; i < info.num_vcpus; i++)
740                         channel_pkt_freq_list.freq_list[i] =
741                           power_manager_get_current_frequency(info.pcpu_map[i]);
742         } else {
743                 channel_pkt_freq_list.freq_list[vcore_id] =
744                   power_manager_get_current_frequency(info.pcpu_map[vcore_id]);
745         }
746
747         return write_binary_packet(&channel_pkt_freq_list,
748                         sizeof(channel_pkt_freq_list),
749                         chan_info);
750 }
751
752 static int
753 send_ack_for_received_cmd(struct channel_packet *pkt,
754                 struct channel_info *chan_info,
755                 uint32_t command)
756 {
757         pkt->command = command;
758         return write_binary_packet(pkt,
759                         sizeof(struct channel_packet),
760                         chan_info);
761 }
762
763 static int
764 process_request(struct channel_packet *pkt, struct channel_info *chan_info)
765 {
766         int ret;
767
768         if (chan_info == NULL)
769                 return -1;
770
771         if (rte_atomic32_cmpset(&(chan_info->status), CHANNEL_MGR_CHANNEL_CONNECTED,
772                         CHANNEL_MGR_CHANNEL_PROCESSING) == 0)
773                 return -1;
774
775         if (pkt->command == CPU_POWER) {
776                 unsigned int core_num;
777
778                 if (pkt->core_type == CORE_TYPE_VIRTUAL)
779                         core_num = get_pcpu(chan_info, pkt->resource_id);
780                 else
781                         core_num = pkt->resource_id;
782
783                 RTE_LOG(DEBUG, CHANNEL_MONITOR, "Processing requested cmd for cpu:%d\n",
784                         core_num);
785
786                 int scale_res;
787                 bool valid_unit = true;
788
789                 switch (pkt->unit) {
790                 case(CPU_POWER_SCALE_MIN):
791                         scale_res = power_manager_scale_core_min(core_num);
792                         break;
793                 case(CPU_POWER_SCALE_MAX):
794                         scale_res = power_manager_scale_core_max(core_num);
795                         break;
796                 case(CPU_POWER_SCALE_DOWN):
797                         scale_res = power_manager_scale_core_down(core_num);
798                         break;
799                 case(CPU_POWER_SCALE_UP):
800                         scale_res = power_manager_scale_core_up(core_num);
801                         break;
802                 case(CPU_POWER_ENABLE_TURBO):
803                         scale_res = power_manager_enable_turbo_core(core_num);
804                         break;
805                 case(CPU_POWER_DISABLE_TURBO):
806                         scale_res = power_manager_disable_turbo_core(core_num);
807                         break;
808                 default:
809                         valid_unit = false;
810                         break;
811                 }
812
813                 if (valid_unit) {
814                         ret = send_ack_for_received_cmd(pkt,
815                                         chan_info,
816                                         scale_res > 0 ?
817                                                 CPU_POWER_CMD_ACK :
818                                                 CPU_POWER_CMD_NACK);
819                         if (ret < 0)
820                                 RTE_LOG(ERR, CHANNEL_MONITOR, "Error during sending ack command.\n");
821                 } else
822                         RTE_LOG(ERR, CHANNEL_MONITOR, "Unexpected unit type.\n");
823
824         }
825
826         if (pkt->command == PKT_POLICY) {
827                 RTE_LOG(INFO, CHANNEL_MONITOR, "Processing policy request %s\n",
828                                 pkt->vm_name);
829                 int ret = send_ack_for_received_cmd(pkt,
830                                 chan_info,
831                                 CPU_POWER_CMD_ACK);
832                 if (ret < 0)
833                         RTE_LOG(ERR, CHANNEL_MONITOR, "Error during sending ack command.\n");
834                 update_policy(pkt);
835                 policy_is_set = 1;
836         }
837
838         if (pkt->command == PKT_POLICY_REMOVE) {
839                 ret = remove_policy(pkt);
840                 if (ret == 0)
841                         RTE_LOG(INFO, CHANNEL_MONITOR,
842                                  "Removed policy %s\n", pkt->vm_name);
843                 else
844                         RTE_LOG(INFO, CHANNEL_MONITOR,
845                                  "Policy %s does not exist\n", pkt->vm_name);
846         }
847
848         if (pkt->command == CPU_POWER_QUERY_FREQ_LIST ||
849                 pkt->command == CPU_POWER_QUERY_FREQ) {
850
851                 RTE_LOG(INFO, CHANNEL_MONITOR,
852                         "Frequency for %s requested.\n", pkt->vm_name);
853                 int ret = send_freq(pkt,
854                                 chan_info,
855                                 pkt->command == CPU_POWER_QUERY_FREQ_LIST);
856                 if (ret < 0)
857                         RTE_LOG(ERR, CHANNEL_MONITOR, "Error during frequency sending.\n");
858         }
859
860         /*
861          * Return is not checked as channel status may have been set to DISABLED
862          * from management thread
863          */
864         rte_atomic32_cmpset(&(chan_info->status), CHANNEL_MGR_CHANNEL_PROCESSING,
865                         CHANNEL_MGR_CHANNEL_CONNECTED);
866         return 0;
867
868 }
869
870 int
871 add_channel_to_monitor(struct channel_info **chan_info)
872 {
873         struct channel_info *info = *chan_info;
874         struct epoll_event event;
875
876         event.events = EPOLLIN;
877         event.data.ptr = info;
878         if (epoll_ctl(global_event_fd, EPOLL_CTL_ADD, info->fd, &event) < 0) {
879                 RTE_LOG(ERR, CHANNEL_MONITOR, "Unable to add channel '%s' "
880                                 "to epoll\n", info->channel_path);
881                 return -1;
882         }
883         RTE_LOG(ERR, CHANNEL_MONITOR, "Added channel '%s' "
884                         "to monitor\n", info->channel_path);
885         return 0;
886 }
887
888 int
889 remove_channel_from_monitor(struct channel_info *chan_info)
890 {
891         if (epoll_ctl(global_event_fd, EPOLL_CTL_DEL,
892                         chan_info->fd, NULL) < 0) {
893                 RTE_LOG(ERR, CHANNEL_MONITOR, "Unable to remove channel '%s' "
894                                 "from epoll\n", chan_info->channel_path);
895                 return -1;
896         }
897         return 0;
898 }
899
900 int
901 channel_monitor_init(void)
902 {
903         global_event_fd = epoll_create1(0);
904         if (global_event_fd == 0) {
905                 RTE_LOG(ERR, CHANNEL_MONITOR,
906                                 "Error creating epoll context with error %s\n",
907                                 strerror(errno));
908                 return -1;
909         }
910         global_events_list = rte_malloc("epoll_events",
911                         sizeof(*global_events_list)
912                         * MAX_EVENTS, RTE_CACHE_LINE_SIZE);
913         if (global_events_list == NULL) {
914                 RTE_LOG(ERR, CHANNEL_MONITOR, "Unable to rte_malloc for "
915                                 "epoll events\n");
916                 return -1;
917         }
918         return 0;
919 }
920
921 static void
922 read_binary_packet(struct channel_info *chan_info)
923 {
924         struct channel_packet pkt;
925         void *buffer = &pkt;
926         int buffer_len = sizeof(pkt);
927         int n_bytes, err = 0;
928
929         while (buffer_len > 0) {
930                 n_bytes = read(chan_info->fd,
931                                 buffer, buffer_len);
932                 if (n_bytes == buffer_len)
933                         break;
934                 if (n_bytes < 0) {
935                         err = errno;
936                         RTE_LOG(DEBUG, CHANNEL_MONITOR,
937                                 "Received error on "
938                                 "channel '%s' read: %s\n",
939                                 chan_info->channel_path,
940                                 strerror(err));
941                         remove_channel(&chan_info);
942                         break;
943                 }
944                 buffer = (char *)buffer + n_bytes;
945                 buffer_len -= n_bytes;
946         }
947         if (!err)
948                 process_request(&pkt, chan_info);
949 }
950
951 #ifdef USE_JANSSON
952 static void
953 read_json_packet(struct channel_info *chan_info)
954 {
955         struct channel_packet pkt;
956         int n_bytes, ret;
957         json_t *root;
958         json_error_t error;
959         const char *resource_name;
960         char *start, *end;
961         uint32_t n;
962
963
964         /* read opening brace to closing brace */
965         do {
966                 int idx = 0;
967                 int indent = 0;
968                 do {
969                         n_bytes = read(chan_info->fd, &json_data[idx], 1);
970                         if (n_bytes == 0)
971                                 break;
972                         if (json_data[idx] == '{')
973                                 indent++;
974                         if (json_data[idx] == '}')
975                                 indent--;
976                         if ((indent > 0) || (idx > 0))
977                                 idx++;
978                         if (indent <= 0)
979                                 json_data[idx] = 0;
980                         if (idx >= MAX_JSON_STRING_LEN-1)
981                                 break;
982                 } while (indent > 0);
983
984                 json_data[idx] = '\0';
985
986                 if (strlen(json_data) == 0)
987                         continue;
988
989                 printf("got [%s]\n", json_data);
990
991                 root = json_loads(json_data, 0, &error);
992
993                 if (root) {
994                         resource_name = get_resource_name_from_chn_path(
995                                 chan_info->channel_path);
996                         /*
997                          * Because our data is now in the json
998                          * object, we can overwrite the pkt
999                          * with a channel_packet struct, using
1000                          * parse_json_to_pkt()
1001                          */
1002                         ret = parse_json_to_pkt(root, &pkt, resource_name);
1003                         json_decref(root);
1004                         if (ret) {
1005                                 RTE_LOG(ERR, CHANNEL_MONITOR,
1006                                         "Error validating JSON profile data\n");
1007                                 break;
1008                         }
1009                         start = strstr(pkt.vm_name,
1010                                         CHANNEL_MGR_FIFO_PATTERN_NAME);
1011                         if (start != NULL) {
1012                                 /* move past pattern to start of fifo id */
1013                                 start += strlen(CHANNEL_MGR_FIFO_PATTERN_NAME);
1014
1015                                 end = start;
1016                                 n = (uint32_t)strtoul(start, &end, 10);
1017
1018                                 if (end[0] == '\0') {
1019                                         /* Add core id to core list */
1020                                         pkt.num_vcpu = 1;
1021                                         pkt.vcpu_to_control[0] = n;
1022                                         process_request(&pkt, chan_info);
1023                                 } else {
1024                                         RTE_LOG(ERR, CHANNEL_MONITOR,
1025                                                 "Cannot extract core id from fifo name\n");
1026                                 }
1027                         } else {
1028                                 process_request(&pkt, chan_info);
1029                         }
1030                 } else {
1031                         RTE_LOG(ERR, CHANNEL_MONITOR,
1032                                         "JSON error on line %d: %s\n",
1033                                         error.line, error.text);
1034                 }
1035         } while (n_bytes > 0);
1036 }
1037 #endif
1038
1039 void
1040 run_channel_monitor(void)
1041 {
1042         while (run_loop) {
1043                 int n_events, i;
1044
1045                 n_events = epoll_wait(global_event_fd, global_events_list,
1046                                 MAX_EVENTS, 1);
1047                 if (!run_loop)
1048                         break;
1049                 for (i = 0; i < n_events; i++) {
1050                         struct channel_info *chan_info = (struct channel_info *)
1051                                         global_events_list[i].data.ptr;
1052                         if ((global_events_list[i].events & EPOLLERR) ||
1053                                 (global_events_list[i].events & EPOLLHUP)) {
1054                                 RTE_LOG(INFO, CHANNEL_MONITOR,
1055                                                 "Remote closed connection for "
1056                                                 "channel '%s'\n",
1057                                                 chan_info->channel_path);
1058                                 remove_channel(&chan_info);
1059                                 continue;
1060                         }
1061                         if (global_events_list[i].events & EPOLLIN) {
1062
1063                                 switch (chan_info->type) {
1064                                 case CHANNEL_TYPE_BINARY:
1065                                         read_binary_packet(chan_info);
1066                                         break;
1067 #ifdef USE_JANSSON
1068                                 case CHANNEL_TYPE_JSON:
1069                                         read_json_packet(chan_info);
1070                                         break;
1071 #endif
1072                                 default:
1073                                         break;
1074                                 }
1075                         }
1076                 }
1077                 rte_delay_us(time_period_ms*1000);
1078                 if (policy_is_set) {
1079                         unsigned int j;
1080
1081                         for (j = 0; j < RTE_DIM(policies); j++) {
1082                                 if (policies[j].enabled == 1)
1083                                         apply_policy(&policies[j]);
1084                         }
1085                 }
1086         }
1087 }