examples/power: fix string null termination
[dpdk.git] / examples / vm_power_manager / channel_monitor.c
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2010-2014 Intel Corporation
3  */
4
5 #include <unistd.h>
6 #include <stdio.h>
7 #include <stdlib.h>
8 #include <stdint.h>
9 #include <signal.h>
10 #include <errno.h>
11 #include <string.h>
12 #include <fcntl.h>
13 #include <sys/types.h>
14 #include <sys/epoll.h>
15 #include <sys/queue.h>
16 #include <sys/time.h>
17 #include <sys/socket.h>
18 #include <sys/select.h>
19 #ifdef USE_JANSSON
20 #include <jansson.h>
21 #else
22 #pragma message "Jansson dev libs unavailable, not including JSON parsing"
23 #endif
24 #include <rte_string_fns.h>
25 #include <rte_log.h>
26 #include <rte_memory.h>
27 #include <rte_malloc.h>
28 #include <rte_atomic.h>
29 #include <rte_cycles.h>
30 #include <rte_ethdev.h>
31 #include <rte_pmd_i40e.h>
32
33 #include <libvirt/libvirt.h>
34 #include "channel_monitor.h"
35 #include "channel_commands.h"
36 #include "channel_manager.h"
37 #include "power_manager.h"
38 #include "oob_monitor.h"
39
40 #define RTE_LOGTYPE_CHANNEL_MONITOR RTE_LOGTYPE_USER1
41
42 #define MAX_EVENTS 256
43
/* Packet count seen at the previous sample; indexed by VF id in get_pkt_diff(). */
uint64_t vsi_pkt_count_prev[384];
/* TSC value taken at the previous sample; indexed by VF id in get_pkt_diff(). */
uint64_t rdtsc_prev[384];
/* Capacity, in bytes, of the buffer that receives one JSON message. */
#define MAX_JSON_STRING_LEN 1024
char json_data[MAX_JSON_STRING_LEN];

double time_period_ms = 1;
/* Cleared by channel_monitor_exit(). */
static volatile unsigned run_loop = 1;
/* epoll instance created by channel_monitor_init(). */
static int global_event_fd;
/* Set to 1 by process_request() once a PKT_POLICY command has been handled. */
static unsigned int policy_is_set;
/* Array of MAX_EVENTS epoll events allocated by channel_monitor_init(). */
static struct epoll_event *global_events_list;
/* Policy table; update_policy() treats a slot with enabled == 0 as free. */
static struct policy policies[MAX_CLIENTS];
55
56 #ifdef USE_JANSSON
57
/*
 * Overlays a MAC address with a 64-bit integer so that a parsed address
 * can be stored as a single value in pkt->vfid[] (see set_policy_mac()).
 */
union PFID {
        struct ether_addr addr;
        uint64_t pfid;
};
62
63 static int
64 str_to_ether_addr(const char *a, struct ether_addr *ether_addr)
65 {
66         int i;
67         char *end;
68         unsigned long o[ETHER_ADDR_LEN];
69
70         i = 0;
71         do {
72                 errno = 0;
73                 o[i] = strtoul(a, &end, 16);
74                 if (errno != 0 || end == a || (end[0] != ':' && end[0] != 0))
75                         return -1;
76                 a = end + 1;
77         } while (++i != RTE_DIM(o) / sizeof(o[0]) && end[0] != 0);
78
79         /* Junk at the end of line */
80         if (end[0] != 0)
81                 return -1;
82
83         /* Support the format XX:XX:XX:XX:XX:XX */
84         if (i == ETHER_ADDR_LEN) {
85                 while (i-- != 0) {
86                         if (o[i] > UINT8_MAX)
87                                 return -1;
88                         ether_addr->addr_bytes[i] = (uint8_t)o[i];
89                 }
90         /* Support the format XXXX:XXXX:XXXX */
91         } else if (i == ETHER_ADDR_LEN / 2) {
92                 while (i-- != 0) {
93                         if (o[i] > UINT16_MAX)
94                                 return -1;
95                         ether_addr->addr_bytes[i * 2] =
96                                         (uint8_t)(o[i] >> 8);
97                         ether_addr->addr_bytes[i * 2 + 1] =
98                                         (uint8_t)(o[i] & 0xff);
99                 }
100         /* unknown format */
101         } else
102                 return -1;
103
104         return 0;
105 }
106
107 static int
108 set_policy_mac(struct channel_packet *pkt, int idx, char *mac)
109 {
110         union PFID pfid;
111         int ret;
112
113         /* Use port MAC address as the vfid */
114         ret = str_to_ether_addr(mac, &pfid.addr);
115
116         if (ret != 0) {
117                 RTE_LOG(ERR, CHANNEL_MONITOR,
118                         "Invalid mac address received in JSON\n");
119                 pkt->vfid[idx] = 0;
120                 return -1;
121         }
122
123         printf("Received MAC Address: %02" PRIx8 ":%02" PRIx8 ":%02" PRIx8 ":"
124                         "%02" PRIx8 ":%02" PRIx8 ":%02" PRIx8 "\n",
125                         pfid.addr.addr_bytes[0], pfid.addr.addr_bytes[1],
126                         pfid.addr.addr_bytes[2], pfid.addr.addr_bytes[3],
127                         pfid.addr.addr_bytes[4], pfid.addr.addr_bytes[5]);
128
129         pkt->vfid[idx] = pfid.pfid;
130         return 0;
131 }
132
133
134 static int
135 parse_json_to_pkt(json_t *element, struct channel_packet *pkt)
136 {
137         const char *key;
138         json_t *value;
139         int ret;
140
141         memset(pkt, 0, sizeof(struct channel_packet));
142
143         pkt->nb_mac_to_monitor = 0;
144         pkt->t_boost_status.tbEnabled = false;
145         pkt->workload = LOW;
146         pkt->policy_to_use = TIME;
147         pkt->command = PKT_POLICY;
148         pkt->core_type = CORE_TYPE_PHYSICAL;
149
150         json_object_foreach(element, key, value) {
151                 if (!strcmp(key, "policy")) {
152                         /* Recurse in to get the contents of profile */
153                         ret = parse_json_to_pkt(value, pkt);
154                         if (ret)
155                                 return ret;
156                 } else if (!strcmp(key, "instruction")) {
157                         /* Recurse in to get the contents of instruction */
158                         ret = parse_json_to_pkt(value, pkt);
159                         if (ret)
160                                 return ret;
161                 } else if (!strcmp(key, "name")) {
162                         strcpy(pkt->vm_name, json_string_value(value));
163                 } else if (!strcmp(key, "command")) {
164                         char command[32];
165                         strlcpy(command, json_string_value(value), 32);
166                         if (!strcmp(command, "power")) {
167                                 pkt->command = CPU_POWER;
168                         } else if (!strcmp(command, "create")) {
169                                 pkt->command = PKT_POLICY;
170                         } else if (!strcmp(command, "destroy")) {
171                                 pkt->command = PKT_POLICY_REMOVE;
172                         } else {
173                                 RTE_LOG(ERR, CHANNEL_MONITOR,
174                                         "Invalid command received in JSON\n");
175                                 return -1;
176                         }
177                 } else if (!strcmp(key, "policy_type")) {
178                         char command[32];
179                         strlcpy(command, json_string_value(value), 32);
180                         if (!strcmp(command, "TIME")) {
181                                 pkt->policy_to_use = TIME;
182                         } else if (!strcmp(command, "TRAFFIC")) {
183                                 pkt->policy_to_use = TRAFFIC;
184                         } else if (!strcmp(command, "WORKLOAD")) {
185                                 pkt->policy_to_use = WORKLOAD;
186                         } else if (!strcmp(command, "BRANCH_RATIO")) {
187                                 pkt->policy_to_use = BRANCH_RATIO;
188                         } else {
189                                 RTE_LOG(ERR, CHANNEL_MONITOR,
190                                         "Wrong policy_type received in JSON\n");
191                                 return -1;
192                         }
193                 } else if (!strcmp(key, "workload")) {
194                         char command[32];
195                         strlcpy(command, json_string_value(value), 32);
196                         if (!strcmp(command, "HIGH")) {
197                                 pkt->workload = HIGH;
198                         } else if (!strcmp(command, "MEDIUM")) {
199                                 pkt->workload = MEDIUM;
200                         } else if (!strcmp(command, "LOW")) {
201                                 pkt->workload = LOW;
202                         } else {
203                                 RTE_LOG(ERR, CHANNEL_MONITOR,
204                                         "Wrong workload received in JSON\n");
205                                 return -1;
206                         }
207                 } else if (!strcmp(key, "busy_hours")) {
208                         unsigned int i;
209                         size_t size = json_array_size(value);
210
211                         for (i = 0; i < size; i++) {
212                                 int hour = (int)json_integer_value(
213                                                 json_array_get(value, i));
214                                 pkt->timer_policy.busy_hours[i] = hour;
215                         }
216                 } else if (!strcmp(key, "quiet_hours")) {
217                         unsigned int i;
218                         size_t size = json_array_size(value);
219
220                         for (i = 0; i < size; i++) {
221                                 int hour = (int)json_integer_value(
222                                                 json_array_get(value, i));
223                                 pkt->timer_policy.quiet_hours[i] = hour;
224                         }
225                 } else if (!strcmp(key, "core_list")) {
226                         unsigned int i;
227                         size_t size = json_array_size(value);
228
229                         for (i = 0; i < size; i++) {
230                                 int core = (int)json_integer_value(
231                                                 json_array_get(value, i));
232                                 pkt->vcpu_to_control[i] = core;
233                         }
234                         pkt->num_vcpu = size;
235                 } else if (!strcmp(key, "mac_list")) {
236                         unsigned int i;
237                         size_t size = json_array_size(value);
238
239                         for (i = 0; i < size; i++) {
240                                 char mac[32];
241                                 strlcpy(mac,
242                                         json_string_value(json_array_get(value, i)),
243                                         32);
244                                 set_policy_mac(pkt, i, mac);
245                         }
246                         pkt->nb_mac_to_monitor = size;
247                 } else if (!strcmp(key, "avg_packet_thresh")) {
248                         pkt->traffic_policy.avg_max_packet_thresh =
249                                         (uint32_t)json_integer_value(value);
250                 } else if (!strcmp(key, "max_packet_thresh")) {
251                         pkt->traffic_policy.max_max_packet_thresh =
252                                         (uint32_t)json_integer_value(value);
253                 } else if (!strcmp(key, "unit")) {
254                         char unit[32];
255                         strlcpy(unit, json_string_value(value), 32);
256                         if (!strcmp(unit, "SCALE_UP")) {
257                                 pkt->unit = CPU_POWER_SCALE_UP;
258                         } else if (!strcmp(unit, "SCALE_DOWN")) {
259                                 pkt->unit = CPU_POWER_SCALE_DOWN;
260                         } else if (!strcmp(unit, "SCALE_MAX")) {
261                                 pkt->unit = CPU_POWER_SCALE_MAX;
262                         } else if (!strcmp(unit, "SCALE_MIN")) {
263                                 pkt->unit = CPU_POWER_SCALE_MIN;
264                         } else if (!strcmp(unit, "ENABLE_TURBO")) {
265                                 pkt->unit = CPU_POWER_ENABLE_TURBO;
266                         } else if (!strcmp(unit, "DISABLE_TURBO")) {
267                                 pkt->unit = CPU_POWER_DISABLE_TURBO;
268                         } else {
269                                 RTE_LOG(ERR, CHANNEL_MONITOR,
270                                         "Invalid command received in JSON\n");
271                                 return -1;
272                         }
273                 } else if (!strcmp(key, "resource_id")) {
274                         pkt->resource_id = (uint32_t)json_integer_value(value);
275                 } else {
276                         RTE_LOG(ERR, CHANNEL_MONITOR,
277                                 "Unknown key received in JSON string: %s\n",
278                                 key);
279                 }
280         }
281         return 0;
282 }
283 #endif
284
285 void channel_monitor_exit(void)
286 {
287         run_loop = 0;
288         rte_free(global_events_list);
289 }
290
291 static void
292 core_share(int pNo, int z, int x, int t)
293 {
294         if (policies[pNo].core_share[z].pcpu == lvm_info[x].pcpus[t]) {
295                 if (strcmp(policies[pNo].pkt.vm_name,
296                                 lvm_info[x].vm_name) != 0) {
297                         policies[pNo].core_share[z].status = 1;
298                         power_manager_scale_core_max(
299                                         policies[pNo].core_share[z].pcpu);
300                 }
301         }
302 }
303
304 static void
305 core_share_status(int pNo)
306 {
307
308         int noVms = 0, noVcpus = 0, z, x, t;
309
310         get_all_vm(&noVms, &noVcpus);
311
312         /* Reset Core Share Status. */
313         for (z = 0; z < noVcpus; z++)
314                 policies[pNo].core_share[z].status = 0;
315
316         /* Foreach vcpu in a policy. */
317         for (z = 0; z < policies[pNo].pkt.num_vcpu; z++) {
318                 /* Foreach VM on the platform. */
319                 for (x = 0; x < noVms; x++) {
320                         /* Foreach vcpu of VMs on platform. */
321                         for (t = 0; t < lvm_info[x].num_cpus; t++)
322                                 core_share(pNo, z, x, t);
323                 }
324         }
325 }
326
327
328 static int
329 pcpu_monitor(struct policy *pol, struct core_info *ci, int pcpu, int count)
330 {
331         int ret = 0;
332
333         if (pol->pkt.policy_to_use == BRANCH_RATIO) {
334                 ci->cd[pcpu].oob_enabled = 1;
335                 ret = add_core_to_monitor(pcpu);
336                 if (ret == 0)
337                         RTE_LOG(INFO, CHANNEL_MONITOR,
338                                         "Monitoring pcpu %d OOB for %s\n",
339                                         pcpu, pol->pkt.vm_name);
340                 else
341                         RTE_LOG(ERR, CHANNEL_MONITOR,
342                                         "Error monitoring pcpu %d OOB for %s\n",
343                                         pcpu, pol->pkt.vm_name);
344
345         } else {
346                 pol->core_share[count].pcpu = pcpu;
347                 RTE_LOG(INFO, CHANNEL_MONITOR,
348                                 "Monitoring pcpu %d for %s\n",
349                                 pcpu, pol->pkt.vm_name);
350         }
351         return ret;
352 }
353
354 static void
355 get_pcpu_to_control(struct policy *pol)
356 {
357
358         /* Convert vcpu to pcpu. */
359         struct vm_info info;
360         int pcpu, count;
361         struct core_info *ci;
362
363         ci = get_core_info();
364
365         RTE_LOG(DEBUG, CHANNEL_MONITOR,
366                         "Looking for pcpu for %s\n", pol->pkt.vm_name);
367
368         /*
369          * So now that we're handling virtual and physical cores, we need to
370          * differenciate between them when adding them to the branch monitor.
371          * Virtual cores need to be converted to physical cores.
372          */
373         if (pol->pkt.core_type == CORE_TYPE_VIRTUAL) {
374                 /*
375                  * If the cores in the policy are virtual, we need to map them
376                  * to physical core. We look up the vm info and use that for
377                  * the mapping.
378                  */
379                 get_info_vm(pol->pkt.vm_name, &info);
380                 for (count = 0; count < pol->pkt.num_vcpu; count++) {
381                         pcpu = info.pcpu_map[pol->pkt.vcpu_to_control[count]];
382                         pcpu_monitor(pol, ci, pcpu, count);
383                 }
384         } else {
385                 /*
386                  * If the cores in the policy are physical, we just use
387                  * those core id's directly.
388                  */
389                 for (count = 0; count < pol->pkt.num_vcpu; count++) {
390                         pcpu = pol->pkt.vcpu_to_control[count];
391                         pcpu_monitor(pol, ci, pcpu, count);
392                 }
393         }
394 }
395
396 static int
397 get_pfid(struct policy *pol)
398 {
399
400         int i, x, ret = 0;
401
402         for (i = 0; i < pol->pkt.nb_mac_to_monitor; i++) {
403
404                 RTE_ETH_FOREACH_DEV(x) {
405                         ret = rte_pmd_i40e_query_vfid_by_mac(x,
406                                 (struct ether_addr *)&(pol->pkt.vfid[i]));
407                         if (ret != -EINVAL) {
408                                 pol->port[i] = x;
409                                 break;
410                         }
411                 }
412                 if (ret == -EINVAL || ret == -ENOTSUP || ret == ENODEV) {
413                         RTE_LOG(INFO, CHANNEL_MONITOR,
414                                 "Error with Policy. MAC not found on "
415                                 "attached ports ");
416                         pol->enabled = 0;
417                         return ret;
418                 }
419                 pol->pfid[i] = ret;
420         }
421         return 1;
422 }
423
424 static int
425 update_policy(struct channel_packet *pkt)
426 {
427
428         unsigned int updated = 0;
429         int i;
430
431
432         RTE_LOG(INFO, CHANNEL_MONITOR,
433                         "Applying policy for %s\n", pkt->vm_name);
434
435         for (i = 0; i < MAX_CLIENTS; i++) {
436                 if (strcmp(policies[i].pkt.vm_name, pkt->vm_name) == 0) {
437                         /* Copy the contents of *pkt into the policy.pkt */
438                         policies[i].pkt = *pkt;
439                         get_pcpu_to_control(&policies[i]);
440                         /* Check Eth dev only for Traffic policy */
441                         if (policies[i].pkt.policy_to_use == TRAFFIC) {
442                                 if (get_pfid(&policies[i]) < 0) {
443                                         updated = 1;
444                                         break;
445                                 }
446                         }
447                         core_share_status(i);
448                         policies[i].enabled = 1;
449                         updated = 1;
450                 }
451         }
452         if (!updated) {
453                 for (i = 0; i < MAX_CLIENTS; i++) {
454                         if (policies[i].enabled == 0) {
455                                 policies[i].pkt = *pkt;
456                                 get_pcpu_to_control(&policies[i]);
457                                 /* Check Eth dev only for Traffic policy */
458                                 if (policies[i].pkt.policy_to_use == TRAFFIC) {
459                                         if (get_pfid(&policies[i]) < 0) {
460                                                 updated = 1;
461                                                 break;
462                                         }
463                                 }
464                                 core_share_status(i);
465                                 policies[i].enabled = 1;
466                                 break;
467                         }
468                 }
469         }
470         return 0;
471 }
472
473 static int
474 remove_policy(struct channel_packet *pkt __rte_unused)
475 {
476         int i;
477
478         /*
479          * Disabling the policy is simply a case of setting
480          * enabled to 0
481          */
482         for (i = 0; i < MAX_CLIENTS; i++) {
483                 if (strcmp(policies[i].pkt.vm_name, pkt->vm_name) == 0) {
484                         policies[i].enabled = 0;
485                         return 0;
486                 }
487         }
488         return -1;
489 }
490
491 static uint64_t
492 get_pkt_diff(struct policy *pol)
493 {
494
495         uint64_t vsi_pkt_count,
496                 vsi_pkt_total = 0,
497                 vsi_pkt_count_prev_total = 0;
498         double rdtsc_curr, rdtsc_diff, diff;
499         int x;
500         struct rte_eth_stats vf_stats;
501
502         for (x = 0; x < pol->pkt.nb_mac_to_monitor; x++) {
503
504                 /*Read vsi stats*/
505                 if (rte_pmd_i40e_get_vf_stats(x, pol->pfid[x], &vf_stats) == 0)
506                         vsi_pkt_count = vf_stats.ipackets;
507                 else
508                         vsi_pkt_count = -1;
509
510                 vsi_pkt_total += vsi_pkt_count;
511
512                 vsi_pkt_count_prev_total += vsi_pkt_count_prev[pol->pfid[x]];
513                 vsi_pkt_count_prev[pol->pfid[x]] = vsi_pkt_count;
514         }
515
516         rdtsc_curr = rte_rdtsc_precise();
517         rdtsc_diff = rdtsc_curr - rdtsc_prev[pol->pfid[x-1]];
518         rdtsc_prev[pol->pfid[x-1]] = rdtsc_curr;
519
520         diff = (vsi_pkt_total - vsi_pkt_count_prev_total) *
521                         ((double)rte_get_tsc_hz() / rdtsc_diff);
522
523         return diff;
524 }
525
526 static void
527 apply_traffic_profile(struct policy *pol)
528 {
529
530         int count;
531         uint64_t diff = 0;
532
533         diff = get_pkt_diff(pol);
534
535         if (diff >= (pol->pkt.traffic_policy.max_max_packet_thresh)) {
536                 for (count = 0; count < pol->pkt.num_vcpu; count++) {
537                         if (pol->core_share[count].status != 1)
538                                 power_manager_scale_core_max(
539                                                 pol->core_share[count].pcpu);
540                 }
541         } else if (diff >= (pol->pkt.traffic_policy.avg_max_packet_thresh)) {
542                 for (count = 0; count < pol->pkt.num_vcpu; count++) {
543                         if (pol->core_share[count].status != 1)
544                                 power_manager_scale_core_med(
545                                                 pol->core_share[count].pcpu);
546                 }
547         } else if (diff < (pol->pkt.traffic_policy.avg_max_packet_thresh)) {
548                 for (count = 0; count < pol->pkt.num_vcpu; count++) {
549                         if (pol->core_share[count].status != 1)
550                                 power_manager_scale_core_min(
551                                                 pol->core_share[count].pcpu);
552                 }
553         }
554 }
555
556 static void
557 apply_time_profile(struct policy *pol)
558 {
559
560         int count, x;
561         struct timeval tv;
562         struct tm *ptm;
563         char time_string[40];
564
565         /* Obtain the time of day, and convert it to a tm struct. */
566         gettimeofday(&tv, NULL);
567         ptm = localtime(&tv.tv_sec);
568         /* Format the date and time, down to a single second. */
569         strftime(time_string, sizeof(time_string), "%Y-%m-%d %H:%M:%S", ptm);
570
571         for (x = 0; x < HOURS; x++) {
572
573                 if (ptm->tm_hour == pol->pkt.timer_policy.busy_hours[x]) {
574                         for (count = 0; count < pol->pkt.num_vcpu; count++) {
575                                 if (pol->core_share[count].status != 1) {
576                                         power_manager_scale_core_max(
577                                                 pol->core_share[count].pcpu);
578                                 }
579                         }
580                         break;
581                 } else if (ptm->tm_hour ==
582                                 pol->pkt.timer_policy.quiet_hours[x]) {
583                         for (count = 0; count < pol->pkt.num_vcpu; count++) {
584                                 if (pol->core_share[count].status != 1) {
585                                         power_manager_scale_core_min(
586                                                 pol->core_share[count].pcpu);
587                         }
588                 }
589                         break;
590                 } else if (ptm->tm_hour ==
591                         pol->pkt.timer_policy.hours_to_use_traffic_profile[x]) {
592                         apply_traffic_profile(pol);
593                         break;
594                 }
595         }
596 }
597
598 static void
599 apply_workload_profile(struct policy *pol)
600 {
601
602         int count;
603
604         if (pol->pkt.workload == HIGH) {
605                 for (count = 0; count < pol->pkt.num_vcpu; count++) {
606                         if (pol->core_share[count].status != 1)
607                                 power_manager_scale_core_max(
608                                                 pol->core_share[count].pcpu);
609                 }
610         } else if (pol->pkt.workload == MEDIUM) {
611                 for (count = 0; count < pol->pkt.num_vcpu; count++) {
612                         if (pol->core_share[count].status != 1)
613                                 power_manager_scale_core_med(
614                                                 pol->core_share[count].pcpu);
615                 }
616         } else if (pol->pkt.workload == LOW) {
617                 for (count = 0; count < pol->pkt.num_vcpu; count++) {
618                         if (pol->core_share[count].status != 1)
619                                 power_manager_scale_core_min(
620                                                 pol->core_share[count].pcpu);
621                 }
622         }
623 }
624
625 static void
626 apply_policy(struct policy *pol)
627 {
628
629         struct channel_packet *pkt = &pol->pkt;
630
631         /*Check policy to use*/
632         if (pkt->policy_to_use == TRAFFIC)
633                 apply_traffic_profile(pol);
634         else if (pkt->policy_to_use == TIME)
635                 apply_time_profile(pol);
636         else if (pkt->policy_to_use == WORKLOAD)
637                 apply_workload_profile(pol);
638 }
639
640 static int
641 process_request(struct channel_packet *pkt, struct channel_info *chan_info)
642 {
643         int ret;
644
645         if (chan_info == NULL)
646                 return -1;
647
648         if (rte_atomic32_cmpset(&(chan_info->status), CHANNEL_MGR_CHANNEL_CONNECTED,
649                         CHANNEL_MGR_CHANNEL_PROCESSING) == 0)
650                 return -1;
651
652         if (pkt->command == CPU_POWER) {
653                 unsigned int core_num;
654
655                 if (pkt->core_type == CORE_TYPE_VIRTUAL)
656                         core_num = get_pcpu(chan_info, pkt->resource_id);
657                 else
658                         core_num = pkt->resource_id;
659
660                 RTE_LOG(DEBUG, CHANNEL_MONITOR, "Processing requested cmd for cpu:%d\n",
661                         core_num);
662
663                 switch (pkt->unit) {
664                 case(CPU_POWER_SCALE_MIN):
665                         power_manager_scale_core_min(core_num);
666                         break;
667                 case(CPU_POWER_SCALE_MAX):
668                         power_manager_scale_core_max(core_num);
669                         break;
670                 case(CPU_POWER_SCALE_DOWN):
671                         power_manager_scale_core_down(core_num);
672                         break;
673                 case(CPU_POWER_SCALE_UP):
674                         power_manager_scale_core_up(core_num);
675                         break;
676                 case(CPU_POWER_ENABLE_TURBO):
677                         power_manager_enable_turbo_core(core_num);
678                         break;
679                 case(CPU_POWER_DISABLE_TURBO):
680                         power_manager_disable_turbo_core(core_num);
681                         break;
682                 default:
683                         break;
684                 }
685         }
686
687         if (pkt->command == PKT_POLICY) {
688                 RTE_LOG(INFO, CHANNEL_MONITOR, "Processing policy request %s\n",
689                                 pkt->vm_name);
690                 update_policy(pkt);
691                 policy_is_set = 1;
692         }
693
694         if (pkt->command == PKT_POLICY_REMOVE) {
695                 ret = remove_policy(pkt);
696                 if (ret == 0)
697                         RTE_LOG(INFO, CHANNEL_MONITOR,
698                                  "Removed policy %s\n", pkt->vm_name);
699                 else
700                         RTE_LOG(INFO, CHANNEL_MONITOR,
701                                  "Policy %s does not exist\n", pkt->vm_name);
702         }
703
704         /*
705          * Return is not checked as channel status may have been set to DISABLED
706          * from management thread
707          */
708         rte_atomic32_cmpset(&(chan_info->status), CHANNEL_MGR_CHANNEL_PROCESSING,
709                         CHANNEL_MGR_CHANNEL_CONNECTED);
710         return 0;
711
712 }
713
714 int
715 add_channel_to_monitor(struct channel_info **chan_info)
716 {
717         struct channel_info *info = *chan_info;
718         struct epoll_event event;
719
720         event.events = EPOLLIN;
721         event.data.ptr = info;
722         if (epoll_ctl(global_event_fd, EPOLL_CTL_ADD, info->fd, &event) < 0) {
723                 RTE_LOG(ERR, CHANNEL_MONITOR, "Unable to add channel '%s' "
724                                 "to epoll\n", info->channel_path);
725                 return -1;
726         }
727         RTE_LOG(ERR, CHANNEL_MONITOR, "Added channel '%s' "
728                         "to monitor\n", info->channel_path);
729         return 0;
730 }
731
732 int
733 remove_channel_from_monitor(struct channel_info *chan_info)
734 {
735         if (epoll_ctl(global_event_fd, EPOLL_CTL_DEL,
736                         chan_info->fd, NULL) < 0) {
737                 RTE_LOG(ERR, CHANNEL_MONITOR, "Unable to remove channel '%s' "
738                                 "from epoll\n", chan_info->channel_path);
739                 return -1;
740         }
741         return 0;
742 }
743
744 int
745 channel_monitor_init(void)
746 {
747         global_event_fd = epoll_create1(0);
748         if (global_event_fd == 0) {
749                 RTE_LOG(ERR, CHANNEL_MONITOR,
750                                 "Error creating epoll context with error %s\n",
751                                 strerror(errno));
752                 return -1;
753         }
754         global_events_list = rte_malloc("epoll_events",
755                         sizeof(*global_events_list)
756                         * MAX_EVENTS, RTE_CACHE_LINE_SIZE);
757         if (global_events_list == NULL) {
758                 RTE_LOG(ERR, CHANNEL_MONITOR, "Unable to rte_malloc for "
759                                 "epoll events\n");
760                 return -1;
761         }
762         return 0;
763 }
764
765 static void
766 read_binary_packet(struct channel_info *chan_info)
767 {
768         struct channel_packet pkt;
769         void *buffer = &pkt;
770         int buffer_len = sizeof(pkt);
771         int n_bytes, err = 0;
772
773         while (buffer_len > 0) {
774                 n_bytes = read(chan_info->fd,
775                                 buffer, buffer_len);
776                 if (n_bytes == buffer_len)
777                         break;
778                 if (n_bytes < 0) {
779                         err = errno;
780                         RTE_LOG(DEBUG, CHANNEL_MONITOR,
781                                 "Received error on "
782                                 "channel '%s' read: %s\n",
783                                 chan_info->channel_path,
784                                 strerror(err));
785                         remove_channel(&chan_info);
786                         break;
787                 }
788                 buffer = (char *)buffer + n_bytes;
789                 buffer_len -= n_bytes;
790         }
791         if (!err)
792                 process_request(&pkt, chan_info);
793 }
794
795 #ifdef USE_JANSSON
796 static void
797 read_json_packet(struct channel_info *chan_info)
798 {
799         struct channel_packet pkt;
800         int n_bytes, ret;
801         json_t *root;
802         json_error_t error;
803
804         /* read opening brace to closing brace */
805         do {
806                 int idx = 0;
807                 int indent = 0;
808                 do {
809                         n_bytes = read(chan_info->fd, &json_data[idx], 1);
810                         if (n_bytes == 0)
811                                 break;
812                         if (json_data[idx] == '{')
813                                 indent++;
814                         if (json_data[idx] == '}')
815                                 indent--;
816                         if ((indent > 0) || (idx > 0))
817                                 idx++;
818                         if (indent <= 0)
819                                 json_data[idx] = 0;
820                         if (idx >= MAX_JSON_STRING_LEN-1)
821                                 break;
822                 } while (indent > 0);
823
824                 if (indent > 0)
825                         /*
826                          * We've broken out of the read loop without getting
827                          * a closing brace, so throw away the data
828                          */
829                         json_data[idx] = 0;
830
831                 if (strlen(json_data) == 0)
832                         continue;
833
834                 printf("got [%s]\n", json_data);
835
836                 root = json_loads(json_data, 0, &error);
837
838                 if (root) {
839                         /*
840                          * Because our data is now in the json
841                          * object, we can overwrite the pkt
842                          * with a channel_packet struct, using
843                          * parse_json_to_pkt()
844                          */
845                         ret = parse_json_to_pkt(root, &pkt);
846                         json_decref(root);
847                         if (ret) {
848                                 RTE_LOG(ERR, CHANNEL_MONITOR,
849                                         "Error validating JSON profile data\n");
850                                 break;
851                         }
852                         process_request(&pkt, chan_info);
853                 } else {
854                         RTE_LOG(ERR, CHANNEL_MONITOR,
855                                         "JSON error on line %d: %s\n",
856                                         error.line, error.text);
857                 }
858         } while (n_bytes > 0);
859 }
860 #endif
861
862 void
863 run_channel_monitor(void)
864 {
865         while (run_loop) {
866                 int n_events, i;
867
868                 n_events = epoll_wait(global_event_fd, global_events_list,
869                                 MAX_EVENTS, 1);
870                 if (!run_loop)
871                         break;
872                 for (i = 0; i < n_events; i++) {
873                         struct channel_info *chan_info = (struct channel_info *)
874                                         global_events_list[i].data.ptr;
875                         if ((global_events_list[i].events & EPOLLERR) ||
876                                 (global_events_list[i].events & EPOLLHUP)) {
877                                 RTE_LOG(INFO, CHANNEL_MONITOR,
878                                                 "Remote closed connection for "
879                                                 "channel '%s'\n",
880                                                 chan_info->channel_path);
881                                 remove_channel(&chan_info);
882                                 continue;
883                         }
884                         if (global_events_list[i].events & EPOLLIN) {
885
886                                 switch (chan_info->type) {
887                                 case CHANNEL_TYPE_BINARY:
888                                         read_binary_packet(chan_info);
889                                         break;
890 #ifdef USE_JANSSON
891                                 case CHANNEL_TYPE_JSON:
892                                         read_json_packet(chan_info);
893                                         break;
894 #endif
895                                 default:
896                                         break;
897                                 }
898                         }
899                 }
900                 rte_delay_us(time_period_ms*1000);
901                 if (policy_is_set) {
902                         int j;
903
904                         for (j = 0; j < MAX_CLIENTS; j++) {
905                                 if (policies[j].enabled == 1)
906                                         apply_policy(&policies[j]);
907                         }
908                 }
909         }
910 }