net: add rte prefix to ether structures
[dpdk.git] / examples / vm_power_manager / channel_monitor.c
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2010-2014 Intel Corporation
3  */
4
5 #include <unistd.h>
6 #include <stdio.h>
7 #include <stdlib.h>
8 #include <stdint.h>
9 #include <signal.h>
10 #include <errno.h>
11 #include <string.h>
12 #include <fcntl.h>
13 #include <sys/types.h>
14 #include <sys/epoll.h>
15 #include <sys/queue.h>
16 #include <sys/time.h>
17 #include <sys/socket.h>
18 #include <sys/select.h>
19 #ifdef USE_JANSSON
20 #include <jansson.h>
21 #else
22 #pragma message "Jansson dev libs unavailable, not including JSON parsing"
23 #endif
24 #include <rte_string_fns.h>
25 #include <rte_log.h>
26 #include <rte_memory.h>
27 #include <rte_malloc.h>
28 #include <rte_atomic.h>
29 #include <rte_cycles.h>
30 #include <rte_ethdev.h>
31 #include <rte_pmd_i40e.h>
32
33 #include <libvirt/libvirt.h>
34 #include "channel_monitor.h"
35 #include "channel_commands.h"
36 #include "channel_manager.h"
37 #include "power_manager.h"
38 #include "oob_monitor.h"
39
40 #define RTE_LOGTYPE_CHANNEL_MONITOR RTE_LOGTYPE_USER1
41
42 #define MAX_EVENTS 256
43
44 uint64_t vsi_pkt_count_prev[384];
45 uint64_t rdtsc_prev[384];
46 #define MAX_JSON_STRING_LEN 1024
47 char json_data[MAX_JSON_STRING_LEN];
48
49 double time_period_ms = 1;
50 static volatile unsigned run_loop = 1;
51 static int global_event_fd;
52 static unsigned int policy_is_set;
53 static struct epoll_event *global_events_list;
54 static struct policy policies[MAX_CLIENTS];
55
56 #ifdef USE_JANSSON
57
58 union PFID {
59         struct rte_ether_addr addr;
60         uint64_t pfid;
61 };
62
63 static int
64 str_to_ether_addr(const char *a, struct rte_ether_addr *ether_addr)
65 {
66         int i;
67         char *end;
68         unsigned long o[ETHER_ADDR_LEN];
69
70         i = 0;
71         do {
72                 errno = 0;
73                 o[i] = strtoul(a, &end, 16);
74                 if (errno != 0 || end == a || (end[0] != ':' && end[0] != 0))
75                         return -1;
76                 a = end + 1;
77         } while (++i != RTE_DIM(o) / sizeof(o[0]) && end[0] != 0);
78
79         /* Junk at the end of line */
80         if (end[0] != 0)
81                 return -1;
82
83         /* Support the format XX:XX:XX:XX:XX:XX */
84         if (i == ETHER_ADDR_LEN) {
85                 while (i-- != 0) {
86                         if (o[i] > UINT8_MAX)
87                                 return -1;
88                         ether_addr->addr_bytes[i] = (uint8_t)o[i];
89                 }
90         /* Support the format XXXX:XXXX:XXXX */
91         } else if (i == ETHER_ADDR_LEN / 2) {
92                 while (i-- != 0) {
93                         if (o[i] > UINT16_MAX)
94                                 return -1;
95                         ether_addr->addr_bytes[i * 2] =
96                                         (uint8_t)(o[i] >> 8);
97                         ether_addr->addr_bytes[i * 2 + 1] =
98                                         (uint8_t)(o[i] & 0xff);
99                 }
100         /* unknown format */
101         } else
102                 return -1;
103
104         return 0;
105 }
106
107 static int
108 set_policy_mac(struct channel_packet *pkt, int idx, char *mac)
109 {
110         union PFID pfid;
111         int ret;
112
113         /* Use port MAC address as the vfid */
114         ret = str_to_ether_addr(mac, &pfid.addr);
115
116         if (ret != 0) {
117                 RTE_LOG(ERR, CHANNEL_MONITOR,
118                         "Invalid mac address received in JSON\n");
119                 pkt->vfid[idx] = 0;
120                 return -1;
121         }
122
123         printf("Received MAC Address: %02" PRIx8 ":%02" PRIx8 ":%02" PRIx8 ":"
124                         "%02" PRIx8 ":%02" PRIx8 ":%02" PRIx8 "\n",
125                         pfid.addr.addr_bytes[0], pfid.addr.addr_bytes[1],
126                         pfid.addr.addr_bytes[2], pfid.addr.addr_bytes[3],
127                         pfid.addr.addr_bytes[4], pfid.addr.addr_bytes[5]);
128
129         pkt->vfid[idx] = pfid.pfid;
130         return 0;
131 }
132
133
134 static int
135 parse_json_to_pkt(json_t *element, struct channel_packet *pkt)
136 {
137         const char *key;
138         json_t *value;
139         int ret;
140
141         memset(pkt, 0, sizeof(struct channel_packet));
142
143         pkt->nb_mac_to_monitor = 0;
144         pkt->t_boost_status.tbEnabled = false;
145         pkt->workload = LOW;
146         pkt->policy_to_use = TIME;
147         pkt->command = PKT_POLICY;
148         pkt->core_type = CORE_TYPE_PHYSICAL;
149
150         json_object_foreach(element, key, value) {
151                 if (!strcmp(key, "policy")) {
152                         /* Recurse in to get the contents of profile */
153                         ret = parse_json_to_pkt(value, pkt);
154                         if (ret)
155                                 return ret;
156                 } else if (!strcmp(key, "instruction")) {
157                         /* Recurse in to get the contents of instruction */
158                         ret = parse_json_to_pkt(value, pkt);
159                         if (ret)
160                                 return ret;
161                 } else if (!strcmp(key, "name")) {
162                         strlcpy(pkt->vm_name, json_string_value(value),
163                                         sizeof(pkt->vm_name));
164                 } else if (!strcmp(key, "command")) {
165                         char command[32];
166                         strlcpy(command, json_string_value(value), 32);
167                         if (!strcmp(command, "power")) {
168                                 pkt->command = CPU_POWER;
169                         } else if (!strcmp(command, "create")) {
170                                 pkt->command = PKT_POLICY;
171                         } else if (!strcmp(command, "destroy")) {
172                                 pkt->command = PKT_POLICY_REMOVE;
173                         } else {
174                                 RTE_LOG(ERR, CHANNEL_MONITOR,
175                                         "Invalid command received in JSON\n");
176                                 return -1;
177                         }
178                 } else if (!strcmp(key, "policy_type")) {
179                         char command[32];
180                         strlcpy(command, json_string_value(value), 32);
181                         if (!strcmp(command, "TIME")) {
182                                 pkt->policy_to_use = TIME;
183                         } else if (!strcmp(command, "TRAFFIC")) {
184                                 pkt->policy_to_use = TRAFFIC;
185                         } else if (!strcmp(command, "WORKLOAD")) {
186                                 pkt->policy_to_use = WORKLOAD;
187                         } else if (!strcmp(command, "BRANCH_RATIO")) {
188                                 pkt->policy_to_use = BRANCH_RATIO;
189                         } else {
190                                 RTE_LOG(ERR, CHANNEL_MONITOR,
191                                         "Wrong policy_type received in JSON\n");
192                                 return -1;
193                         }
194                 } else if (!strcmp(key, "workload")) {
195                         char command[32];
196                         strlcpy(command, json_string_value(value), 32);
197                         if (!strcmp(command, "HIGH")) {
198                                 pkt->workload = HIGH;
199                         } else if (!strcmp(command, "MEDIUM")) {
200                                 pkt->workload = MEDIUM;
201                         } else if (!strcmp(command, "LOW")) {
202                                 pkt->workload = LOW;
203                         } else {
204                                 RTE_LOG(ERR, CHANNEL_MONITOR,
205                                         "Wrong workload received in JSON\n");
206                                 return -1;
207                         }
208                 } else if (!strcmp(key, "busy_hours")) {
209                         unsigned int i;
210                         size_t size = json_array_size(value);
211
212                         for (i = 0; i < size; i++) {
213                                 int hour = (int)json_integer_value(
214                                                 json_array_get(value, i));
215                                 pkt->timer_policy.busy_hours[i] = hour;
216                         }
217                 } else if (!strcmp(key, "quiet_hours")) {
218                         unsigned int i;
219                         size_t size = json_array_size(value);
220
221                         for (i = 0; i < size; i++) {
222                                 int hour = (int)json_integer_value(
223                                                 json_array_get(value, i));
224                                 pkt->timer_policy.quiet_hours[i] = hour;
225                         }
226                 } else if (!strcmp(key, "core_list")) {
227                         unsigned int i;
228                         size_t size = json_array_size(value);
229
230                         for (i = 0; i < size; i++) {
231                                 int core = (int)json_integer_value(
232                                                 json_array_get(value, i));
233                                 pkt->vcpu_to_control[i] = core;
234                         }
235                         pkt->num_vcpu = size;
236                 } else if (!strcmp(key, "mac_list")) {
237                         unsigned int i;
238                         size_t size = json_array_size(value);
239
240                         for (i = 0; i < size; i++) {
241                                 char mac[32];
242                                 strlcpy(mac,
243                                         json_string_value(json_array_get(value, i)),
244                                         32);
245                                 set_policy_mac(pkt, i, mac);
246                         }
247                         pkt->nb_mac_to_monitor = size;
248                 } else if (!strcmp(key, "avg_packet_thresh")) {
249                         pkt->traffic_policy.avg_max_packet_thresh =
250                                         (uint32_t)json_integer_value(value);
251                 } else if (!strcmp(key, "max_packet_thresh")) {
252                         pkt->traffic_policy.max_max_packet_thresh =
253                                         (uint32_t)json_integer_value(value);
254                 } else if (!strcmp(key, "unit")) {
255                         char unit[32];
256                         strlcpy(unit, json_string_value(value), 32);
257                         if (!strcmp(unit, "SCALE_UP")) {
258                                 pkt->unit = CPU_POWER_SCALE_UP;
259                         } else if (!strcmp(unit, "SCALE_DOWN")) {
260                                 pkt->unit = CPU_POWER_SCALE_DOWN;
261                         } else if (!strcmp(unit, "SCALE_MAX")) {
262                                 pkt->unit = CPU_POWER_SCALE_MAX;
263                         } else if (!strcmp(unit, "SCALE_MIN")) {
264                                 pkt->unit = CPU_POWER_SCALE_MIN;
265                         } else if (!strcmp(unit, "ENABLE_TURBO")) {
266                                 pkt->unit = CPU_POWER_ENABLE_TURBO;
267                         } else if (!strcmp(unit, "DISABLE_TURBO")) {
268                                 pkt->unit = CPU_POWER_DISABLE_TURBO;
269                         } else {
270                                 RTE_LOG(ERR, CHANNEL_MONITOR,
271                                         "Invalid command received in JSON\n");
272                                 return -1;
273                         }
274                 } else if (!strcmp(key, "resource_id")) {
275                         pkt->resource_id = (uint32_t)json_integer_value(value);
276                 } else {
277                         RTE_LOG(ERR, CHANNEL_MONITOR,
278                                 "Unknown key received in JSON string: %s\n",
279                                 key);
280                 }
281         }
282         return 0;
283 }
284 #endif
285
286 void channel_monitor_exit(void)
287 {
288         run_loop = 0;
289         rte_free(global_events_list);
290 }
291
292 static void
293 core_share(int pNo, int z, int x, int t)
294 {
295         if (policies[pNo].core_share[z].pcpu == lvm_info[x].pcpus[t]) {
296                 if (strcmp(policies[pNo].pkt.vm_name,
297                                 lvm_info[x].vm_name) != 0) {
298                         policies[pNo].core_share[z].status = 1;
299                         power_manager_scale_core_max(
300                                         policies[pNo].core_share[z].pcpu);
301                 }
302         }
303 }
304
305 static void
306 core_share_status(int pNo)
307 {
308
309         int noVms = 0, noVcpus = 0, z, x, t;
310
311         get_all_vm(&noVms, &noVcpus);
312
313         /* Reset Core Share Status. */
314         for (z = 0; z < noVcpus; z++)
315                 policies[pNo].core_share[z].status = 0;
316
317         /* Foreach vcpu in a policy. */
318         for (z = 0; z < policies[pNo].pkt.num_vcpu; z++) {
319                 /* Foreach VM on the platform. */
320                 for (x = 0; x < noVms; x++) {
321                         /* Foreach vcpu of VMs on platform. */
322                         for (t = 0; t < lvm_info[x].num_cpus; t++)
323                                 core_share(pNo, z, x, t);
324                 }
325         }
326 }
327
328
329 static int
330 pcpu_monitor(struct policy *pol, struct core_info *ci, int pcpu, int count)
331 {
332         int ret = 0;
333
334         if (pol->pkt.policy_to_use == BRANCH_RATIO) {
335                 ci->cd[pcpu].oob_enabled = 1;
336                 ret = add_core_to_monitor(pcpu);
337                 if (ret == 0)
338                         RTE_LOG(INFO, CHANNEL_MONITOR,
339                                         "Monitoring pcpu %d OOB for %s\n",
340                                         pcpu, pol->pkt.vm_name);
341                 else
342                         RTE_LOG(ERR, CHANNEL_MONITOR,
343                                         "Error monitoring pcpu %d OOB for %s\n",
344                                         pcpu, pol->pkt.vm_name);
345
346         } else {
347                 pol->core_share[count].pcpu = pcpu;
348                 RTE_LOG(INFO, CHANNEL_MONITOR,
349                                 "Monitoring pcpu %d for %s\n",
350                                 pcpu, pol->pkt.vm_name);
351         }
352         return ret;
353 }
354
355 static void
356 get_pcpu_to_control(struct policy *pol)
357 {
358
359         /* Convert vcpu to pcpu. */
360         struct vm_info info;
361         int pcpu, count;
362         struct core_info *ci;
363
364         ci = get_core_info();
365
366         RTE_LOG(DEBUG, CHANNEL_MONITOR,
367                         "Looking for pcpu for %s\n", pol->pkt.vm_name);
368
369         /*
370          * So now that we're handling virtual and physical cores, we need to
371          * differenciate between them when adding them to the branch monitor.
372          * Virtual cores need to be converted to physical cores.
373          */
374         if (pol->pkt.core_type == CORE_TYPE_VIRTUAL) {
375                 /*
376                  * If the cores in the policy are virtual, we need to map them
377                  * to physical core. We look up the vm info and use that for
378                  * the mapping.
379                  */
380                 get_info_vm(pol->pkt.vm_name, &info);
381                 for (count = 0; count < pol->pkt.num_vcpu; count++) {
382                         pcpu = info.pcpu_map[pol->pkt.vcpu_to_control[count]];
383                         pcpu_monitor(pol, ci, pcpu, count);
384                 }
385         } else {
386                 /*
387                  * If the cores in the policy are physical, we just use
388                  * those core id's directly.
389                  */
390                 for (count = 0; count < pol->pkt.num_vcpu; count++) {
391                         pcpu = pol->pkt.vcpu_to_control[count];
392                         pcpu_monitor(pol, ci, pcpu, count);
393                 }
394         }
395 }
396
397 static int
398 get_pfid(struct policy *pol)
399 {
400
401         int i, x, ret = 0;
402
403         for (i = 0; i < pol->pkt.nb_mac_to_monitor; i++) {
404
405                 RTE_ETH_FOREACH_DEV(x) {
406                         ret = rte_pmd_i40e_query_vfid_by_mac(x,
407                                 (struct rte_ether_addr *)&(pol->pkt.vfid[i]));
408                         if (ret != -EINVAL) {
409                                 pol->port[i] = x;
410                                 break;
411                         }
412                 }
413                 if (ret == -EINVAL || ret == -ENOTSUP || ret == ENODEV) {
414                         RTE_LOG(INFO, CHANNEL_MONITOR,
415                                 "Error with Policy. MAC not found on "
416                                 "attached ports ");
417                         pol->enabled = 0;
418                         return ret;
419                 }
420                 pol->pfid[i] = ret;
421         }
422         return 1;
423 }
424
425 static int
426 update_policy(struct channel_packet *pkt)
427 {
428
429         unsigned int updated = 0;
430         int i;
431
432
433         RTE_LOG(INFO, CHANNEL_MONITOR,
434                         "Applying policy for %s\n", pkt->vm_name);
435
436         for (i = 0; i < MAX_CLIENTS; i++) {
437                 if (strcmp(policies[i].pkt.vm_name, pkt->vm_name) == 0) {
438                         /* Copy the contents of *pkt into the policy.pkt */
439                         policies[i].pkt = *pkt;
440                         get_pcpu_to_control(&policies[i]);
441                         /* Check Eth dev only for Traffic policy */
442                         if (policies[i].pkt.policy_to_use == TRAFFIC) {
443                                 if (get_pfid(&policies[i]) < 0) {
444                                         updated = 1;
445                                         break;
446                                 }
447                         }
448                         core_share_status(i);
449                         policies[i].enabled = 1;
450                         updated = 1;
451                 }
452         }
453         if (!updated) {
454                 for (i = 0; i < MAX_CLIENTS; i++) {
455                         if (policies[i].enabled == 0) {
456                                 policies[i].pkt = *pkt;
457                                 get_pcpu_to_control(&policies[i]);
458                                 /* Check Eth dev only for Traffic policy */
459                                 if (policies[i].pkt.policy_to_use == TRAFFIC) {
460                                         if (get_pfid(&policies[i]) < 0) {
461                                                 updated = 1;
462                                                 break;
463                                         }
464                                 }
465                                 core_share_status(i);
466                                 policies[i].enabled = 1;
467                                 break;
468                         }
469                 }
470         }
471         return 0;
472 }
473
474 static int
475 remove_policy(struct channel_packet *pkt __rte_unused)
476 {
477         int i;
478
479         /*
480          * Disabling the policy is simply a case of setting
481          * enabled to 0
482          */
483         for (i = 0; i < MAX_CLIENTS; i++) {
484                 if (strcmp(policies[i].pkt.vm_name, pkt->vm_name) == 0) {
485                         policies[i].enabled = 0;
486                         return 0;
487                 }
488         }
489         return -1;
490 }
491
492 static uint64_t
493 get_pkt_diff(struct policy *pol)
494 {
495
496         uint64_t vsi_pkt_count,
497                 vsi_pkt_total = 0,
498                 vsi_pkt_count_prev_total = 0;
499         double rdtsc_curr, rdtsc_diff, diff;
500         int x;
501         struct rte_eth_stats vf_stats;
502
503         for (x = 0; x < pol->pkt.nb_mac_to_monitor; x++) {
504
505                 /*Read vsi stats*/
506                 if (rte_pmd_i40e_get_vf_stats(x, pol->pfid[x], &vf_stats) == 0)
507                         vsi_pkt_count = vf_stats.ipackets;
508                 else
509                         vsi_pkt_count = -1;
510
511                 vsi_pkt_total += vsi_pkt_count;
512
513                 vsi_pkt_count_prev_total += vsi_pkt_count_prev[pol->pfid[x]];
514                 vsi_pkt_count_prev[pol->pfid[x]] = vsi_pkt_count;
515         }
516
517         rdtsc_curr = rte_rdtsc_precise();
518         rdtsc_diff = rdtsc_curr - rdtsc_prev[pol->pfid[x-1]];
519         rdtsc_prev[pol->pfid[x-1]] = rdtsc_curr;
520
521         diff = (vsi_pkt_total - vsi_pkt_count_prev_total) *
522                         ((double)rte_get_tsc_hz() / rdtsc_diff);
523
524         return diff;
525 }
526
527 static void
528 apply_traffic_profile(struct policy *pol)
529 {
530
531         int count;
532         uint64_t diff = 0;
533
534         diff = get_pkt_diff(pol);
535
536         if (diff >= (pol->pkt.traffic_policy.max_max_packet_thresh)) {
537                 for (count = 0; count < pol->pkt.num_vcpu; count++) {
538                         if (pol->core_share[count].status != 1)
539                                 power_manager_scale_core_max(
540                                                 pol->core_share[count].pcpu);
541                 }
542         } else if (diff >= (pol->pkt.traffic_policy.avg_max_packet_thresh)) {
543                 for (count = 0; count < pol->pkt.num_vcpu; count++) {
544                         if (pol->core_share[count].status != 1)
545                                 power_manager_scale_core_med(
546                                                 pol->core_share[count].pcpu);
547                 }
548         } else if (diff < (pol->pkt.traffic_policy.avg_max_packet_thresh)) {
549                 for (count = 0; count < pol->pkt.num_vcpu; count++) {
550                         if (pol->core_share[count].status != 1)
551                                 power_manager_scale_core_min(
552                                                 pol->core_share[count].pcpu);
553                 }
554         }
555 }
556
557 static void
558 apply_time_profile(struct policy *pol)
559 {
560
561         int count, x;
562         struct timeval tv;
563         struct tm *ptm;
564         char time_string[40];
565
566         /* Obtain the time of day, and convert it to a tm struct. */
567         gettimeofday(&tv, NULL);
568         ptm = localtime(&tv.tv_sec);
569         /* Format the date and time, down to a single second. */
570         strftime(time_string, sizeof(time_string), "%Y-%m-%d %H:%M:%S", ptm);
571
572         for (x = 0; x < HOURS; x++) {
573
574                 if (ptm->tm_hour == pol->pkt.timer_policy.busy_hours[x]) {
575                         for (count = 0; count < pol->pkt.num_vcpu; count++) {
576                                 if (pol->core_share[count].status != 1) {
577                                         power_manager_scale_core_max(
578                                                 pol->core_share[count].pcpu);
579                                 }
580                         }
581                         break;
582                 } else if (ptm->tm_hour ==
583                                 pol->pkt.timer_policy.quiet_hours[x]) {
584                         for (count = 0; count < pol->pkt.num_vcpu; count++) {
585                                 if (pol->core_share[count].status != 1) {
586                                         power_manager_scale_core_min(
587                                                 pol->core_share[count].pcpu);
588                         }
589                 }
590                         break;
591                 } else if (ptm->tm_hour ==
592                         pol->pkt.timer_policy.hours_to_use_traffic_profile[x]) {
593                         apply_traffic_profile(pol);
594                         break;
595                 }
596         }
597 }
598
599 static void
600 apply_workload_profile(struct policy *pol)
601 {
602
603         int count;
604
605         if (pol->pkt.workload == HIGH) {
606                 for (count = 0; count < pol->pkt.num_vcpu; count++) {
607                         if (pol->core_share[count].status != 1)
608                                 power_manager_scale_core_max(
609                                                 pol->core_share[count].pcpu);
610                 }
611         } else if (pol->pkt.workload == MEDIUM) {
612                 for (count = 0; count < pol->pkt.num_vcpu; count++) {
613                         if (pol->core_share[count].status != 1)
614                                 power_manager_scale_core_med(
615                                                 pol->core_share[count].pcpu);
616                 }
617         } else if (pol->pkt.workload == LOW) {
618                 for (count = 0; count < pol->pkt.num_vcpu; count++) {
619                         if (pol->core_share[count].status != 1)
620                                 power_manager_scale_core_min(
621                                                 pol->core_share[count].pcpu);
622                 }
623         }
624 }
625
626 static void
627 apply_policy(struct policy *pol)
628 {
629
630         struct channel_packet *pkt = &pol->pkt;
631
632         /*Check policy to use*/
633         if (pkt->policy_to_use == TRAFFIC)
634                 apply_traffic_profile(pol);
635         else if (pkt->policy_to_use == TIME)
636                 apply_time_profile(pol);
637         else if (pkt->policy_to_use == WORKLOAD)
638                 apply_workload_profile(pol);
639 }
640
641 static int
642 process_request(struct channel_packet *pkt, struct channel_info *chan_info)
643 {
644         int ret;
645
646         if (chan_info == NULL)
647                 return -1;
648
649         if (rte_atomic32_cmpset(&(chan_info->status), CHANNEL_MGR_CHANNEL_CONNECTED,
650                         CHANNEL_MGR_CHANNEL_PROCESSING) == 0)
651                 return -1;
652
653         if (pkt->command == CPU_POWER) {
654                 unsigned int core_num;
655
656                 if (pkt->core_type == CORE_TYPE_VIRTUAL)
657                         core_num = get_pcpu(chan_info, pkt->resource_id);
658                 else
659                         core_num = pkt->resource_id;
660
661                 RTE_LOG(DEBUG, CHANNEL_MONITOR, "Processing requested cmd for cpu:%d\n",
662                         core_num);
663
664                 switch (pkt->unit) {
665                 case(CPU_POWER_SCALE_MIN):
666                         power_manager_scale_core_min(core_num);
667                         break;
668                 case(CPU_POWER_SCALE_MAX):
669                         power_manager_scale_core_max(core_num);
670                         break;
671                 case(CPU_POWER_SCALE_DOWN):
672                         power_manager_scale_core_down(core_num);
673                         break;
674                 case(CPU_POWER_SCALE_UP):
675                         power_manager_scale_core_up(core_num);
676                         break;
677                 case(CPU_POWER_ENABLE_TURBO):
678                         power_manager_enable_turbo_core(core_num);
679                         break;
680                 case(CPU_POWER_DISABLE_TURBO):
681                         power_manager_disable_turbo_core(core_num);
682                         break;
683                 default:
684                         break;
685                 }
686         }
687
688         if (pkt->command == PKT_POLICY) {
689                 RTE_LOG(INFO, CHANNEL_MONITOR, "Processing policy request %s\n",
690                                 pkt->vm_name);
691                 update_policy(pkt);
692                 policy_is_set = 1;
693         }
694
695         if (pkt->command == PKT_POLICY_REMOVE) {
696                 ret = remove_policy(pkt);
697                 if (ret == 0)
698                         RTE_LOG(INFO, CHANNEL_MONITOR,
699                                  "Removed policy %s\n", pkt->vm_name);
700                 else
701                         RTE_LOG(INFO, CHANNEL_MONITOR,
702                                  "Policy %s does not exist\n", pkt->vm_name);
703         }
704
705         /*
706          * Return is not checked as channel status may have been set to DISABLED
707          * from management thread
708          */
709         rte_atomic32_cmpset(&(chan_info->status), CHANNEL_MGR_CHANNEL_PROCESSING,
710                         CHANNEL_MGR_CHANNEL_CONNECTED);
711         return 0;
712
713 }
714
715 int
716 add_channel_to_monitor(struct channel_info **chan_info)
717 {
718         struct channel_info *info = *chan_info;
719         struct epoll_event event;
720
721         event.events = EPOLLIN;
722         event.data.ptr = info;
723         if (epoll_ctl(global_event_fd, EPOLL_CTL_ADD, info->fd, &event) < 0) {
724                 RTE_LOG(ERR, CHANNEL_MONITOR, "Unable to add channel '%s' "
725                                 "to epoll\n", info->channel_path);
726                 return -1;
727         }
728         RTE_LOG(ERR, CHANNEL_MONITOR, "Added channel '%s' "
729                         "to monitor\n", info->channel_path);
730         return 0;
731 }
732
733 int
734 remove_channel_from_monitor(struct channel_info *chan_info)
735 {
736         if (epoll_ctl(global_event_fd, EPOLL_CTL_DEL,
737                         chan_info->fd, NULL) < 0) {
738                 RTE_LOG(ERR, CHANNEL_MONITOR, "Unable to remove channel '%s' "
739                                 "from epoll\n", chan_info->channel_path);
740                 return -1;
741         }
742         return 0;
743 }
744
745 int
746 channel_monitor_init(void)
747 {
748         global_event_fd = epoll_create1(0);
749         if (global_event_fd == 0) {
750                 RTE_LOG(ERR, CHANNEL_MONITOR,
751                                 "Error creating epoll context with error %s\n",
752                                 strerror(errno));
753                 return -1;
754         }
755         global_events_list = rte_malloc("epoll_events",
756                         sizeof(*global_events_list)
757                         * MAX_EVENTS, RTE_CACHE_LINE_SIZE);
758         if (global_events_list == NULL) {
759                 RTE_LOG(ERR, CHANNEL_MONITOR, "Unable to rte_malloc for "
760                                 "epoll events\n");
761                 return -1;
762         }
763         return 0;
764 }
765
766 static void
767 read_binary_packet(struct channel_info *chan_info)
768 {
769         struct channel_packet pkt;
770         void *buffer = &pkt;
771         int buffer_len = sizeof(pkt);
772         int n_bytes, err = 0;
773
774         while (buffer_len > 0) {
775                 n_bytes = read(chan_info->fd,
776                                 buffer, buffer_len);
777                 if (n_bytes == buffer_len)
778                         break;
779                 if (n_bytes < 0) {
780                         err = errno;
781                         RTE_LOG(DEBUG, CHANNEL_MONITOR,
782                                 "Received error on "
783                                 "channel '%s' read: %s\n",
784                                 chan_info->channel_path,
785                                 strerror(err));
786                         remove_channel(&chan_info);
787                         break;
788                 }
789                 buffer = (char *)buffer + n_bytes;
790                 buffer_len -= n_bytes;
791         }
792         if (!err)
793                 process_request(&pkt, chan_info);
794 }
795
796 #ifdef USE_JANSSON
797 static void
798 read_json_packet(struct channel_info *chan_info)
799 {
800         struct channel_packet pkt;
801         int n_bytes, ret;
802         json_t *root;
803         json_error_t error;
804
805         /* read opening brace to closing brace */
806         do {
807                 int idx = 0;
808                 int indent = 0;
809                 do {
810                         n_bytes = read(chan_info->fd, &json_data[idx], 1);
811                         if (n_bytes == 0)
812                                 break;
813                         if (json_data[idx] == '{')
814                                 indent++;
815                         if (json_data[idx] == '}')
816                                 indent--;
817                         if ((indent > 0) || (idx > 0))
818                                 idx++;
819                         if (indent <= 0)
820                                 json_data[idx] = 0;
821                         if (idx >= MAX_JSON_STRING_LEN-1)
822                                 break;
823                 } while (indent > 0);
824
825                 json_data[idx] = '\0';
826
827                 if (strlen(json_data) == 0)
828                         continue;
829
830                 printf("got [%s]\n", json_data);
831
832                 root = json_loads(json_data, 0, &error);
833
834                 if (root) {
835                         /*
836                          * Because our data is now in the json
837                          * object, we can overwrite the pkt
838                          * with a channel_packet struct, using
839                          * parse_json_to_pkt()
840                          */
841                         ret = parse_json_to_pkt(root, &pkt);
842                         json_decref(root);
843                         if (ret) {
844                                 RTE_LOG(ERR, CHANNEL_MONITOR,
845                                         "Error validating JSON profile data\n");
846                                 break;
847                         }
848                         process_request(&pkt, chan_info);
849                 } else {
850                         RTE_LOG(ERR, CHANNEL_MONITOR,
851                                         "JSON error on line %d: %s\n",
852                                         error.line, error.text);
853                 }
854         } while (n_bytes > 0);
855 }
856 #endif
857
858 void
859 run_channel_monitor(void)
860 {
861         while (run_loop) {
862                 int n_events, i;
863
864                 n_events = epoll_wait(global_event_fd, global_events_list,
865                                 MAX_EVENTS, 1);
866                 if (!run_loop)
867                         break;
868                 for (i = 0; i < n_events; i++) {
869                         struct channel_info *chan_info = (struct channel_info *)
870                                         global_events_list[i].data.ptr;
871                         if ((global_events_list[i].events & EPOLLERR) ||
872                                 (global_events_list[i].events & EPOLLHUP)) {
873                                 RTE_LOG(INFO, CHANNEL_MONITOR,
874                                                 "Remote closed connection for "
875                                                 "channel '%s'\n",
876                                                 chan_info->channel_path);
877                                 remove_channel(&chan_info);
878                                 continue;
879                         }
880                         if (global_events_list[i].events & EPOLLIN) {
881
882                                 switch (chan_info->type) {
883                                 case CHANNEL_TYPE_BINARY:
884                                         read_binary_packet(chan_info);
885                                         break;
886 #ifdef USE_JANSSON
887                                 case CHANNEL_TYPE_JSON:
888                                         read_json_packet(chan_info);
889                                         break;
890 #endif
891                                 default:
892                                         break;
893                                 }
894                         }
895                 }
896                 rte_delay_us(time_period_ms*1000);
897                 if (policy_is_set) {
898                         int j;
899
900                         for (j = 0; j < MAX_CLIENTS; j++) {
901                                 if (policies[j].enabled == 1)
902                                         apply_policy(&policies[j]);
903                         }
904                 }
905         }
906 }