replace snprintf with strlcpy
[dpdk.git] / examples / vm_power_manager / channel_monitor.c
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2010-2014 Intel Corporation
3  */
4
5 #include <unistd.h>
6 #include <stdio.h>
7 #include <stdlib.h>
8 #include <stdint.h>
9 #include <signal.h>
10 #include <errno.h>
11 #include <string.h>
12 #include <fcntl.h>
13 #include <sys/types.h>
14 #include <sys/epoll.h>
15 #include <sys/queue.h>
16 #include <sys/time.h>
17 #include <sys/socket.h>
18 #include <sys/select.h>
19 #ifdef USE_JANSSON
20 #include <jansson.h>
21 #else
22 #pragma message "Jansson dev libs unavailable, not including JSON parsing"
23 #endif
24 #include <rte_string_fns.h>
25 #include <rte_log.h>
26 #include <rte_memory.h>
27 #include <rte_malloc.h>
28 #include <rte_atomic.h>
29 #include <rte_cycles.h>
30 #include <rte_ethdev.h>
31 #include <rte_pmd_i40e.h>
32
33 #include <libvirt/libvirt.h>
34 #include "channel_monitor.h"
35 #include "channel_commands.h"
36 #include "channel_manager.h"
37 #include "power_manager.h"
38 #include "oob_monitor.h"
39
40 #define RTE_LOGTYPE_CHANNEL_MONITOR RTE_LOGTYPE_USER1
41
42 #define MAX_EVENTS 256
43
44 uint64_t vsi_pkt_count_prev[384];
45 uint64_t rdtsc_prev[384];
46 #define MAX_JSON_STRING_LEN 1024
47 char json_data[MAX_JSON_STRING_LEN];
48
49 double time_period_ms = 1;
50 static volatile unsigned run_loop = 1;
51 static int global_event_fd;
52 static unsigned int policy_is_set;
53 static struct epoll_event *global_events_list;
54 static struct policy policies[MAX_CLIENTS];
55
56 #ifdef USE_JANSSON
57
58 union PFID {
59         struct ether_addr addr;
60         uint64_t pfid;
61 };
62
63 static int
64 str_to_ether_addr(const char *a, struct ether_addr *ether_addr)
65 {
66         int i;
67         char *end;
68         unsigned long o[ETHER_ADDR_LEN];
69
70         i = 0;
71         do {
72                 errno = 0;
73                 o[i] = strtoul(a, &end, 16);
74                 if (errno != 0 || end == a || (end[0] != ':' && end[0] != 0))
75                         return -1;
76                 a = end + 1;
77         } while (++i != RTE_DIM(o) / sizeof(o[0]) && end[0] != 0);
78
79         /* Junk at the end of line */
80         if (end[0] != 0)
81                 return -1;
82
83         /* Support the format XX:XX:XX:XX:XX:XX */
84         if (i == ETHER_ADDR_LEN) {
85                 while (i-- != 0) {
86                         if (o[i] > UINT8_MAX)
87                                 return -1;
88                         ether_addr->addr_bytes[i] = (uint8_t)o[i];
89                 }
90         /* Support the format XXXX:XXXX:XXXX */
91         } else if (i == ETHER_ADDR_LEN / 2) {
92                 while (i-- != 0) {
93                         if (o[i] > UINT16_MAX)
94                                 return -1;
95                         ether_addr->addr_bytes[i * 2] =
96                                         (uint8_t)(o[i] >> 8);
97                         ether_addr->addr_bytes[i * 2 + 1] =
98                                         (uint8_t)(o[i] & 0xff);
99                 }
100         /* unknown format */
101         } else
102                 return -1;
103
104         return 0;
105 }
106
107 static int
108 set_policy_mac(struct channel_packet *pkt, int idx, char *mac)
109 {
110         union PFID pfid;
111         int ret;
112
113         /* Use port MAC address as the vfid */
114         ret = str_to_ether_addr(mac, &pfid.addr);
115
116         if (ret != 0) {
117                 RTE_LOG(ERR, CHANNEL_MONITOR,
118                         "Invalid mac address received in JSON\n");
119                 pkt->vfid[idx] = 0;
120                 return -1;
121         }
122
123         printf("Received MAC Address: %02" PRIx8 ":%02" PRIx8 ":%02" PRIx8 ":"
124                         "%02" PRIx8 ":%02" PRIx8 ":%02" PRIx8 "\n",
125                         pfid.addr.addr_bytes[0], pfid.addr.addr_bytes[1],
126                         pfid.addr.addr_bytes[2], pfid.addr.addr_bytes[3],
127                         pfid.addr.addr_bytes[4], pfid.addr.addr_bytes[5]);
128
129         pkt->vfid[idx] = pfid.pfid;
130         return 0;
131 }
132
133
134 static int
135 parse_json_to_pkt(json_t *element, struct channel_packet *pkt)
136 {
137         const char *key;
138         json_t *value;
139         int ret;
140
141         memset(pkt, 0, sizeof(struct channel_packet));
142
143         pkt->nb_mac_to_monitor = 0;
144         pkt->t_boost_status.tbEnabled = false;
145         pkt->workload = LOW;
146         pkt->policy_to_use = TIME;
147         pkt->command = PKT_POLICY;
148         pkt->core_type = CORE_TYPE_PHYSICAL;
149
150         json_object_foreach(element, key, value) {
151                 if (!strcmp(key, "policy")) {
152                         /* Recurse in to get the contents of profile */
153                         ret = parse_json_to_pkt(value, pkt);
154                         if (ret)
155                                 return ret;
156                 } else if (!strcmp(key, "instruction")) {
157                         /* Recurse in to get the contents of instruction */
158                         ret = parse_json_to_pkt(value, pkt);
159                         if (ret)
160                                 return ret;
161                 } else if (!strcmp(key, "name")) {
162                         strcpy(pkt->vm_name, json_string_value(value));
163                 } else if (!strcmp(key, "command")) {
164                         char command[32];
165                         strlcpy(command, json_string_value(value), 32);
166                         if (!strcmp(command, "power")) {
167                                 pkt->command = CPU_POWER;
168                         } else if (!strcmp(command, "create")) {
169                                 pkt->command = PKT_POLICY;
170                         } else if (!strcmp(command, "destroy")) {
171                                 pkt->command = PKT_POLICY_REMOVE;
172                         } else {
173                                 RTE_LOG(ERR, CHANNEL_MONITOR,
174                                         "Invalid command received in JSON\n");
175                                 return -1;
176                         }
177                 } else if (!strcmp(key, "policy_type")) {
178                         char command[32];
179                         strlcpy(command, json_string_value(value), 32);
180                         if (!strcmp(command, "TIME")) {
181                                 pkt->policy_to_use = TIME;
182                         } else if (!strcmp(command, "TRAFFIC")) {
183                                 pkt->policy_to_use = TRAFFIC;
184                         } else if (!strcmp(command, "WORKLOAD")) {
185                                 pkt->policy_to_use = WORKLOAD;
186                         } else if (!strcmp(command, "BRANCH_RATIO")) {
187                                 pkt->policy_to_use = BRANCH_RATIO;
188                         } else {
189                                 RTE_LOG(ERR, CHANNEL_MONITOR,
190                                         "Wrong policy_type received in JSON\n");
191                                 return -1;
192                         }
193                 } else if (!strcmp(key, "workload")) {
194                         char command[32];
195                         strlcpy(command, json_string_value(value), 32);
196                         if (!strcmp(command, "HIGH")) {
197                                 pkt->workload = HIGH;
198                         } else if (!strcmp(command, "MEDIUM")) {
199                                 pkt->workload = MEDIUM;
200                         } else if (!strcmp(command, "LOW")) {
201                                 pkt->workload = LOW;
202                         } else {
203                                 RTE_LOG(ERR, CHANNEL_MONITOR,
204                                         "Wrong workload received in JSON\n");
205                                 return -1;
206                         }
207                 } else if (!strcmp(key, "busy_hours")) {
208                         unsigned int i;
209                         size_t size = json_array_size(value);
210
211                         for (i = 0; i < size; i++) {
212                                 int hour = (int)json_integer_value(
213                                                 json_array_get(value, i));
214                                 pkt->timer_policy.busy_hours[i] = hour;
215                         }
216                 } else if (!strcmp(key, "quiet_hours")) {
217                         unsigned int i;
218                         size_t size = json_array_size(value);
219
220                         for (i = 0; i < size; i++) {
221                                 int hour = (int)json_integer_value(
222                                                 json_array_get(value, i));
223                                 pkt->timer_policy.quiet_hours[i] = hour;
224                         }
225                 } else if (!strcmp(key, "core_list")) {
226                         unsigned int i;
227                         size_t size = json_array_size(value);
228
229                         for (i = 0; i < size; i++) {
230                                 int core = (int)json_integer_value(
231                                                 json_array_get(value, i));
232                                 pkt->vcpu_to_control[i] = core;
233                         }
234                         pkt->num_vcpu = size;
235                 } else if (!strcmp(key, "mac_list")) {
236                         unsigned int i;
237                         size_t size = json_array_size(value);
238
239                         for (i = 0; i < size; i++) {
240                                 char mac[32];
241                                 strlcpy(mac,
242                                         json_string_value(json_array_get(value, i)),
243                                         32);
244                                 set_policy_mac(pkt, i, mac);
245                         }
246                         pkt->nb_mac_to_monitor = size;
247                 } else if (!strcmp(key, "avg_packet_thresh")) {
248                         pkt->traffic_policy.avg_max_packet_thresh =
249                                         (uint32_t)json_integer_value(value);
250                 } else if (!strcmp(key, "max_packet_thresh")) {
251                         pkt->traffic_policy.max_max_packet_thresh =
252                                         (uint32_t)json_integer_value(value);
253                 } else if (!strcmp(key, "unit")) {
254                         char unit[32];
255                         strlcpy(unit, json_string_value(value), 32);
256                         if (!strcmp(unit, "SCALE_UP")) {
257                                 pkt->unit = CPU_POWER_SCALE_UP;
258                         } else if (!strcmp(unit, "SCALE_DOWN")) {
259                                 pkt->unit = CPU_POWER_SCALE_DOWN;
260                         } else if (!strcmp(unit, "SCALE_MAX")) {
261                                 pkt->unit = CPU_POWER_SCALE_MAX;
262                         } else if (!strcmp(unit, "SCALE_MIN")) {
263                                 pkt->unit = CPU_POWER_SCALE_MIN;
264                         } else if (!strcmp(unit, "ENABLE_TURBO")) {
265                                 pkt->unit = CPU_POWER_ENABLE_TURBO;
266                         } else if (!strcmp(unit, "DISABLE_TURBO")) {
267                                 pkt->unit = CPU_POWER_DISABLE_TURBO;
268                         } else {
269                                 RTE_LOG(ERR, CHANNEL_MONITOR,
270                                         "Invalid command received in JSON\n");
271                                 return -1;
272                         }
273                 } else if (!strcmp(key, "resource_id")) {
274                         pkt->resource_id = (uint32_t)json_integer_value(value);
275                 } else {
276                         RTE_LOG(ERR, CHANNEL_MONITOR,
277                                 "Unknown key received in JSON string: %s\n",
278                                 key);
279                 }
280         }
281         return 0;
282 }
283 #endif
284
285 void channel_monitor_exit(void)
286 {
287         run_loop = 0;
288         rte_free(global_events_list);
289 }
290
291 static void
292 core_share(int pNo, int z, int x, int t)
293 {
294         if (policies[pNo].core_share[z].pcpu == lvm_info[x].pcpus[t]) {
295                 if (strcmp(policies[pNo].pkt.vm_name,
296                                 lvm_info[x].vm_name) != 0) {
297                         policies[pNo].core_share[z].status = 1;
298                         power_manager_scale_core_max(
299                                         policies[pNo].core_share[z].pcpu);
300                 }
301         }
302 }
303
304 static void
305 core_share_status(int pNo)
306 {
307
308         int noVms = 0, noVcpus = 0, z, x, t;
309
310         get_all_vm(&noVms, &noVcpus);
311
312         /* Reset Core Share Status. */
313         for (z = 0; z < noVcpus; z++)
314                 policies[pNo].core_share[z].status = 0;
315
316         /* Foreach vcpu in a policy. */
317         for (z = 0; z < policies[pNo].pkt.num_vcpu; z++) {
318                 /* Foreach VM on the platform. */
319                 for (x = 0; x < noVms; x++) {
320                         /* Foreach vcpu of VMs on platform. */
321                         for (t = 0; t < lvm_info[x].num_cpus; t++)
322                                 core_share(pNo, z, x, t);
323                 }
324         }
325 }
326
327
328 static int
329 pcpu_monitor(struct policy *pol, struct core_info *ci, int pcpu, int count)
330 {
331         int ret = 0;
332
333         if (pol->pkt.policy_to_use == BRANCH_RATIO) {
334                 ci->cd[pcpu].oob_enabled = 1;
335                 ret = add_core_to_monitor(pcpu);
336                 if (ret == 0)
337                         RTE_LOG(INFO, CHANNEL_MONITOR,
338                                         "Monitoring pcpu %d OOB for %s\n",
339                                         pcpu, pol->pkt.vm_name);
340                 else
341                         RTE_LOG(ERR, CHANNEL_MONITOR,
342                                         "Error monitoring pcpu %d OOB for %s\n",
343                                         pcpu, pol->pkt.vm_name);
344
345         } else {
346                 pol->core_share[count].pcpu = pcpu;
347                 RTE_LOG(INFO, CHANNEL_MONITOR,
348                                 "Monitoring pcpu %d for %s\n",
349                                 pcpu, pol->pkt.vm_name);
350         }
351         return ret;
352 }
353
354 static void
355 get_pcpu_to_control(struct policy *pol)
356 {
357
358         /* Convert vcpu to pcpu. */
359         struct vm_info info;
360         int pcpu, count;
361         struct core_info *ci;
362
363         ci = get_core_info();
364
365         RTE_LOG(DEBUG, CHANNEL_MONITOR,
366                         "Looking for pcpu for %s\n", pol->pkt.vm_name);
367
368         /*
369          * So now that we're handling virtual and physical cores, we need to
370          * differenciate between them when adding them to the branch monitor.
371          * Virtual cores need to be converted to physical cores.
372          */
373         if (pol->pkt.core_type == CORE_TYPE_VIRTUAL) {
374                 /*
375                  * If the cores in the policy are virtual, we need to map them
376                  * to physical core. We look up the vm info and use that for
377                  * the mapping.
378                  */
379                 get_info_vm(pol->pkt.vm_name, &info);
380                 for (count = 0; count < pol->pkt.num_vcpu; count++) {
381                         pcpu = info.pcpu_map[pol->pkt.vcpu_to_control[count]];
382                         pcpu_monitor(pol, ci, pcpu, count);
383                 }
384         } else {
385                 /*
386                  * If the cores in the policy are physical, we just use
387                  * those core id's directly.
388                  */
389                 for (count = 0; count < pol->pkt.num_vcpu; count++) {
390                         pcpu = pol->pkt.vcpu_to_control[count];
391                         pcpu_monitor(pol, ci, pcpu, count);
392                 }
393         }
394 }
395
396 static int
397 get_pfid(struct policy *pol)
398 {
399
400         int i, x, ret = 0;
401
402         for (i = 0; i < pol->pkt.nb_mac_to_monitor; i++) {
403
404                 RTE_ETH_FOREACH_DEV(x) {
405                         ret = rte_pmd_i40e_query_vfid_by_mac(x,
406                                 (struct ether_addr *)&(pol->pkt.vfid[i]));
407                         if (ret != -EINVAL) {
408                                 pol->port[i] = x;
409                                 break;
410                         }
411                 }
412                 if (ret == -EINVAL || ret == -ENOTSUP || ret == ENODEV) {
413                         RTE_LOG(INFO, CHANNEL_MONITOR,
414                                 "Error with Policy. MAC not found on "
415                                 "attached ports ");
416                         pol->enabled = 0;
417                         return ret;
418                 }
419                 pol->pfid[i] = ret;
420         }
421         return 1;
422 }
423
424 static int
425 update_policy(struct channel_packet *pkt)
426 {
427
428         unsigned int updated = 0;
429         int i;
430
431
432         RTE_LOG(INFO, CHANNEL_MONITOR,
433                         "Applying policy for %s\n", pkt->vm_name);
434
435         for (i = 0; i < MAX_CLIENTS; i++) {
436                 if (strcmp(policies[i].pkt.vm_name, pkt->vm_name) == 0) {
437                         /* Copy the contents of *pkt into the policy.pkt */
438                         policies[i].pkt = *pkt;
439                         get_pcpu_to_control(&policies[i]);
440                         if (get_pfid(&policies[i]) < 0) {
441                                 updated = 1;
442                                 break;
443                         }
444                         core_share_status(i);
445                         policies[i].enabled = 1;
446                         updated = 1;
447                 }
448         }
449         if (!updated) {
450                 for (i = 0; i < MAX_CLIENTS; i++) {
451                         if (policies[i].enabled == 0) {
452                                 policies[i].pkt = *pkt;
453                                 get_pcpu_to_control(&policies[i]);
454                                 if (get_pfid(&policies[i]) < 0)
455                                         break;
456                                 core_share_status(i);
457                                 policies[i].enabled = 1;
458                                 break;
459                         }
460                 }
461         }
462         return 0;
463 }
464
465 static int
466 remove_policy(struct channel_packet *pkt __rte_unused)
467 {
468         int i;
469
470         /*
471          * Disabling the policy is simply a case of setting
472          * enabled to 0
473          */
474         for (i = 0; i < MAX_CLIENTS; i++) {
475                 if (strcmp(policies[i].pkt.vm_name, pkt->vm_name) == 0) {
476                         policies[i].enabled = 0;
477                         return 0;
478                 }
479         }
480         return -1;
481 }
482
483 static uint64_t
484 get_pkt_diff(struct policy *pol)
485 {
486
487         uint64_t vsi_pkt_count,
488                 vsi_pkt_total = 0,
489                 vsi_pkt_count_prev_total = 0;
490         double rdtsc_curr, rdtsc_diff, diff;
491         int x;
492         struct rte_eth_stats vf_stats;
493
494         for (x = 0; x < pol->pkt.nb_mac_to_monitor; x++) {
495
496                 /*Read vsi stats*/
497                 if (rte_pmd_i40e_get_vf_stats(x, pol->pfid[x], &vf_stats) == 0)
498                         vsi_pkt_count = vf_stats.ipackets;
499                 else
500                         vsi_pkt_count = -1;
501
502                 vsi_pkt_total += vsi_pkt_count;
503
504                 vsi_pkt_count_prev_total += vsi_pkt_count_prev[pol->pfid[x]];
505                 vsi_pkt_count_prev[pol->pfid[x]] = vsi_pkt_count;
506         }
507
508         rdtsc_curr = rte_rdtsc_precise();
509         rdtsc_diff = rdtsc_curr - rdtsc_prev[pol->pfid[x-1]];
510         rdtsc_prev[pol->pfid[x-1]] = rdtsc_curr;
511
512         diff = (vsi_pkt_total - vsi_pkt_count_prev_total) *
513                         ((double)rte_get_tsc_hz() / rdtsc_diff);
514
515         return diff;
516 }
517
518 static void
519 apply_traffic_profile(struct policy *pol)
520 {
521
522         int count;
523         uint64_t diff = 0;
524
525         diff = get_pkt_diff(pol);
526
527         if (diff >= (pol->pkt.traffic_policy.max_max_packet_thresh)) {
528                 for (count = 0; count < pol->pkt.num_vcpu; count++) {
529                         if (pol->core_share[count].status != 1)
530                                 power_manager_scale_core_max(
531                                                 pol->core_share[count].pcpu);
532                 }
533         } else if (diff >= (pol->pkt.traffic_policy.avg_max_packet_thresh)) {
534                 for (count = 0; count < pol->pkt.num_vcpu; count++) {
535                         if (pol->core_share[count].status != 1)
536                                 power_manager_scale_core_med(
537                                                 pol->core_share[count].pcpu);
538                 }
539         } else if (diff < (pol->pkt.traffic_policy.avg_max_packet_thresh)) {
540                 for (count = 0; count < pol->pkt.num_vcpu; count++) {
541                         if (pol->core_share[count].status != 1)
542                                 power_manager_scale_core_min(
543                                                 pol->core_share[count].pcpu);
544                 }
545         }
546 }
547
548 static void
549 apply_time_profile(struct policy *pol)
550 {
551
552         int count, x;
553         struct timeval tv;
554         struct tm *ptm;
555         char time_string[40];
556
557         /* Obtain the time of day, and convert it to a tm struct. */
558         gettimeofday(&tv, NULL);
559         ptm = localtime(&tv.tv_sec);
560         /* Format the date and time, down to a single second. */
561         strftime(time_string, sizeof(time_string), "%Y-%m-%d %H:%M:%S", ptm);
562
563         for (x = 0; x < HOURS; x++) {
564
565                 if (ptm->tm_hour == pol->pkt.timer_policy.busy_hours[x]) {
566                         for (count = 0; count < pol->pkt.num_vcpu; count++) {
567                                 if (pol->core_share[count].status != 1) {
568                                         power_manager_scale_core_max(
569                                                 pol->core_share[count].pcpu);
570                                 }
571                         }
572                         break;
573                 } else if (ptm->tm_hour ==
574                                 pol->pkt.timer_policy.quiet_hours[x]) {
575                         for (count = 0; count < pol->pkt.num_vcpu; count++) {
576                                 if (pol->core_share[count].status != 1) {
577                                         power_manager_scale_core_min(
578                                                 pol->core_share[count].pcpu);
579                         }
580                 }
581                         break;
582                 } else if (ptm->tm_hour ==
583                         pol->pkt.timer_policy.hours_to_use_traffic_profile[x]) {
584                         apply_traffic_profile(pol);
585                         break;
586                 }
587         }
588 }
589
590 static void
591 apply_workload_profile(struct policy *pol)
592 {
593
594         int count;
595
596         if (pol->pkt.workload == HIGH) {
597                 for (count = 0; count < pol->pkt.num_vcpu; count++) {
598                         if (pol->core_share[count].status != 1)
599                                 power_manager_scale_core_max(
600                                                 pol->core_share[count].pcpu);
601                 }
602         } else if (pol->pkt.workload == MEDIUM) {
603                 for (count = 0; count < pol->pkt.num_vcpu; count++) {
604                         if (pol->core_share[count].status != 1)
605                                 power_manager_scale_core_med(
606                                                 pol->core_share[count].pcpu);
607                 }
608         } else if (pol->pkt.workload == LOW) {
609                 for (count = 0; count < pol->pkt.num_vcpu; count++) {
610                         if (pol->core_share[count].status != 1)
611                                 power_manager_scale_core_min(
612                                                 pol->core_share[count].pcpu);
613                 }
614         }
615 }
616
617 static void
618 apply_policy(struct policy *pol)
619 {
620
621         struct channel_packet *pkt = &pol->pkt;
622
623         /*Check policy to use*/
624         if (pkt->policy_to_use == TRAFFIC)
625                 apply_traffic_profile(pol);
626         else if (pkt->policy_to_use == TIME)
627                 apply_time_profile(pol);
628         else if (pkt->policy_to_use == WORKLOAD)
629                 apply_workload_profile(pol);
630 }
631
632 static int
633 process_request(struct channel_packet *pkt, struct channel_info *chan_info)
634 {
635         int ret;
636
637         if (chan_info == NULL)
638                 return -1;
639
640         if (rte_atomic32_cmpset(&(chan_info->status), CHANNEL_MGR_CHANNEL_CONNECTED,
641                         CHANNEL_MGR_CHANNEL_PROCESSING) == 0)
642                 return -1;
643
644         if (pkt->command == CPU_POWER) {
645                 unsigned int core_num;
646
647                 if (pkt->core_type == CORE_TYPE_VIRTUAL)
648                         core_num = get_pcpu(chan_info, pkt->resource_id);
649                 else
650                         core_num = pkt->resource_id;
651
652                 RTE_LOG(DEBUG, CHANNEL_MONITOR, "Processing requested cmd for cpu:%d\n",
653                         core_num);
654
655                 switch (pkt->unit) {
656                 case(CPU_POWER_SCALE_MIN):
657                         power_manager_scale_core_min(core_num);
658                         break;
659                 case(CPU_POWER_SCALE_MAX):
660                         power_manager_scale_core_max(core_num);
661                         break;
662                 case(CPU_POWER_SCALE_DOWN):
663                         power_manager_scale_core_down(core_num);
664                         break;
665                 case(CPU_POWER_SCALE_UP):
666                         power_manager_scale_core_up(core_num);
667                         break;
668                 case(CPU_POWER_ENABLE_TURBO):
669                         power_manager_enable_turbo_core(core_num);
670                         break;
671                 case(CPU_POWER_DISABLE_TURBO):
672                         power_manager_disable_turbo_core(core_num);
673                         break;
674                 default:
675                         break;
676                 }
677         }
678
679         if (pkt->command == PKT_POLICY) {
680                 RTE_LOG(INFO, CHANNEL_MONITOR, "Processing policy request %s\n",
681                                 pkt->vm_name);
682                 update_policy(pkt);
683                 policy_is_set = 1;
684         }
685
686         if (pkt->command == PKT_POLICY_REMOVE) {
687                 ret = remove_policy(pkt);
688                 if (ret == 0)
689                         RTE_LOG(INFO, CHANNEL_MONITOR,
690                                  "Removed policy %s\n", pkt->vm_name);
691                 else
692                         RTE_LOG(INFO, CHANNEL_MONITOR,
693                                  "Policy %s does not exist\n", pkt->vm_name);
694         }
695
696         /*
697          * Return is not checked as channel status may have been set to DISABLED
698          * from management thread
699          */
700         rte_atomic32_cmpset(&(chan_info->status), CHANNEL_MGR_CHANNEL_PROCESSING,
701                         CHANNEL_MGR_CHANNEL_CONNECTED);
702         return 0;
703
704 }
705
706 int
707 add_channel_to_monitor(struct channel_info **chan_info)
708 {
709         struct channel_info *info = *chan_info;
710         struct epoll_event event;
711
712         event.events = EPOLLIN;
713         event.data.ptr = info;
714         if (epoll_ctl(global_event_fd, EPOLL_CTL_ADD, info->fd, &event) < 0) {
715                 RTE_LOG(ERR, CHANNEL_MONITOR, "Unable to add channel '%s' "
716                                 "to epoll\n", info->channel_path);
717                 return -1;
718         }
719         RTE_LOG(ERR, CHANNEL_MONITOR, "Added channel '%s' "
720                         "to monitor\n", info->channel_path);
721         return 0;
722 }
723
724 int
725 remove_channel_from_monitor(struct channel_info *chan_info)
726 {
727         if (epoll_ctl(global_event_fd, EPOLL_CTL_DEL,
728                         chan_info->fd, NULL) < 0) {
729                 RTE_LOG(ERR, CHANNEL_MONITOR, "Unable to remove channel '%s' "
730                                 "from epoll\n", chan_info->channel_path);
731                 return -1;
732         }
733         return 0;
734 }
735
736 int
737 channel_monitor_init(void)
738 {
739         global_event_fd = epoll_create1(0);
740         if (global_event_fd == 0) {
741                 RTE_LOG(ERR, CHANNEL_MONITOR,
742                                 "Error creating epoll context with error %s\n",
743                                 strerror(errno));
744                 return -1;
745         }
746         global_events_list = rte_malloc("epoll_events",
747                         sizeof(*global_events_list)
748                         * MAX_EVENTS, RTE_CACHE_LINE_SIZE);
749         if (global_events_list == NULL) {
750                 RTE_LOG(ERR, CHANNEL_MONITOR, "Unable to rte_malloc for "
751                                 "epoll events\n");
752                 return -1;
753         }
754         return 0;
755 }
756
757 static void
758 read_binary_packet(struct channel_info *chan_info)
759 {
760         struct channel_packet pkt;
761         void *buffer = &pkt;
762         int buffer_len = sizeof(pkt);
763         int n_bytes, err = 0;
764
765         while (buffer_len > 0) {
766                 n_bytes = read(chan_info->fd,
767                                 buffer, buffer_len);
768                 if (n_bytes == buffer_len)
769                         break;
770                 if (n_bytes < 0) {
771                         err = errno;
772                         RTE_LOG(DEBUG, CHANNEL_MONITOR,
773                                 "Received error on "
774                                 "channel '%s' read: %s\n",
775                                 chan_info->channel_path,
776                                 strerror(err));
777                         remove_channel(&chan_info);
778                         break;
779                 }
780                 buffer = (char *)buffer + n_bytes;
781                 buffer_len -= n_bytes;
782         }
783         if (!err)
784                 process_request(&pkt, chan_info);
785 }
786
787 #ifdef USE_JANSSON
788 static void
789 read_json_packet(struct channel_info *chan_info)
790 {
791         struct channel_packet pkt;
792         int n_bytes, ret;
793         json_t *root;
794         json_error_t error;
795
796         /* read opening brace to closing brace */
797         do {
798                 int idx = 0;
799                 int indent = 0;
800                 do {
801                         n_bytes = read(chan_info->fd, &json_data[idx], 1);
802                         if (n_bytes == 0)
803                                 break;
804                         if (json_data[idx] == '{')
805                                 indent++;
806                         if (json_data[idx] == '}')
807                                 indent--;
808                         if ((indent > 0) || (idx > 0))
809                                 idx++;
810                         if (indent == 0)
811                                 json_data[idx] = 0;
812                         if (idx >= MAX_JSON_STRING_LEN-1)
813                                 break;
814                 } while (indent > 0);
815
816                 if (indent > 0)
817                         /*
818                          * We've broken out of the read loop without getting
819                          * a closing brace, so throw away the data
820                          */
821                         json_data[idx] = 0;
822
823                 if (strlen(json_data) == 0)
824                         continue;
825
826                 printf("got [%s]\n", json_data);
827
828                 root = json_loads(json_data, 0, &error);
829
830                 if (root) {
831                         /*
832                          * Because our data is now in the json
833                          * object, we can overwrite the pkt
834                          * with a channel_packet struct, using
835                          * parse_json_to_pkt()
836                          */
837                         ret = parse_json_to_pkt(root, &pkt);
838                         json_decref(root);
839                         if (ret) {
840                                 RTE_LOG(ERR, CHANNEL_MONITOR,
841                                         "Error validating JSON profile data\n");
842                                 break;
843                         }
844                         process_request(&pkt, chan_info);
845                 } else {
846                         RTE_LOG(ERR, CHANNEL_MONITOR,
847                                         "JSON error on line %d: %s\n",
848                                         error.line, error.text);
849                 }
850         } while (n_bytes > 0);
851 }
852 #endif
853
854 void
855 run_channel_monitor(void)
856 {
857         while (run_loop) {
858                 int n_events, i;
859
860                 n_events = epoll_wait(global_event_fd, global_events_list,
861                                 MAX_EVENTS, 1);
862                 if (!run_loop)
863                         break;
864                 for (i = 0; i < n_events; i++) {
865                         struct channel_info *chan_info = (struct channel_info *)
866                                         global_events_list[i].data.ptr;
867                         if ((global_events_list[i].events & EPOLLERR) ||
868                                 (global_events_list[i].events & EPOLLHUP)) {
869                                 RTE_LOG(INFO, CHANNEL_MONITOR,
870                                                 "Remote closed connection for "
871                                                 "channel '%s'\n",
872                                                 chan_info->channel_path);
873                                 remove_channel(&chan_info);
874                                 continue;
875                         }
876                         if (global_events_list[i].events & EPOLLIN) {
877
878                                 switch (chan_info->type) {
879                                 case CHANNEL_TYPE_BINARY:
880                                         read_binary_packet(chan_info);
881                                         break;
882 #ifdef USE_JANSSON
883                                 case CHANNEL_TYPE_JSON:
884                                         read_json_packet(chan_info);
885                                         break;
886 #endif
887                                 default:
888                                         break;
889                                 }
890                         }
891                 }
892                 rte_delay_us(time_period_ms*1000);
893                 if (policy_is_set) {
894                         int j;
895
896                         for (j = 0; j < MAX_CLIENTS; j++) {
897                                 if (policies[j].enabled == 1)
898                                         apply_policy(&policies[j]);
899                         }
900                 }
901         }
902 }