examples/power: fix strcpy buffer overrun
[dpdk.git] / examples / vm_power_manager / channel_monitor.c
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2010-2014 Intel Corporation
3  */
4
5 #include <unistd.h>
6 #include <stdio.h>
7 #include <stdlib.h>
8 #include <stdint.h>
9 #include <signal.h>
10 #include <errno.h>
11 #include <string.h>
12 #include <fcntl.h>
13 #include <sys/types.h>
14 #include <sys/epoll.h>
15 #include <sys/queue.h>
16 #include <sys/time.h>
17 #include <sys/socket.h>
18 #include <sys/select.h>
19 #ifdef USE_JANSSON
20 #include <jansson.h>
21 #else
22 #pragma message "Jansson dev libs unavailable, not including JSON parsing"
23 #endif
24 #include <rte_string_fns.h>
25 #include <rte_log.h>
26 #include <rte_memory.h>
27 #include <rte_malloc.h>
28 #include <rte_atomic.h>
29 #include <rte_cycles.h>
30 #include <rte_ethdev.h>
31 #include <rte_pmd_i40e.h>
32
33 #include <libvirt/libvirt.h>
34 #include "channel_monitor.h"
35 #include "channel_commands.h"
36 #include "channel_manager.h"
37 #include "power_manager.h"
38 #include "oob_monitor.h"
39
40 #define RTE_LOGTYPE_CHANNEL_MONITOR RTE_LOGTYPE_USER1
41
42 #define MAX_EVENTS 256
43
44 uint64_t vsi_pkt_count_prev[384];
45 uint64_t rdtsc_prev[384];
46 #define MAX_JSON_STRING_LEN 1024
47 char json_data[MAX_JSON_STRING_LEN];
48
49 double time_period_ms = 1;
50 static volatile unsigned run_loop = 1;
51 static int global_event_fd;
52 static unsigned int policy_is_set;
53 static struct epoll_event *global_events_list;
54 static struct policy policies[RTE_MAX_LCORE];
55
56 #ifdef USE_JANSSON
57
58 union PFID {
59         struct rte_ether_addr addr;
60         uint64_t pfid;
61 };
62
63 static int
64 str_to_ether_addr(const char *a, struct rte_ether_addr *ether_addr)
65 {
66         int i;
67         char *end;
68         unsigned long o[RTE_ETHER_ADDR_LEN];
69
70         i = 0;
71         do {
72                 errno = 0;
73                 o[i] = strtoul(a, &end, 16);
74                 if (errno != 0 || end == a || (end[0] != ':' && end[0] != 0))
75                         return -1;
76                 a = end + 1;
77         } while (++i != RTE_DIM(o) / sizeof(o[0]) && end[0] != 0);
78
79         /* Junk at the end of line */
80         if (end[0] != 0)
81                 return -1;
82
83         /* Support the format XX:XX:XX:XX:XX:XX */
84         if (i == RTE_ETHER_ADDR_LEN) {
85                 while (i-- != 0) {
86                         if (o[i] > UINT8_MAX)
87                                 return -1;
88                         ether_addr->addr_bytes[i] = (uint8_t)o[i];
89                 }
90         /* Support the format XXXX:XXXX:XXXX */
91         } else if (i == RTE_ETHER_ADDR_LEN / 2) {
92                 while (i-- != 0) {
93                         if (o[i] > UINT16_MAX)
94                                 return -1;
95                         ether_addr->addr_bytes[i * 2] =
96                                         (uint8_t)(o[i] >> 8);
97                         ether_addr->addr_bytes[i * 2 + 1] =
98                                         (uint8_t)(o[i] & 0xff);
99                 }
100         /* unknown format */
101         } else
102                 return -1;
103
104         return 0;
105 }
106
107 static int
108 set_policy_mac(struct channel_packet *pkt, int idx, char *mac)
109 {
110         union PFID pfid;
111         int ret;
112
113         /* Use port MAC address as the vfid */
114         ret = str_to_ether_addr(mac, &pfid.addr);
115
116         if (ret != 0) {
117                 RTE_LOG(ERR, CHANNEL_MONITOR,
118                         "Invalid mac address received in JSON\n");
119                 pkt->vfid[idx] = 0;
120                 return -1;
121         }
122
123         printf("Received MAC Address: %02" PRIx8 ":%02" PRIx8 ":%02" PRIx8 ":"
124                         "%02" PRIx8 ":%02" PRIx8 ":%02" PRIx8 "\n",
125                         pfid.addr.addr_bytes[0], pfid.addr.addr_bytes[1],
126                         pfid.addr.addr_bytes[2], pfid.addr.addr_bytes[3],
127                         pfid.addr.addr_bytes[4], pfid.addr.addr_bytes[5]);
128
129         pkt->vfid[idx] = pfid.pfid;
130         return 0;
131 }
132
133 static char*
134 get_resource_name_from_chn_path(const char *channel_path)
135 {
136         char *substr = NULL;
137
138         substr = strstr(channel_path, CHANNEL_MGR_FIFO_PATTERN_NAME);
139
140         return substr;
141 }
142
/*
 * Extract the first run of decimal digits found in vm_name and return it
 * as a non-negative integer.
 *
 * Returns -1 when vm_name is NULL, contains no digit, or the number does
 * not fit in an int.
 */
static int
get_resource_id_from_vmname(const char *vm_name)
{
	const char *digits;
	char *end;
	long value;

	if (vm_name == NULL)
		return -1;

	/*
	 * Find the first decimal digit. An explicit range test is used so
	 * no <ctype.h> function is called with a possibly negative char.
	 */
	for (digits = vm_name; *digits != '\0'; digits++) {
		if (*digits >= '0' && *digits <= '9')
			break;
	}
	if (*digits == '\0')
		return -1;

	/* strtol reports overflow through errno; atoi does not. */
	errno = 0;
	value = strtol(digits, &end, 10);
	if (errno != 0 || end == digits || value < 0 || value > INT32_MAX)
		return -1;

	return (int)value;
}
163
164 static int
165 parse_json_to_pkt(json_t *element, struct channel_packet *pkt,
166                                         const char *vm_name)
167 {
168         const char *key;
169         json_t *value;
170         int ret;
171         int resource_id;
172
173         memset(pkt, 0, sizeof(struct channel_packet));
174
175         pkt->nb_mac_to_monitor = 0;
176         pkt->t_boost_status.tbEnabled = false;
177         pkt->workload = LOW;
178         pkt->policy_to_use = TIME;
179         pkt->command = PKT_POLICY;
180         pkt->core_type = CORE_TYPE_PHYSICAL;
181
182         if (vm_name == NULL) {
183                 RTE_LOG(ERR, CHANNEL_MONITOR,
184                         "vm_name is NULL, request rejected !\n");
185                 return -1;
186         }
187
188         json_object_foreach(element, key, value) {
189                 if (!strcmp(key, "policy")) {
190                         /* Recurse in to get the contents of profile */
191                         ret = parse_json_to_pkt(value, pkt, vm_name);
192                         if (ret)
193                                 return ret;
194                 } else if (!strcmp(key, "instruction")) {
195                         /* Recurse in to get the contents of instruction */
196                         ret = parse_json_to_pkt(value, pkt, vm_name);
197                         if (ret)
198                                 return ret;
199                 } else if (!strcmp(key, "command")) {
200                         char command[32];
201                         strlcpy(command, json_string_value(value), 32);
202                         if (!strcmp(command, "power")) {
203                                 pkt->command = CPU_POWER;
204                         } else if (!strcmp(command, "create")) {
205                                 pkt->command = PKT_POLICY;
206                         } else if (!strcmp(command, "destroy")) {
207                                 pkt->command = PKT_POLICY_REMOVE;
208                         } else {
209                                 RTE_LOG(ERR, CHANNEL_MONITOR,
210                                         "Invalid command received in JSON\n");
211                                 return -1;
212                         }
213                 } else if (!strcmp(key, "policy_type")) {
214                         char command[32];
215                         strlcpy(command, json_string_value(value), 32);
216                         if (!strcmp(command, "TIME")) {
217                                 pkt->policy_to_use = TIME;
218                         } else if (!strcmp(command, "TRAFFIC")) {
219                                 pkt->policy_to_use = TRAFFIC;
220                         } else if (!strcmp(command, "WORKLOAD")) {
221                                 pkt->policy_to_use = WORKLOAD;
222                         } else if (!strcmp(command, "BRANCH_RATIO")) {
223                                 pkt->policy_to_use = BRANCH_RATIO;
224                         } else {
225                                 RTE_LOG(ERR, CHANNEL_MONITOR,
226                                         "Wrong policy_type received in JSON\n");
227                                 return -1;
228                         }
229                 } else if (!strcmp(key, "workload")) {
230                         char command[32];
231                         strlcpy(command, json_string_value(value), 32);
232                         if (!strcmp(command, "HIGH")) {
233                                 pkt->workload = HIGH;
234                         } else if (!strcmp(command, "MEDIUM")) {
235                                 pkt->workload = MEDIUM;
236                         } else if (!strcmp(command, "LOW")) {
237                                 pkt->workload = LOW;
238                         } else {
239                                 RTE_LOG(ERR, CHANNEL_MONITOR,
240                                         "Wrong workload received in JSON\n");
241                                 return -1;
242                         }
243                 } else if (!strcmp(key, "busy_hours")) {
244                         unsigned int i;
245                         size_t size = json_array_size(value);
246
247                         for (i = 0; i < size; i++) {
248                                 int hour = (int)json_integer_value(
249                                                 json_array_get(value, i));
250                                 pkt->timer_policy.busy_hours[i] = hour;
251                         }
252                 } else if (!strcmp(key, "quiet_hours")) {
253                         unsigned int i;
254                         size_t size = json_array_size(value);
255
256                         for (i = 0; i < size; i++) {
257                                 int hour = (int)json_integer_value(
258                                                 json_array_get(value, i));
259                                 pkt->timer_policy.quiet_hours[i] = hour;
260                         }
261                 } else if (!strcmp(key, "mac_list")) {
262                         unsigned int i;
263                         size_t size = json_array_size(value);
264
265                         for (i = 0; i < size; i++) {
266                                 char mac[32];
267                                 strlcpy(mac,
268                                         json_string_value(json_array_get(value, i)),
269                                         32);
270                                 set_policy_mac(pkt, i, mac);
271                         }
272                         pkt->nb_mac_to_monitor = size;
273                 } else if (!strcmp(key, "avg_packet_thresh")) {
274                         pkt->traffic_policy.avg_max_packet_thresh =
275                                         (uint32_t)json_integer_value(value);
276                 } else if (!strcmp(key, "max_packet_thresh")) {
277                         pkt->traffic_policy.max_max_packet_thresh =
278                                         (uint32_t)json_integer_value(value);
279                 } else if (!strcmp(key, "unit")) {
280                         char unit[32];
281                         strlcpy(unit, json_string_value(value), 32);
282                         if (!strcmp(unit, "SCALE_UP")) {
283                                 pkt->unit = CPU_POWER_SCALE_UP;
284                         } else if (!strcmp(unit, "SCALE_DOWN")) {
285                                 pkt->unit = CPU_POWER_SCALE_DOWN;
286                         } else if (!strcmp(unit, "SCALE_MAX")) {
287                                 pkt->unit = CPU_POWER_SCALE_MAX;
288                         } else if (!strcmp(unit, "SCALE_MIN")) {
289                                 pkt->unit = CPU_POWER_SCALE_MIN;
290                         } else if (!strcmp(unit, "ENABLE_TURBO")) {
291                                 pkt->unit = CPU_POWER_ENABLE_TURBO;
292                         } else if (!strcmp(unit, "DISABLE_TURBO")) {
293                                 pkt->unit = CPU_POWER_DISABLE_TURBO;
294                         } else {
295                                 RTE_LOG(ERR, CHANNEL_MONITOR,
296                                         "Invalid command received in JSON\n");
297                                 return -1;
298                         }
299                 } else {
300                         RTE_LOG(ERR, CHANNEL_MONITOR,
301                                 "Unknown key received in JSON string: %s\n",
302                                 key);
303                 }
304
305                 resource_id = get_resource_id_from_vmname(vm_name);
306                 if (resource_id < 0) {
307                         RTE_LOG(ERR, CHANNEL_MONITOR,
308                                 "Could not get resource_id from vm_name:%s\n",
309                                 vm_name);
310                         return -1;
311                 }
312                 strlcpy(pkt->vm_name, vm_name, VM_MAX_NAME_SZ);
313                 pkt->resource_id = resource_id;
314         }
315         return 0;
316 }
317 #endif
318
319 void channel_monitor_exit(void)
320 {
321         run_loop = 0;
322         rte_free(global_events_list);
323 }
324
325 static void
326 core_share(int pNo, int z, int x, int t)
327 {
328         if (policies[pNo].core_share[z].pcpu == lvm_info[x].pcpus[t]) {
329                 if (strcmp(policies[pNo].pkt.vm_name,
330                                 lvm_info[x].vm_name) != 0) {
331                         policies[pNo].core_share[z].status = 1;
332                         power_manager_scale_core_max(
333                                         policies[pNo].core_share[z].pcpu);
334                 }
335         }
336 }
337
338 static void
339 core_share_status(int pNo)
340 {
341
342         int noVms = 0, noVcpus = 0, z, x, t;
343
344         get_all_vm(&noVms, &noVcpus);
345
346         /* Reset Core Share Status. */
347         for (z = 0; z < noVcpus; z++)
348                 policies[pNo].core_share[z].status = 0;
349
350         /* Foreach vcpu in a policy. */
351         for (z = 0; z < policies[pNo].pkt.num_vcpu; z++) {
352                 /* Foreach VM on the platform. */
353                 for (x = 0; x < noVms; x++) {
354                         /* Foreach vcpu of VMs on platform. */
355                         for (t = 0; t < lvm_info[x].num_cpus; t++)
356                                 core_share(pNo, z, x, t);
357                 }
358         }
359 }
360
361
362 static int
363 pcpu_monitor(struct policy *pol, struct core_info *ci, int pcpu, int count)
364 {
365         int ret = 0;
366
367         if (pol->pkt.policy_to_use == BRANCH_RATIO) {
368                 ci->cd[pcpu].oob_enabled = 1;
369                 ret = add_core_to_monitor(pcpu);
370                 if (ret == 0)
371                         RTE_LOG(INFO, CHANNEL_MONITOR,
372                                         "Monitoring pcpu %d OOB for %s\n",
373                                         pcpu, pol->pkt.vm_name);
374                 else
375                         RTE_LOG(ERR, CHANNEL_MONITOR,
376                                         "Error monitoring pcpu %d OOB for %s\n",
377                                         pcpu, pol->pkt.vm_name);
378
379         } else {
380                 pol->core_share[count].pcpu = pcpu;
381                 RTE_LOG(INFO, CHANNEL_MONITOR,
382                                 "Monitoring pcpu %d for %s\n",
383                                 pcpu, pol->pkt.vm_name);
384         }
385         return ret;
386 }
387
388 static void
389 get_pcpu_to_control(struct policy *pol)
390 {
391
392         /* Convert vcpu to pcpu. */
393         struct vm_info info;
394         int pcpu, count;
395         struct core_info *ci;
396
397         ci = get_core_info();
398
399         RTE_LOG(DEBUG, CHANNEL_MONITOR,
400                         "Looking for pcpu for %s\n", pol->pkt.vm_name);
401
402         /*
403          * So now that we're handling virtual and physical cores, we need to
404          * differenciate between them when adding them to the branch monitor.
405          * Virtual cores need to be converted to physical cores.
406          */
407         if (pol->pkt.core_type == CORE_TYPE_VIRTUAL) {
408                 /*
409                  * If the cores in the policy are virtual, we need to map them
410                  * to physical core. We look up the vm info and use that for
411                  * the mapping.
412                  */
413                 get_info_vm(pol->pkt.vm_name, &info);
414                 for (count = 0; count < pol->pkt.num_vcpu; count++) {
415                         pcpu = info.pcpu_map[pol->pkt.vcpu_to_control[count]];
416                         pcpu_monitor(pol, ci, pcpu, count);
417                 }
418         } else {
419                 /*
420                  * If the cores in the policy are physical, we just use
421                  * those core id's directly.
422                  */
423                 for (count = 0; count < pol->pkt.num_vcpu; count++) {
424                         pcpu = pol->pkt.vcpu_to_control[count];
425                         pcpu_monitor(pol, ci, pcpu, count);
426                 }
427         }
428 }
429
430 static int
431 get_pfid(struct policy *pol)
432 {
433
434         int i, x, ret = 0;
435
436         for (i = 0; i < pol->pkt.nb_mac_to_monitor; i++) {
437
438                 RTE_ETH_FOREACH_DEV(x) {
439                         ret = rte_pmd_i40e_query_vfid_by_mac(x,
440                                 (struct rte_ether_addr *)&(pol->pkt.vfid[i]));
441                         if (ret != -EINVAL) {
442                                 pol->port[i] = x;
443                                 break;
444                         }
445                 }
446                 if (ret == -EINVAL || ret == -ENOTSUP || ret == ENODEV) {
447                         RTE_LOG(INFO, CHANNEL_MONITOR,
448                                 "Error with Policy. MAC not found on "
449                                 "attached ports ");
450                         pol->enabled = 0;
451                         return ret;
452                 }
453                 pol->pfid[i] = ret;
454         }
455         return 1;
456 }
457
458 static int
459 update_policy(struct channel_packet *pkt)
460 {
461
462         unsigned int updated = 0;
463         unsigned int i;
464
465
466         RTE_LOG(INFO, CHANNEL_MONITOR,
467                         "Applying policy for %s\n", pkt->vm_name);
468
469         for (i = 0; i < RTE_DIM(policies); i++) {
470                 if (strcmp(policies[i].pkt.vm_name, pkt->vm_name) == 0) {
471                         /* Copy the contents of *pkt into the policy.pkt */
472                         policies[i].pkt = *pkt;
473                         get_pcpu_to_control(&policies[i]);
474                         /* Check Eth dev only for Traffic policy */
475                         if (policies[i].pkt.policy_to_use == TRAFFIC) {
476                                 if (get_pfid(&policies[i]) < 0) {
477                                         updated = 1;
478                                         break;
479                                 }
480                         }
481                         core_share_status(i);
482                         policies[i].enabled = 1;
483                         updated = 1;
484                 }
485         }
486         if (!updated) {
487                 for (i = 0; i < RTE_DIM(policies); i++) {
488                         if (policies[i].enabled == 0) {
489                                 policies[i].pkt = *pkt;
490                                 get_pcpu_to_control(&policies[i]);
491                                 /* Check Eth dev only for Traffic policy */
492                                 if (policies[i].pkt.policy_to_use == TRAFFIC) {
493                                         if (get_pfid(&policies[i]) < 0) {
494                                                 updated = 1;
495                                                 break;
496                                         }
497                                 }
498                                 core_share_status(i);
499                                 policies[i].enabled = 1;
500                                 break;
501                         }
502                 }
503         }
504         return 0;
505 }
506
507 static int
508 remove_policy(struct channel_packet *pkt __rte_unused)
509 {
510         unsigned int i;
511
512         /*
513          * Disabling the policy is simply a case of setting
514          * enabled to 0
515          */
516         for (i = 0; i < RTE_DIM(policies); i++) {
517                 if (strcmp(policies[i].pkt.vm_name, pkt->vm_name) == 0) {
518                         policies[i].enabled = 0;
519                         return 0;
520                 }
521         }
522         return -1;
523 }
524
525 static uint64_t
526 get_pkt_diff(struct policy *pol)
527 {
528
529         uint64_t vsi_pkt_count,
530                 vsi_pkt_total = 0,
531                 vsi_pkt_count_prev_total = 0;
532         double rdtsc_curr, rdtsc_diff, diff;
533         int x;
534         struct rte_eth_stats vf_stats;
535
536         for (x = 0; x < pol->pkt.nb_mac_to_monitor; x++) {
537
538                 /*Read vsi stats*/
539                 if (rte_pmd_i40e_get_vf_stats(x, pol->pfid[x], &vf_stats) == 0)
540                         vsi_pkt_count = vf_stats.ipackets;
541                 else
542                         vsi_pkt_count = -1;
543
544                 vsi_pkt_total += vsi_pkt_count;
545
546                 vsi_pkt_count_prev_total += vsi_pkt_count_prev[pol->pfid[x]];
547                 vsi_pkt_count_prev[pol->pfid[x]] = vsi_pkt_count;
548         }
549
550         rdtsc_curr = rte_rdtsc_precise();
551         rdtsc_diff = rdtsc_curr - rdtsc_prev[pol->pfid[x-1]];
552         rdtsc_prev[pol->pfid[x-1]] = rdtsc_curr;
553
554         diff = (vsi_pkt_total - vsi_pkt_count_prev_total) *
555                         ((double)rte_get_tsc_hz() / rdtsc_diff);
556
557         return diff;
558 }
559
560 static void
561 apply_traffic_profile(struct policy *pol)
562 {
563
564         int count;
565         uint64_t diff = 0;
566
567         diff = get_pkt_diff(pol);
568
569         if (diff >= (pol->pkt.traffic_policy.max_max_packet_thresh)) {
570                 for (count = 0; count < pol->pkt.num_vcpu; count++) {
571                         if (pol->core_share[count].status != 1)
572                                 power_manager_scale_core_max(
573                                                 pol->core_share[count].pcpu);
574                 }
575         } else if (diff >= (pol->pkt.traffic_policy.avg_max_packet_thresh)) {
576                 for (count = 0; count < pol->pkt.num_vcpu; count++) {
577                         if (pol->core_share[count].status != 1)
578                                 power_manager_scale_core_med(
579                                                 pol->core_share[count].pcpu);
580                 }
581         } else if (diff < (pol->pkt.traffic_policy.avg_max_packet_thresh)) {
582                 for (count = 0; count < pol->pkt.num_vcpu; count++) {
583                         if (pol->core_share[count].status != 1)
584                                 power_manager_scale_core_min(
585                                                 pol->core_share[count].pcpu);
586                 }
587         }
588 }
589
590 static void
591 apply_time_profile(struct policy *pol)
592 {
593
594         int count, x;
595         struct timeval tv;
596         struct tm *ptm;
597         char time_string[40];
598
599         /* Obtain the time of day, and convert it to a tm struct. */
600         gettimeofday(&tv, NULL);
601         ptm = localtime(&tv.tv_sec);
602         /* Format the date and time, down to a single second. */
603         strftime(time_string, sizeof(time_string), "%Y-%m-%d %H:%M:%S", ptm);
604
605         for (x = 0; x < HOURS; x++) {
606
607                 if (ptm->tm_hour == pol->pkt.timer_policy.busy_hours[x]) {
608                         for (count = 0; count < pol->pkt.num_vcpu; count++) {
609                                 if (pol->core_share[count].status != 1) {
610                                         power_manager_scale_core_max(
611                                                 pol->core_share[count].pcpu);
612                                 }
613                         }
614                         break;
615                 } else if (ptm->tm_hour ==
616                                 pol->pkt.timer_policy.quiet_hours[x]) {
617                         for (count = 0; count < pol->pkt.num_vcpu; count++) {
618                                 if (pol->core_share[count].status != 1) {
619                                         power_manager_scale_core_min(
620                                                 pol->core_share[count].pcpu);
621                         }
622                 }
623                         break;
624                 } else if (ptm->tm_hour ==
625                         pol->pkt.timer_policy.hours_to_use_traffic_profile[x]) {
626                         apply_traffic_profile(pol);
627                         break;
628                 }
629         }
630 }
631
632 static void
633 apply_workload_profile(struct policy *pol)
634 {
635
636         int count;
637
638         if (pol->pkt.workload == HIGH) {
639                 for (count = 0; count < pol->pkt.num_vcpu; count++) {
640                         if (pol->core_share[count].status != 1)
641                                 power_manager_scale_core_max(
642                                                 pol->core_share[count].pcpu);
643                 }
644         } else if (pol->pkt.workload == MEDIUM) {
645                 for (count = 0; count < pol->pkt.num_vcpu; count++) {
646                         if (pol->core_share[count].status != 1)
647                                 power_manager_scale_core_med(
648                                                 pol->core_share[count].pcpu);
649                 }
650         } else if (pol->pkt.workload == LOW) {
651                 for (count = 0; count < pol->pkt.num_vcpu; count++) {
652                         if (pol->core_share[count].status != 1)
653                                 power_manager_scale_core_min(
654                                                 pol->core_share[count].pcpu);
655                 }
656         }
657 }
658
659 static void
660 apply_policy(struct policy *pol)
661 {
662
663         struct channel_packet *pkt = &pol->pkt;
664
665         /*Check policy to use*/
666         if (pkt->policy_to_use == TRAFFIC)
667                 apply_traffic_profile(pol);
668         else if (pkt->policy_to_use == TIME)
669                 apply_time_profile(pol);
670         else if (pkt->policy_to_use == WORKLOAD)
671                 apply_workload_profile(pol);
672 }
673
674 static int
675 process_request(struct channel_packet *pkt, struct channel_info *chan_info)
676 {
677         int ret;
678
679         if (chan_info == NULL)
680                 return -1;
681
682         if (rte_atomic32_cmpset(&(chan_info->status), CHANNEL_MGR_CHANNEL_CONNECTED,
683                         CHANNEL_MGR_CHANNEL_PROCESSING) == 0)
684                 return -1;
685
686         if (pkt->command == CPU_POWER) {
687                 unsigned int core_num;
688
689                 if (pkt->core_type == CORE_TYPE_VIRTUAL)
690                         core_num = get_pcpu(chan_info, pkt->resource_id);
691                 else
692                         core_num = pkt->resource_id;
693
694                 RTE_LOG(DEBUG, CHANNEL_MONITOR, "Processing requested cmd for cpu:%d\n",
695                         core_num);
696
697                 switch (pkt->unit) {
698                 case(CPU_POWER_SCALE_MIN):
699                         power_manager_scale_core_min(core_num);
700                         break;
701                 case(CPU_POWER_SCALE_MAX):
702                         power_manager_scale_core_max(core_num);
703                         break;
704                 case(CPU_POWER_SCALE_DOWN):
705                         power_manager_scale_core_down(core_num);
706                         break;
707                 case(CPU_POWER_SCALE_UP):
708                         power_manager_scale_core_up(core_num);
709                         break;
710                 case(CPU_POWER_ENABLE_TURBO):
711                         power_manager_enable_turbo_core(core_num);
712                         break;
713                 case(CPU_POWER_DISABLE_TURBO):
714                         power_manager_disable_turbo_core(core_num);
715                         break;
716                 default:
717                         break;
718                 }
719         }
720
721         if (pkt->command == PKT_POLICY) {
722                 RTE_LOG(INFO, CHANNEL_MONITOR, "Processing policy request %s\n",
723                                 pkt->vm_name);
724                 update_policy(pkt);
725                 policy_is_set = 1;
726         }
727
728         if (pkt->command == PKT_POLICY_REMOVE) {
729                 ret = remove_policy(pkt);
730                 if (ret == 0)
731                         RTE_LOG(INFO, CHANNEL_MONITOR,
732                                  "Removed policy %s\n", pkt->vm_name);
733                 else
734                         RTE_LOG(INFO, CHANNEL_MONITOR,
735                                  "Policy %s does not exist\n", pkt->vm_name);
736         }
737
738         /*
739          * Return is not checked as channel status may have been set to DISABLED
740          * from management thread
741          */
742         rte_atomic32_cmpset(&(chan_info->status), CHANNEL_MGR_CHANNEL_PROCESSING,
743                         CHANNEL_MGR_CHANNEL_CONNECTED);
744         return 0;
745
746 }
747
748 int
749 add_channel_to_monitor(struct channel_info **chan_info)
750 {
751         struct channel_info *info = *chan_info;
752         struct epoll_event event;
753
754         event.events = EPOLLIN;
755         event.data.ptr = info;
756         if (epoll_ctl(global_event_fd, EPOLL_CTL_ADD, info->fd, &event) < 0) {
757                 RTE_LOG(ERR, CHANNEL_MONITOR, "Unable to add channel '%s' "
758                                 "to epoll\n", info->channel_path);
759                 return -1;
760         }
761         RTE_LOG(ERR, CHANNEL_MONITOR, "Added channel '%s' "
762                         "to monitor\n", info->channel_path);
763         return 0;
764 }
765
766 int
767 remove_channel_from_monitor(struct channel_info *chan_info)
768 {
769         if (epoll_ctl(global_event_fd, EPOLL_CTL_DEL,
770                         chan_info->fd, NULL) < 0) {
771                 RTE_LOG(ERR, CHANNEL_MONITOR, "Unable to remove channel '%s' "
772                                 "from epoll\n", chan_info->channel_path);
773                 return -1;
774         }
775         return 0;
776 }
777
778 int
779 channel_monitor_init(void)
780 {
781         global_event_fd = epoll_create1(0);
782         if (global_event_fd == 0) {
783                 RTE_LOG(ERR, CHANNEL_MONITOR,
784                                 "Error creating epoll context with error %s\n",
785                                 strerror(errno));
786                 return -1;
787         }
788         global_events_list = rte_malloc("epoll_events",
789                         sizeof(*global_events_list)
790                         * MAX_EVENTS, RTE_CACHE_LINE_SIZE);
791         if (global_events_list == NULL) {
792                 RTE_LOG(ERR, CHANNEL_MONITOR, "Unable to rte_malloc for "
793                                 "epoll events\n");
794                 return -1;
795         }
796         return 0;
797 }
798
799 static void
800 read_binary_packet(struct channel_info *chan_info)
801 {
802         struct channel_packet pkt;
803         void *buffer = &pkt;
804         int buffer_len = sizeof(pkt);
805         int n_bytes, err = 0;
806
807         while (buffer_len > 0) {
808                 n_bytes = read(chan_info->fd,
809                                 buffer, buffer_len);
810                 if (n_bytes == buffer_len)
811                         break;
812                 if (n_bytes < 0) {
813                         err = errno;
814                         RTE_LOG(DEBUG, CHANNEL_MONITOR,
815                                 "Received error on "
816                                 "channel '%s' read: %s\n",
817                                 chan_info->channel_path,
818                                 strerror(err));
819                         remove_channel(&chan_info);
820                         break;
821                 }
822                 buffer = (char *)buffer + n_bytes;
823                 buffer_len -= n_bytes;
824         }
825         if (!err)
826                 process_request(&pkt, chan_info);
827 }
828
829 #ifdef USE_JANSSON
830 static void
831 read_json_packet(struct channel_info *chan_info)
832 {
833         struct channel_packet pkt;
834         int n_bytes, ret;
835         json_t *root;
836         json_error_t error;
837         const char *resource_name;
838
839
840         /* read opening brace to closing brace */
841         do {
842                 int idx = 0;
843                 int indent = 0;
844                 do {
845                         n_bytes = read(chan_info->fd, &json_data[idx], 1);
846                         if (n_bytes == 0)
847                                 break;
848                         if (json_data[idx] == '{')
849                                 indent++;
850                         if (json_data[idx] == '}')
851                                 indent--;
852                         if ((indent > 0) || (idx > 0))
853                                 idx++;
854                         if (indent <= 0)
855                                 json_data[idx] = 0;
856                         if (idx >= MAX_JSON_STRING_LEN-1)
857                                 break;
858                 } while (indent > 0);
859
860                 json_data[idx] = '\0';
861
862                 if (strlen(json_data) == 0)
863                         continue;
864
865                 printf("got [%s]\n", json_data);
866
867                 root = json_loads(json_data, 0, &error);
868
869                 if (root) {
870                         resource_name = get_resource_name_from_chn_path(
871                                 chan_info->channel_path);
872                         /*
873                          * Because our data is now in the json
874                          * object, we can overwrite the pkt
875                          * with a channel_packet struct, using
876                          * parse_json_to_pkt()
877                          */
878                         ret = parse_json_to_pkt(root, &pkt, resource_name);
879                         json_decref(root);
880                         if (ret) {
881                                 RTE_LOG(ERR, CHANNEL_MONITOR,
882                                         "Error validating JSON profile data\n");
883                                 break;
884                         }
885                         process_request(&pkt, chan_info);
886                 } else {
887                         RTE_LOG(ERR, CHANNEL_MONITOR,
888                                         "JSON error on line %d: %s\n",
889                                         error.line, error.text);
890                 }
891         } while (n_bytes > 0);
892 }
893 #endif
894
895 void
896 run_channel_monitor(void)
897 {
898         while (run_loop) {
899                 int n_events, i;
900
901                 n_events = epoll_wait(global_event_fd, global_events_list,
902                                 MAX_EVENTS, 1);
903                 if (!run_loop)
904                         break;
905                 for (i = 0; i < n_events; i++) {
906                         struct channel_info *chan_info = (struct channel_info *)
907                                         global_events_list[i].data.ptr;
908                         if ((global_events_list[i].events & EPOLLERR) ||
909                                 (global_events_list[i].events & EPOLLHUP)) {
910                                 RTE_LOG(INFO, CHANNEL_MONITOR,
911                                                 "Remote closed connection for "
912                                                 "channel '%s'\n",
913                                                 chan_info->channel_path);
914                                 remove_channel(&chan_info);
915                                 continue;
916                         }
917                         if (global_events_list[i].events & EPOLLIN) {
918
919                                 switch (chan_info->type) {
920                                 case CHANNEL_TYPE_BINARY:
921                                         read_binary_packet(chan_info);
922                                         break;
923 #ifdef USE_JANSSON
924                                 case CHANNEL_TYPE_JSON:
925                                         read_json_packet(chan_info);
926                                         break;
927 #endif
928                                 default:
929                                         break;
930                                 }
931                         }
932                 }
933                 rte_delay_us(time_period_ms*1000);
934                 if (policy_is_set) {
935                         unsigned int j;
936
937                         for (j = 0; j < RTE_DIM(policies); j++) {
938                                 if (policies[j].enabled == 1)
939                                         apply_policy(&policies[j]);
940                         }
941                 }
942         }
943 }