1a3a0fa76816fa90464816800e55be335f73368f
[dpdk.git] / examples / vm_power_manager / channel_monitor.c
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2010-2014 Intel Corporation
3  */
4
5 #include <unistd.h>
6 #include <stdio.h>
7 #include <stdlib.h>
8 #include <stdint.h>
9 #include <signal.h>
10 #include <errno.h>
11 #include <string.h>
12 #include <fcntl.h>
13 #include <sys/types.h>
14 #include <sys/epoll.h>
15 #include <sys/queue.h>
16 #include <sys/time.h>
17 #include <sys/socket.h>
18 #include <sys/select.h>
19 #ifdef USE_JANSSON
20 #include <jansson.h>
21 #else
22 #pragma message "Jansson dev libs unavailable, not including JSON parsing"
23 #endif
24 #include <rte_log.h>
25 #include <rte_memory.h>
26 #include <rte_malloc.h>
27 #include <rte_atomic.h>
28 #include <rte_cycles.h>
29 #include <rte_ethdev.h>
30 #include <rte_pmd_i40e.h>
31
32 #include <libvirt/libvirt.h>
33 #include "channel_monitor.h"
34 #include "channel_commands.h"
35 #include "channel_manager.h"
36 #include "power_manager.h"
37 #include "oob_monitor.h"
38
39 #define RTE_LOGTYPE_CHANNEL_MONITOR RTE_LOGTYPE_USER1
40
41 #define MAX_EVENTS 256
42
43 uint64_t vsi_pkt_count_prev[384];
44 uint64_t rdtsc_prev[384];
45 #define MAX_JSON_STRING_LEN 1024
46 char json_data[MAX_JSON_STRING_LEN];
47
48 double time_period_ms = 1;
49 static volatile unsigned run_loop = 1;
50 static int global_event_fd;
51 static unsigned int policy_is_set;
52 static struct epoll_event *global_events_list;
53 static struct policy policies[MAX_CLIENTS];
54
55 #ifdef USE_JANSSON
56
57 union PFID {
58         struct ether_addr addr;
59         uint64_t pfid;
60 };
61
62 static int
63 str_to_ether_addr(const char *a, struct ether_addr *ether_addr)
64 {
65         int i;
66         char *end;
67         unsigned long o[ETHER_ADDR_LEN];
68
69         i = 0;
70         do {
71                 errno = 0;
72                 o[i] = strtoul(a, &end, 16);
73                 if (errno != 0 || end == a || (end[0] != ':' && end[0] != 0))
74                         return -1;
75                 a = end + 1;
76         } while (++i != RTE_DIM(o) / sizeof(o[0]) && end[0] != 0);
77
78         /* Junk at the end of line */
79         if (end[0] != 0)
80                 return -1;
81
82         /* Support the format XX:XX:XX:XX:XX:XX */
83         if (i == ETHER_ADDR_LEN) {
84                 while (i-- != 0) {
85                         if (o[i] > UINT8_MAX)
86                                 return -1;
87                         ether_addr->addr_bytes[i] = (uint8_t)o[i];
88                 }
89         /* Support the format XXXX:XXXX:XXXX */
90         } else if (i == ETHER_ADDR_LEN / 2) {
91                 while (i-- != 0) {
92                         if (o[i] > UINT16_MAX)
93                                 return -1;
94                         ether_addr->addr_bytes[i * 2] =
95                                         (uint8_t)(o[i] >> 8);
96                         ether_addr->addr_bytes[i * 2 + 1] =
97                                         (uint8_t)(o[i] & 0xff);
98                 }
99         /* unknown format */
100         } else
101                 return -1;
102
103         return 0;
104 }
105
106 static int
107 set_policy_mac(struct channel_packet *pkt, int idx, char *mac)
108 {
109         union PFID pfid;
110         int ret;
111
112         /* Use port MAC address as the vfid */
113         ret = str_to_ether_addr(mac, &pfid.addr);
114
115         if (ret != 0) {
116                 RTE_LOG(ERR, CHANNEL_MONITOR,
117                         "Invalid mac address received in JSON\n");
118                 pkt->vfid[idx] = 0;
119                 return -1;
120         }
121
122         printf("Received MAC Address: %02" PRIx8 ":%02" PRIx8 ":%02" PRIx8 ":"
123                         "%02" PRIx8 ":%02" PRIx8 ":%02" PRIx8 "\n",
124                         pfid.addr.addr_bytes[0], pfid.addr.addr_bytes[1],
125                         pfid.addr.addr_bytes[2], pfid.addr.addr_bytes[3],
126                         pfid.addr.addr_bytes[4], pfid.addr.addr_bytes[5]);
127
128         pkt->vfid[idx] = pfid.pfid;
129         return 0;
130 }
131
132
133 static int
134 parse_json_to_pkt(json_t *element, struct channel_packet *pkt)
135 {
136         const char *key;
137         json_t *value;
138         int ret;
139
140         memset(pkt, 0, sizeof(struct channel_packet));
141
142         pkt->nb_mac_to_monitor = 0;
143         pkt->t_boost_status.tbEnabled = false;
144         pkt->workload = LOW;
145         pkt->policy_to_use = TIME;
146         pkt->command = PKT_POLICY;
147         pkt->core_type = CORE_TYPE_PHYSICAL;
148
149         json_object_foreach(element, key, value) {
150                 if (!strcmp(key, "policy")) {
151                         /* Recurse in to get the contents of profile */
152                         ret = parse_json_to_pkt(value, pkt);
153                         if (ret)
154                                 return ret;
155                 } else if (!strcmp(key, "instruction")) {
156                         /* Recurse in to get the contents of instruction */
157                         ret = parse_json_to_pkt(value, pkt);
158                         if (ret)
159                                 return ret;
160                 } else if (!strcmp(key, "name")) {
161                         strcpy(pkt->vm_name, json_string_value(value));
162                 } else if (!strcmp(key, "command")) {
163                         char command[32];
164                         snprintf(command, 32, "%s", json_string_value(value));
165                         if (!strcmp(command, "power")) {
166                                 pkt->command = CPU_POWER;
167                         } else if (!strcmp(command, "create")) {
168                                 pkt->command = PKT_POLICY;
169                         } else if (!strcmp(command, "destroy")) {
170                                 pkt->command = PKT_POLICY_REMOVE;
171                         } else {
172                                 RTE_LOG(ERR, CHANNEL_MONITOR,
173                                         "Invalid command received in JSON\n");
174                                 return -1;
175                         }
176                 } else if (!strcmp(key, "policy_type")) {
177                         char command[32];
178                         snprintf(command, 32, "%s", json_string_value(value));
179                         if (!strcmp(command, "TIME")) {
180                                 pkt->policy_to_use = TIME;
181                         } else if (!strcmp(command, "TRAFFIC")) {
182                                 pkt->policy_to_use = TRAFFIC;
183                         } else if (!strcmp(command, "WORKLOAD")) {
184                                 pkt->policy_to_use = WORKLOAD;
185                         } else if (!strcmp(command, "BRANCH_RATIO")) {
186                                 pkt->policy_to_use = BRANCH_RATIO;
187                         } else {
188                                 RTE_LOG(ERR, CHANNEL_MONITOR,
189                                         "Wrong policy_type received in JSON\n");
190                                 return -1;
191                         }
192                 } else if (!strcmp(key, "workload")) {
193                         char command[32];
194                         snprintf(command, 32, "%s", json_string_value(value));
195                         if (!strcmp(command, "HIGH")) {
196                                 pkt->workload = HIGH;
197                         } else if (!strcmp(command, "MEDIUM")) {
198                                 pkt->workload = MEDIUM;
199                         } else if (!strcmp(command, "LOW")) {
200                                 pkt->workload = LOW;
201                         } else {
202                                 RTE_LOG(ERR, CHANNEL_MONITOR,
203                                         "Wrong workload received in JSON\n");
204                                 return -1;
205                         }
206                 } else if (!strcmp(key, "busy_hours")) {
207                         unsigned int i;
208                         size_t size = json_array_size(value);
209
210                         for (i = 0; i < size; i++) {
211                                 int hour = (int)json_integer_value(
212                                                 json_array_get(value, i));
213                                 pkt->timer_policy.busy_hours[i] = hour;
214                         }
215                 } else if (!strcmp(key, "quiet_hours")) {
216                         unsigned int i;
217                         size_t size = json_array_size(value);
218
219                         for (i = 0; i < size; i++) {
220                                 int hour = (int)json_integer_value(
221                                                 json_array_get(value, i));
222                                 pkt->timer_policy.quiet_hours[i] = hour;
223                         }
224                 } else if (!strcmp(key, "core_list")) {
225                         unsigned int i;
226                         size_t size = json_array_size(value);
227
228                         for (i = 0; i < size; i++) {
229                                 int core = (int)json_integer_value(
230                                                 json_array_get(value, i));
231                                 pkt->vcpu_to_control[i] = core;
232                         }
233                         pkt->num_vcpu = size;
234                 } else if (!strcmp(key, "mac_list")) {
235                         unsigned int i;
236                         size_t size = json_array_size(value);
237
238                         for (i = 0; i < size; i++) {
239                                 char mac[32];
240                                 snprintf(mac, 32, "%s", json_string_value(
241                                                 json_array_get(value, i)));
242                                 set_policy_mac(pkt, i, mac);
243                         }
244                         pkt->nb_mac_to_monitor = size;
245                 } else if (!strcmp(key, "avg_packet_thresh")) {
246                         pkt->traffic_policy.avg_max_packet_thresh =
247                                         (uint32_t)json_integer_value(value);
248                 } else if (!strcmp(key, "max_packet_thresh")) {
249                         pkt->traffic_policy.max_max_packet_thresh =
250                                         (uint32_t)json_integer_value(value);
251                 } else if (!strcmp(key, "unit")) {
252                         char unit[32];
253                         snprintf(unit, 32, "%s", json_string_value(value));
254                         if (!strcmp(unit, "SCALE_UP")) {
255                                 pkt->unit = CPU_POWER_SCALE_UP;
256                         } else if (!strcmp(unit, "SCALE_DOWN")) {
257                                 pkt->unit = CPU_POWER_SCALE_DOWN;
258                         } else if (!strcmp(unit, "SCALE_MAX")) {
259                                 pkt->unit = CPU_POWER_SCALE_MAX;
260                         } else if (!strcmp(unit, "SCALE_MIN")) {
261                                 pkt->unit = CPU_POWER_SCALE_MIN;
262                         } else if (!strcmp(unit, "ENABLE_TURBO")) {
263                                 pkt->unit = CPU_POWER_ENABLE_TURBO;
264                         } else if (!strcmp(unit, "DISABLE_TURBO")) {
265                                 pkt->unit = CPU_POWER_DISABLE_TURBO;
266                         } else {
267                                 RTE_LOG(ERR, CHANNEL_MONITOR,
268                                         "Invalid command received in JSON\n");
269                                 return -1;
270                         }
271                 } else if (!strcmp(key, "resource_id")) {
272                         pkt->resource_id = (uint32_t)json_integer_value(value);
273                 } else {
274                         RTE_LOG(ERR, CHANNEL_MONITOR,
275                                 "Unknown key received in JSON string: %s\n",
276                                 key);
277                 }
278         }
279         return 0;
280 }
281 #endif
282
283 void channel_monitor_exit(void)
284 {
285         run_loop = 0;
286         rte_free(global_events_list);
287 }
288
289 static void
290 core_share(int pNo, int z, int x, int t)
291 {
292         if (policies[pNo].core_share[z].pcpu == lvm_info[x].pcpus[t]) {
293                 if (strcmp(policies[pNo].pkt.vm_name,
294                                 lvm_info[x].vm_name) != 0) {
295                         policies[pNo].core_share[z].status = 1;
296                         power_manager_scale_core_max(
297                                         policies[pNo].core_share[z].pcpu);
298                 }
299         }
300 }
301
302 static void
303 core_share_status(int pNo)
304 {
305
306         int noVms = 0, noVcpus = 0, z, x, t;
307
308         get_all_vm(&noVms, &noVcpus);
309
310         /* Reset Core Share Status. */
311         for (z = 0; z < noVcpus; z++)
312                 policies[pNo].core_share[z].status = 0;
313
314         /* Foreach vcpu in a policy. */
315         for (z = 0; z < policies[pNo].pkt.num_vcpu; z++) {
316                 /* Foreach VM on the platform. */
317                 for (x = 0; x < noVms; x++) {
318                         /* Foreach vcpu of VMs on platform. */
319                         for (t = 0; t < lvm_info[x].num_cpus; t++)
320                                 core_share(pNo, z, x, t);
321                 }
322         }
323 }
324
325
326 static int
327 pcpu_monitor(struct policy *pol, struct core_info *ci, int pcpu, int count)
328 {
329         int ret = 0;
330
331         if (pol->pkt.policy_to_use == BRANCH_RATIO) {
332                 ci->cd[pcpu].oob_enabled = 1;
333                 ret = add_core_to_monitor(pcpu);
334                 if (ret == 0)
335                         RTE_LOG(INFO, CHANNEL_MONITOR,
336                                         "Monitoring pcpu %d OOB for %s\n",
337                                         pcpu, pol->pkt.vm_name);
338                 else
339                         RTE_LOG(ERR, CHANNEL_MONITOR,
340                                         "Error monitoring pcpu %d OOB for %s\n",
341                                         pcpu, pol->pkt.vm_name);
342
343         } else {
344                 pol->core_share[count].pcpu = pcpu;
345                 RTE_LOG(INFO, CHANNEL_MONITOR,
346                                 "Monitoring pcpu %d for %s\n",
347                                 pcpu, pol->pkt.vm_name);
348         }
349         return ret;
350 }
351
352 static void
353 get_pcpu_to_control(struct policy *pol)
354 {
355
356         /* Convert vcpu to pcpu. */
357         struct vm_info info;
358         int pcpu, count;
359         struct core_info *ci;
360
361         ci = get_core_info();
362
363         RTE_LOG(DEBUG, CHANNEL_MONITOR,
364                         "Looking for pcpu for %s\n", pol->pkt.vm_name);
365
366         /*
367          * So now that we're handling virtual and physical cores, we need to
368          * differenciate between them when adding them to the branch monitor.
369          * Virtual cores need to be converted to physical cores.
370          */
371         if (pol->pkt.core_type == CORE_TYPE_VIRTUAL) {
372                 /*
373                  * If the cores in the policy are virtual, we need to map them
374                  * to physical core. We look up the vm info and use that for
375                  * the mapping.
376                  */
377                 get_info_vm(pol->pkt.vm_name, &info);
378                 for (count = 0; count < pol->pkt.num_vcpu; count++) {
379                         pcpu = info.pcpu_map[pol->pkt.vcpu_to_control[count]];
380                         pcpu_monitor(pol, ci, pcpu, count);
381                 }
382         } else {
383                 /*
384                  * If the cores in the policy are physical, we just use
385                  * those core id's directly.
386                  */
387                 for (count = 0; count < pol->pkt.num_vcpu; count++) {
388                         pcpu = pol->pkt.vcpu_to_control[count];
389                         pcpu_monitor(pol, ci, pcpu, count);
390                 }
391         }
392 }
393
394 static int
395 get_pfid(struct policy *pol)
396 {
397
398         int i, x, ret = 0;
399
400         for (i = 0; i < pol->pkt.nb_mac_to_monitor; i++) {
401
402                 RTE_ETH_FOREACH_DEV(x) {
403                         ret = rte_pmd_i40e_query_vfid_by_mac(x,
404                                 (struct ether_addr *)&(pol->pkt.vfid[i]));
405                         if (ret != -EINVAL) {
406                                 pol->port[i] = x;
407                                 break;
408                         }
409                 }
410                 if (ret == -EINVAL || ret == -ENOTSUP || ret == ENODEV) {
411                         RTE_LOG(INFO, CHANNEL_MONITOR,
412                                 "Error with Policy. MAC not found on "
413                                 "attached ports ");
414                         pol->enabled = 0;
415                         return ret;
416                 }
417                 pol->pfid[i] = ret;
418         }
419         return 1;
420 }
421
422 static int
423 update_policy(struct channel_packet *pkt)
424 {
425
426         unsigned int updated = 0;
427         int i;
428
429
430         RTE_LOG(INFO, CHANNEL_MONITOR,
431                         "Applying policy for %s\n", pkt->vm_name);
432
433         for (i = 0; i < MAX_CLIENTS; i++) {
434                 if (strcmp(policies[i].pkt.vm_name, pkt->vm_name) == 0) {
435                         /* Copy the contents of *pkt into the policy.pkt */
436                         policies[i].pkt = *pkt;
437                         get_pcpu_to_control(&policies[i]);
438                         if (get_pfid(&policies[i]) == -1) {
439                                 updated = 1;
440                                 break;
441                         }
442                         core_share_status(i);
443                         policies[i].enabled = 1;
444                         updated = 1;
445                 }
446         }
447         if (!updated) {
448                 for (i = 0; i < MAX_CLIENTS; i++) {
449                         if (policies[i].enabled == 0) {
450                                 policies[i].pkt = *pkt;
451                                 get_pcpu_to_control(&policies[i]);
452                                 if (get_pfid(&policies[i]) == -1)
453                                         break;
454                                 core_share_status(i);
455                                 policies[i].enabled = 1;
456                                 break;
457                         }
458                 }
459         }
460         return 0;
461 }
462
463 static int
464 remove_policy(struct channel_packet *pkt __rte_unused)
465 {
466         int i;
467
468         /*
469          * Disabling the policy is simply a case of setting
470          * enabled to 0
471          */
472         for (i = 0; i < MAX_CLIENTS; i++) {
473                 if (strcmp(policies[i].pkt.vm_name, pkt->vm_name) == 0) {
474                         policies[i].enabled = 0;
475                         return 0;
476                 }
477         }
478         return -1;
479 }
480
481 static uint64_t
482 get_pkt_diff(struct policy *pol)
483 {
484
485         uint64_t vsi_pkt_count,
486                 vsi_pkt_total = 0,
487                 vsi_pkt_count_prev_total = 0;
488         double rdtsc_curr, rdtsc_diff, diff;
489         int x;
490         struct rte_eth_stats vf_stats;
491
492         for (x = 0; x < pol->pkt.nb_mac_to_monitor; x++) {
493
494                 /*Read vsi stats*/
495                 if (rte_pmd_i40e_get_vf_stats(x, pol->pfid[x], &vf_stats) == 0)
496                         vsi_pkt_count = vf_stats.ipackets;
497                 else
498                         vsi_pkt_count = -1;
499
500                 vsi_pkt_total += vsi_pkt_count;
501
502                 vsi_pkt_count_prev_total += vsi_pkt_count_prev[pol->pfid[x]];
503                 vsi_pkt_count_prev[pol->pfid[x]] = vsi_pkt_count;
504         }
505
506         rdtsc_curr = rte_rdtsc_precise();
507         rdtsc_diff = rdtsc_curr - rdtsc_prev[pol->pfid[x-1]];
508         rdtsc_prev[pol->pfid[x-1]] = rdtsc_curr;
509
510         diff = (vsi_pkt_total - vsi_pkt_count_prev_total) *
511                         ((double)rte_get_tsc_hz() / rdtsc_diff);
512
513         return diff;
514 }
515
516 static void
517 apply_traffic_profile(struct policy *pol)
518 {
519
520         int count;
521         uint64_t diff = 0;
522
523         diff = get_pkt_diff(pol);
524
525         if (diff >= (pol->pkt.traffic_policy.max_max_packet_thresh)) {
526                 for (count = 0; count < pol->pkt.num_vcpu; count++) {
527                         if (pol->core_share[count].status != 1)
528                                 power_manager_scale_core_max(
529                                                 pol->core_share[count].pcpu);
530                 }
531         } else if (diff >= (pol->pkt.traffic_policy.avg_max_packet_thresh)) {
532                 for (count = 0; count < pol->pkt.num_vcpu; count++) {
533                         if (pol->core_share[count].status != 1)
534                                 power_manager_scale_core_med(
535                                                 pol->core_share[count].pcpu);
536                 }
537         } else if (diff < (pol->pkt.traffic_policy.avg_max_packet_thresh)) {
538                 for (count = 0; count < pol->pkt.num_vcpu; count++) {
539                         if (pol->core_share[count].status != 1)
540                                 power_manager_scale_core_min(
541                                                 pol->core_share[count].pcpu);
542                 }
543         }
544 }
545
546 static void
547 apply_time_profile(struct policy *pol)
548 {
549
550         int count, x;
551         struct timeval tv;
552         struct tm *ptm;
553         char time_string[40];
554
555         /* Obtain the time of day, and convert it to a tm struct. */
556         gettimeofday(&tv, NULL);
557         ptm = localtime(&tv.tv_sec);
558         /* Format the date and time, down to a single second. */
559         strftime(time_string, sizeof(time_string), "%Y-%m-%d %H:%M:%S", ptm);
560
561         for (x = 0; x < HOURS; x++) {
562
563                 if (ptm->tm_hour == pol->pkt.timer_policy.busy_hours[x]) {
564                         for (count = 0; count < pol->pkt.num_vcpu; count++) {
565                                 if (pol->core_share[count].status != 1) {
566                                         power_manager_scale_core_max(
567                                                 pol->core_share[count].pcpu);
568                                 }
569                         }
570                         break;
571                 } else if (ptm->tm_hour ==
572                                 pol->pkt.timer_policy.quiet_hours[x]) {
573                         for (count = 0; count < pol->pkt.num_vcpu; count++) {
574                                 if (pol->core_share[count].status != 1) {
575                                         power_manager_scale_core_min(
576                                                 pol->core_share[count].pcpu);
577                         }
578                 }
579                         break;
580                 } else if (ptm->tm_hour ==
581                         pol->pkt.timer_policy.hours_to_use_traffic_profile[x]) {
582                         apply_traffic_profile(pol);
583                         break;
584                 }
585         }
586 }
587
588 static void
589 apply_workload_profile(struct policy *pol)
590 {
591
592         int count;
593
594         if (pol->pkt.workload == HIGH) {
595                 for (count = 0; count < pol->pkt.num_vcpu; count++) {
596                         if (pol->core_share[count].status != 1)
597                                 power_manager_scale_core_max(
598                                                 pol->core_share[count].pcpu);
599                 }
600         } else if (pol->pkt.workload == MEDIUM) {
601                 for (count = 0; count < pol->pkt.num_vcpu; count++) {
602                         if (pol->core_share[count].status != 1)
603                                 power_manager_scale_core_med(
604                                                 pol->core_share[count].pcpu);
605                 }
606         } else if (pol->pkt.workload == LOW) {
607                 for (count = 0; count < pol->pkt.num_vcpu; count++) {
608                         if (pol->core_share[count].status != 1)
609                                 power_manager_scale_core_min(
610                                                 pol->core_share[count].pcpu);
611                 }
612         }
613 }
614
615 static void
616 apply_policy(struct policy *pol)
617 {
618
619         struct channel_packet *pkt = &pol->pkt;
620
621         /*Check policy to use*/
622         if (pkt->policy_to_use == TRAFFIC)
623                 apply_traffic_profile(pol);
624         else if (pkt->policy_to_use == TIME)
625                 apply_time_profile(pol);
626         else if (pkt->policy_to_use == WORKLOAD)
627                 apply_workload_profile(pol);
628 }
629
630 static int
631 process_request(struct channel_packet *pkt, struct channel_info *chan_info)
632 {
633         if (chan_info == NULL)
634                 return -1;
635
636         if (rte_atomic32_cmpset(&(chan_info->status), CHANNEL_MGR_CHANNEL_CONNECTED,
637                         CHANNEL_MGR_CHANNEL_PROCESSING) == 0)
638                 return -1;
639
640         if (pkt->command == CPU_POWER) {
641                 unsigned int core_num;
642
643                 if (pkt->core_type == CORE_TYPE_VIRTUAL)
644                         core_num = get_pcpu(chan_info, pkt->resource_id);
645                 else
646                         core_num = pkt->resource_id;
647
648                 switch (pkt->unit) {
649                 case(CPU_POWER_SCALE_MIN):
650                         power_manager_scale_core_min(core_num);
651                         break;
652                 case(CPU_POWER_SCALE_MAX):
653                         power_manager_scale_core_max(core_num);
654                         break;
655                 case(CPU_POWER_SCALE_DOWN):
656                         power_manager_scale_core_down(core_num);
657                         break;
658                 case(CPU_POWER_SCALE_UP):
659                         power_manager_scale_core_up(core_num);
660                         break;
661                 case(CPU_POWER_ENABLE_TURBO):
662                         power_manager_enable_turbo_core(core_num);
663                         break;
664                 case(CPU_POWER_DISABLE_TURBO):
665                         power_manager_disable_turbo_core(core_num);
666                         break;
667                 default:
668                         break;
669                 }
670         }
671
672         if (pkt->command == PKT_POLICY) {
673                 RTE_LOG(INFO, CHANNEL_MONITOR, "Processing policy request %s\n",
674                                 pkt->vm_name);
675                 update_policy(pkt);
676                 policy_is_set = 1;
677         }
678
679         if (pkt->command == PKT_POLICY_REMOVE) {
680                 RTE_LOG(INFO, CHANNEL_MONITOR,
681                                  "Removing policy %s\n", pkt->vm_name);
682                 remove_policy(pkt);
683         }
684
685         /*
686          * Return is not checked as channel status may have been set to DISABLED
687          * from management thread
688          */
689         rte_atomic32_cmpset(&(chan_info->status), CHANNEL_MGR_CHANNEL_PROCESSING,
690                         CHANNEL_MGR_CHANNEL_CONNECTED);
691         return 0;
692
693 }
694
695 int
696 add_channel_to_monitor(struct channel_info **chan_info)
697 {
698         struct channel_info *info = *chan_info;
699         struct epoll_event event;
700
701         event.events = EPOLLIN;
702         event.data.ptr = info;
703         if (epoll_ctl(global_event_fd, EPOLL_CTL_ADD, info->fd, &event) < 0) {
704                 RTE_LOG(ERR, CHANNEL_MONITOR, "Unable to add channel '%s' "
705                                 "to epoll\n", info->channel_path);
706                 return -1;
707         }
708         RTE_LOG(ERR, CHANNEL_MONITOR, "Added channel '%s' "
709                         "to monitor\n", info->channel_path);
710         return 0;
711 }
712
713 int
714 remove_channel_from_monitor(struct channel_info *chan_info)
715 {
716         if (epoll_ctl(global_event_fd, EPOLL_CTL_DEL,
717                         chan_info->fd, NULL) < 0) {
718                 RTE_LOG(ERR, CHANNEL_MONITOR, "Unable to remove channel '%s' "
719                                 "from epoll\n", chan_info->channel_path);
720                 return -1;
721         }
722         return 0;
723 }
724
725 int
726 channel_monitor_init(void)
727 {
728         global_event_fd = epoll_create1(0);
729         if (global_event_fd == 0) {
730                 RTE_LOG(ERR, CHANNEL_MONITOR,
731                                 "Error creating epoll context with error %s\n",
732                                 strerror(errno));
733                 return -1;
734         }
735         global_events_list = rte_malloc("epoll_events",
736                         sizeof(*global_events_list)
737                         * MAX_EVENTS, RTE_CACHE_LINE_SIZE);
738         if (global_events_list == NULL) {
739                 RTE_LOG(ERR, CHANNEL_MONITOR, "Unable to rte_malloc for "
740                                 "epoll events\n");
741                 return -1;
742         }
743         return 0;
744 }
745
746 static void
747 read_binary_packet(struct channel_info *chan_info)
748 {
749         struct channel_packet pkt;
750         void *buffer = &pkt;
751         int buffer_len = sizeof(pkt);
752         int n_bytes, err = 0;
753
754         while (buffer_len > 0) {
755                 n_bytes = read(chan_info->fd,
756                                 buffer, buffer_len);
757                 if (n_bytes == buffer_len)
758                         break;
759                 if (n_bytes == -1) {
760                         err = errno;
761                         RTE_LOG(DEBUG, CHANNEL_MONITOR,
762                                 "Received error on "
763                                 "channel '%s' read: %s\n",
764                                 chan_info->channel_path,
765                                 strerror(err));
766                         remove_channel(&chan_info);
767                         break;
768                 }
769                 buffer = (char *)buffer + n_bytes;
770                 buffer_len -= n_bytes;
771         }
772         if (!err)
773                 process_request(&pkt, chan_info);
774 }
775
776 #ifdef USE_JANSSON
777 static void
778 read_json_packet(struct channel_info *chan_info)
779 {
780         struct channel_packet pkt;
781         int n_bytes, ret;
782         json_t *root;
783         json_error_t error;
784
785         /* read opening brace to closing brace */
786         do {
787                 int idx = 0;
788                 int indent = 0;
789                 do {
790                         n_bytes = read(chan_info->fd, &json_data[idx], 1);
791                         if (n_bytes == 0)
792                                 break;
793                         if (json_data[idx] == '{')
794                                 indent++;
795                         if (json_data[idx] == '}')
796                                 indent--;
797                         if ((indent > 0) || (idx > 0))
798                                 idx++;
799                         if (indent == 0)
800                                 json_data[idx] = 0;
801                         if (idx >= MAX_JSON_STRING_LEN-1)
802                                 break;
803                 } while (indent > 0);
804
805                 if (indent > 0)
806                         /*
807                          * We've broken out of the read loop without getting
808                          * a closing brace, so throw away the data
809                          */
810                         json_data[idx] = 0;
811
812                 if (strlen(json_data) == 0)
813                         continue;
814
815                 printf("got [%s]\n", json_data);
816
817                 root = json_loads(json_data, 0, &error);
818
819                 if (root) {
820                         /*
821                          * Because our data is now in the json
822                          * object, we can overwrite the pkt
823                          * with a channel_packet struct, using
824                          * parse_json_to_pkt()
825                          */
826                         ret = parse_json_to_pkt(root, &pkt);
827                         json_decref(root);
828                         if (ret) {
829                                 RTE_LOG(ERR, CHANNEL_MONITOR,
830                                         "Error validating JSON profile data\n");
831                                 break;
832                         }
833                         process_request(&pkt, chan_info);
834                 } else {
835                         RTE_LOG(ERR, CHANNEL_MONITOR,
836                                         "JSON error on line %d: %s\n",
837                                         error.line, error.text);
838                 }
839         } while (n_bytes > 0);
840 }
841 #endif
842
843 void
844 run_channel_monitor(void)
845 {
846         while (run_loop) {
847                 int n_events, i;
848
849                 n_events = epoll_wait(global_event_fd, global_events_list,
850                                 MAX_EVENTS, 1);
851                 if (!run_loop)
852                         break;
853                 for (i = 0; i < n_events; i++) {
854                         struct channel_info *chan_info = (struct channel_info *)
855                                         global_events_list[i].data.ptr;
856                         if ((global_events_list[i].events & EPOLLERR) ||
857                                 (global_events_list[i].events & EPOLLHUP)) {
858                                 RTE_LOG(INFO, CHANNEL_MONITOR,
859                                                 "Remote closed connection for "
860                                                 "channel '%s'\n",
861                                                 chan_info->channel_path);
862                                 remove_channel(&chan_info);
863                                 continue;
864                         }
865                         if (global_events_list[i].events & EPOLLIN) {
866
867                                 switch (chan_info->type) {
868                                 case CHANNEL_TYPE_BINARY:
869                                         read_binary_packet(chan_info);
870                                         break;
871 #ifdef USE_JANSSON
872                                 case CHANNEL_TYPE_JSON:
873                                         read_json_packet(chan_info);
874                                         break;
875 #endif
876                                 default:
877                                         break;
878                                 }
879                         }
880                 }
881                 rte_delay_us(time_period_ms*1000);
882                 if (policy_is_set) {
883                         int j;
884
885                         for (j = 0; j < MAX_CLIENTS; j++) {
886                                 if (policies[j].enabled == 1)
887                                         apply_policy(&policies[j]);
888                         }
889                 }
890         }
891 }