Merge branch 'pull/master'
[collectd.git] / src / network.c
1 /**
2  * collectd - src/network.c
3  * Copyright (C) 2005-2008  Florian octo Forster
4  *
5  * This program is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License as published by the
7  * Free Software Foundation; only version 2 of the License is applicable.
8  *
9  * This program is distributed in the hope that it will be useful, but
10  * WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License along
15  * with this program; if not, write to the Free Software Foundation, Inc.,
16  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
17  *
18  * Authors:
19  *   Florian octo Forster <octo at verplant.org>
20  **/
21
22 #include "collectd.h"
23 #include "plugin.h"
24 #include "common.h"
25 #include "configfile.h"
26 #include "utils_avltree.h"
27
28 #include "network.h"
29
30 #if HAVE_PTHREAD_H
31 # include <pthread.h>
32 #endif
33 #if HAVE_SYS_SOCKET_H
34 # include <sys/socket.h>
35 #endif
36 #if HAVE_NETDB_H
37 # include <netdb.h>
38 #endif
39 #if HAVE_NETINET_IN_H
40 # include <netinet/in.h>
41 #endif
42 #if HAVE_ARPA_INET_H
43 # include <arpa/inet.h>
44 #endif
45 #if HAVE_POLL_H
46 # include <poll.h>
47 #endif
48
49 /* 1500 - 40 - 8  =  Ethernet packet - IPv6 header - UDP header */
50 /* #define BUFF_SIZE 1452 */
51
52 #ifndef IPV6_ADD_MEMBERSHIP
53 # ifdef IPV6_JOIN_GROUP
54 #  define IPV6_ADD_MEMBERSHIP IPV6_JOIN_GROUP
55 # else
56 #  error "Neither IP_ADD_MEMBERSHIP nor IPV6_JOIN_GROUP is defined"
57 # endif
58 #endif /* !IP_ADD_MEMBERSHIP */
59
60 #define BUFF_SIZE 1024
61
62 /*
63  * Private data types
64  */
65 typedef struct sockent
66 {
67         int                      fd;
68         struct sockaddr_storage *addr;
69         socklen_t                addrlen;
70         struct sockent          *next;
71 } sockent_t;
72
73 /*                      1 1 1 1 1 1 1 1 1 1 2 2 2 2 2 2 2 2 2 2 3 3
74  *  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
75  * +-------+-----------------------+-------------------------------+
76  * ! Ver.  !                       ! Length                        !
77  * +-------+-----------------------+-------------------------------+
78  */
79 struct part_header_s
80 {
81         uint16_t type;
82         uint16_t length;
83 };
84 typedef struct part_header_s part_header_t;
85
86 /*                      1 1 1 1 1 1 1 1 1 1 2 2 2 2 2 2 2 2 2 2 3 3
87  *  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
88  * +-------------------------------+-------------------------------+
89  * ! Type                          ! Length                        !
90  * +-------------------------------+-------------------------------+
91  * : (Length - 4) Bytes                                            :
92  * +---------------------------------------------------------------+
93  */
94 struct part_string_s
95 {
96         part_header_t *head;
97         char *value;
98 };
99 typedef struct part_string_s part_string_t;
100
101 /*                      1 1 1 1 1 1 1 1 1 1 2 2 2 2 2 2 2 2 2 2 3 3
102  *  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
103  * +-------------------------------+-------------------------------+
104  * ! Type                          ! Length                        !
105  * +-------------------------------+-------------------------------+
106  * : (Length - 4 == 2 || 4 || 8) Bytes                             :
107  * +---------------------------------------------------------------+
108  */
109 struct part_number_s
110 {
111         part_header_t *head;
112         uint64_t *value;
113 };
114 typedef struct part_number_s part_number_t;
115
116 /*                      1 1 1 1 1 1 1 1 1 1 2 2 2 2 2 2 2 2 2 2 3 3
117  *  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
118  * +-------------------------------+-------------------------------+
119  * ! Type                          ! Length                        !
120  * +-------------------------------+---------------+---------------+
121  * ! Num of values                 ! Type0         ! Type1         !
122  * +-------------------------------+---------------+---------------+
123  * ! Value0                                                        !
124  * !                                                               !
125  * +---------------------------------------------------------------+
126  * ! Value1                                                        !
127  * !                                                               !
128  * +---------------------------------------------------------------+
129  */
130 struct part_values_s
131 {
132         part_header_t *head;
133         uint16_t *num_values;
134         uint8_t  *values_types;
135         value_t  *values;
136 };
137 typedef struct part_values_s part_values_t;
138
139 struct receive_list_entry_s
140 {
141   char data[BUFF_SIZE];
142   int  data_len;
143   struct receive_list_entry_s *next;
144 };
145 typedef struct receive_list_entry_s receive_list_entry_t;
146
147 /*
148  * Private variables
149  */
150 static const char *config_keys[] =
151 {
152         "CacheFlush",
153         "Listen",
154         "Server",
155         "TimeToLive",
156         "Forward"
157 };
158 static int config_keys_num = STATIC_ARRAY_SIZE (config_keys);
159
160 static int network_config_ttl = 0;
161 static int network_config_forward = 0;
162
163 static sockent_t *sending_sockets = NULL;
164
165 static receive_list_entry_t *receive_list_head = NULL;
166 static receive_list_entry_t *receive_list_tail = NULL;
167 static pthread_mutex_t       receive_list_lock = PTHREAD_MUTEX_INITIALIZER;
168 static pthread_cond_t        receive_list_cond = PTHREAD_COND_INITIALIZER;
169
170 static struct pollfd *listen_sockets = NULL;
171 static int listen_sockets_num = 0;
172
173 static int listen_loop = 0;
174 static pthread_t receive_thread_id = 0;
175 static pthread_t dispatch_thread_id = 0;
176
177 static char         send_buffer[BUFF_SIZE];
178 static char        *send_buffer_ptr;
179 static int          send_buffer_fill;
180 static value_list_t send_buffer_vl = VALUE_LIST_STATIC;
181 static char         send_buffer_type[DATA_MAX_NAME_LEN];
182 static pthread_mutex_t send_buffer_lock = PTHREAD_MUTEX_INITIALIZER;
183
184 static c_avl_tree_t      *cache_tree = NULL;
185 static pthread_mutex_t  cache_lock = PTHREAD_MUTEX_INITIALIZER;
186 static time_t           cache_flush_last;
187 static int              cache_flush_interval = 1800;
188
189 /*
190  * Private functions
191  */
192 static int cache_flush (void)
193 {
194         char **keys = NULL;
195         int    keys_num = 0;
196
197         char **tmp;
198         int    i;
199
200         char   *key;
201         time_t *value;
202         c_avl_iterator_t *iter;
203
204         time_t curtime = time (NULL);
205
206         iter = c_avl_get_iterator (cache_tree);
207         while (c_avl_iterator_next (iter, (void *) &key, (void *) &value) == 0)
208         {
209                 if ((curtime - *value) <= cache_flush_interval)
210                         continue;
211                 tmp = (char **) realloc (keys,
212                                 (keys_num + 1) * sizeof (char *));
213                 if (tmp == NULL)
214                 {
215                         sfree (keys);
216                         c_avl_iterator_destroy (iter);
217                         ERROR ("network plugin: cache_flush: realloc"
218                                         " failed.");
219                         return (-1);
220                 }
221                 keys = tmp;
222                 keys[keys_num] = key;
223                 keys_num++;
224         } /* while (c_avl_iterator_next) */
225         c_avl_iterator_destroy (iter);
226
227         for (i = 0; i < keys_num; i++)
228         {
229                 if (c_avl_remove (cache_tree, keys[i], (void *) &key,
230                                         (void *) &value) != 0)
231                 {
232                         WARNING ("network plugin: cache_flush: c_avl_remove"
233                                         " (%s) failed.", keys[i]);
234                         continue;
235                 }
236
237                 sfree (key);
238                 sfree (value);
239         }
240
241         sfree (keys);
242
243         DEBUG ("network plugin: cache_flush: Removed %i %s",
244                         keys_num, (keys_num == 1) ? "entry" : "entries");
245         cache_flush_last = curtime;
246         return (0);
247 } /* int cache_flush */
248
249 static int cache_check (const char *type, const value_list_t *vl)
250 {
251         char key[1024];
252         time_t *value = NULL;
253         int retval = -1;
254
255         if (cache_tree == NULL)
256                 return (-1);
257
258         if (format_name (key, sizeof (key), vl->host, vl->plugin,
259                                 vl->plugin_instance, type, vl->type_instance))
260                 return (-1);
261
262         pthread_mutex_lock (&cache_lock);
263
264         if (c_avl_get (cache_tree, key, (void *) &value) == 0)
265         {
266                 if (*value < vl->time)
267                 {
268                         *value = vl->time;
269                         retval = 0;
270                 }
271                 else
272                 {
273                         DEBUG ("network plugin: cache_check: *value = %i >= vl->time = %i",
274                                         (int) *value, (int) vl->time);
275                         retval = 1;
276                 }
277         }
278         else
279         {
280                 char *key_copy = strdup (key);
281                 value = malloc (sizeof (time_t));
282                 if ((key_copy != NULL) && (value != NULL))
283                 {
284                         *value = vl->time;
285                         c_avl_insert (cache_tree, key_copy, value);
286                         retval = 0;
287                 }
288                 else
289                 {
290                         sfree (key_copy);
291                         sfree (value);
292                 }
293         }
294
295         if ((time (NULL) - cache_flush_last) > cache_flush_interval)
296                 cache_flush ();
297
298         pthread_mutex_unlock (&cache_lock);
299
300         DEBUG ("network plugin: cache_check: key = %s; time = %i; retval = %i",
301                         key, (int) vl->time, retval);
302
303         return (retval);
304 } /* int cache_check */
305
306 static int write_part_values (char **ret_buffer, int *ret_buffer_len,
307                 const data_set_t *ds, const value_list_t *vl)
308 {
309         part_values_t pv;
310         int i;
311
312         i = 6 + (9 * vl->values_len);
313         if (*ret_buffer_len < i)
314                 return (-1);
315         *ret_buffer_len -= i;
316
317         pv.head = (part_header_t *) *ret_buffer;
318         pv.num_values = (uint16_t *) (pv.head + 1);
319         pv.values_types = (uint8_t *) (pv.num_values + 1);
320         pv.values = (value_t *) (pv.values_types + vl->values_len);
321         *ret_buffer = (void *) (pv.values + vl->values_len);
322
323         pv.head->type = htons (TYPE_VALUES);
324         pv.head->length = htons (6 + (9 * vl->values_len));
325         *pv.num_values = htons ((uint16_t) vl->values_len);
326         
327         for (i = 0; i < vl->values_len; i++)
328         {
329                 if (ds->ds[i].type == DS_TYPE_COUNTER)
330                 {
331                         pv.values_types[i] = DS_TYPE_COUNTER;
332                         pv.values[i].counter = htonll (vl->values[i].counter);
333                 }
334                 else
335                 {
336                         pv.values_types[i] = DS_TYPE_GAUGE;
337                         pv.values[i].gauge = vl->values[i].gauge;
338                 }
339         } /* for (values) */
340
341         return (0);
342 } /* int write_part_values */
343
344 static int write_part_number (char **ret_buffer, int *ret_buffer_len,
345                 int type, uint64_t value)
346 {
347         part_number_t pn;
348
349         if (*ret_buffer_len < 12)
350                 return (-1);
351
352         pn.head = (part_header_t *) *ret_buffer;
353         pn.value = (uint64_t *) (pn.head + 1);
354
355         pn.head->type = htons (type);
356         pn.head->length = htons (12);
357         *pn.value = htonll (value);
358
359         *ret_buffer = (char *) (pn.value + 1);
360         *ret_buffer_len -= 12;
361
362         return (0);
363 } /* int write_part_number */
364
365 static int write_part_string (char **ret_buffer, int *ret_buffer_len,
366                 int type, const char *str, int str_len)
367 {
368         part_string_t ps;
369         int len;
370
371         len = 4 + str_len + 1;
372         if (*ret_buffer_len < len)
373                 return (-1);
374         *ret_buffer_len -= len;
375
376         ps.head = (part_header_t *) *ret_buffer;
377         ps.value = (char *) (ps.head + 1);
378
379         ps.head->type = htons ((uint16_t) type);
380         ps.head->length = htons ((uint16_t) str_len + 5);
381         if (str_len > 0)
382                 memcpy (ps.value, str, str_len);
383         ps.value[str_len] = '\0';
384         *ret_buffer = (void *) (ps.value + (str_len + 1));
385
386         return (0);
387 } /* int write_part_string */
388
389 static int parse_part_values (void **ret_buffer, int *ret_buffer_len,
390                 value_t **ret_values, int *ret_num_values)
391 {
392         char *buffer = *ret_buffer;
393         int   buffer_len = *ret_buffer_len;
394         part_values_t pv;
395         int   i;
396
397         uint16_t h_length;
398         uint16_t h_type;
399         uint16_t h_num;
400
401         if (buffer_len < (15))
402         {
403                 DEBUG ("network plugin: packet is too short: buffer_len = %i",
404                                 buffer_len);
405                 return (-1);
406         }
407
408         pv.head = (part_header_t *) buffer;
409         h_length = ntohs (pv.head->length);
410         h_type = ntohs (pv.head->type);
411
412         assert (h_type == TYPE_VALUES);
413
414         pv.num_values = (uint16_t *) (pv.head + 1);
415         h_num = ntohs (*pv.num_values);
416
417         if (h_num != ((h_length - 6) / 9))
418         {
419                 DEBUG ("`length' and `num of values' don't match");
420                 return (-1);
421         }
422
423         pv.values_types = (uint8_t *) (pv.num_values + 1);
424         pv.values = (value_t *) (pv.values_types + h_num);
425
426         for (i = 0; i < h_num; i++)
427                 if (pv.values_types[i] == DS_TYPE_COUNTER)
428                         pv.values[i].counter = ntohll (pv.values[i].counter);
429
430         *ret_buffer     = (void *) (pv.values + h_num);
431         *ret_buffer_len = buffer_len - h_length;
432         *ret_num_values = h_num;
433         *ret_values     = pv.values;
434
435         return (0);
436 } /* int parse_part_values */
437
438 static int parse_part_number (void **ret_buffer, int *ret_buffer_len,
439                 uint64_t *value)
440 {
441         part_number_t pn;
442         uint16_t len;
443
444         pn.head = (part_header_t *) *ret_buffer;
445         pn.value = (uint64_t *) (pn.head + 1);
446
447         len = ntohs (pn.head->length);
448         if (len != 12)
449                 return (-1);
450         if (len > *ret_buffer_len)
451                 return (-1);
452         *value = ntohll (*pn.value);
453
454         *ret_buffer = (void *) (pn.value + 1);
455         *ret_buffer_len -= len;
456
457         return (0);
458 } /* int parse_part_number */
459
460 static int parse_part_string (void **ret_buffer, int *ret_buffer_len,
461                 char *output, int output_len)
462 {
463         char *buffer = *ret_buffer;
464         int   buffer_len = *ret_buffer_len;
465         part_string_t ps;
466
467         uint16_t h_length;
468         uint16_t h_type;
469
470         DEBUG ("network plugin: parse_part_string: ret_buffer = %p;"
471                         " ret_buffer_len = %i; output = %p; output_len = %i;",
472                         *ret_buffer, *ret_buffer_len,
473                         (void *) output, output_len);
474
475         ps.head = (part_header_t *) buffer;
476
477         h_length = ntohs (ps.head->length);
478         h_type = ntohs (ps.head->type);
479
480         DEBUG ("network plugin: parse_part_string: length = %hu; type = %hu;",
481                         h_length, h_type);
482
483         if (buffer_len < h_length)
484         {
485                 DEBUG ("packet is too short");
486                 return (-1);
487         }
488         assert ((h_type == TYPE_HOST)
489                         || (h_type == TYPE_PLUGIN)
490                         || (h_type == TYPE_PLUGIN_INSTANCE)
491                         || (h_type == TYPE_TYPE)
492                         || (h_type == TYPE_TYPE_INSTANCE)
493                         || (h_type == TYPE_MESSAGE));
494
495         ps.value = buffer + 4;
496         if (ps.value[h_length - 5] != '\0')
497         {
498                 DEBUG ("String does not end with a nullbyte");
499                 return (-1);
500         }
501
502         if (output_len < (h_length - 4))
503         {
504                 DEBUG ("output buffer is too small");
505                 return (-1);
506         }
507         strcpy (output, ps.value);
508
509         DEBUG ("network plugin: parse_part_string: output = %s", output);
510
511         *ret_buffer = (void *) (buffer + h_length);
512         *ret_buffer_len = buffer_len - h_length;
513
514         return (0);
515 } /* int parse_part_string */
516
517 static int parse_packet (void *buffer, int buffer_len)
518 {
519         part_header_t *header;
520         int status;
521
522         value_list_t vl = VALUE_LIST_INIT;
523         char type[DATA_MAX_NAME_LEN];
524         notification_t n;
525
526         DEBUG ("network plugin: parse_packet: buffer = %p; buffer_len = %i;",
527                         buffer, buffer_len);
528
529         memset (&vl, '\0', sizeof (vl));
530         memset (&type, '\0', sizeof (type));
531         memset (&n, '\0', sizeof (n));
532         status = 0;
533
534         while ((status == 0) && (0 < buffer_len)
535                         && ((unsigned int) buffer_len > sizeof (part_header_t)))
536         {
537                 header = (part_header_t *) buffer;
538
539                 if (ntohs (header->length) > buffer_len)
540                         break;
541                 /* Assure that this loop terminates eventually */
542                 if (ntohs (header->length) < 4)
543                         break;
544
545                 if (ntohs (header->type) == TYPE_VALUES)
546                 {
547                         status = parse_part_values (&buffer, &buffer_len,
548                                         &vl.values, &vl.values_len);
549
550                         if (status != 0)
551                         {
552                                 DEBUG ("parse_part_values failed.");
553                                 break;
554                         }
555
556                         if ((vl.time > 0)
557                                         && (strlen (vl.host) > 0)
558                                         && (strlen (vl.plugin) > 0)
559                                         && (strlen (type) > 0)
560                                         && (cache_check (type, &vl) == 0))
561                         {
562                                 DEBUG ("network plugin: parse_packet:"
563                                                 " dispatching values");
564                                 plugin_dispatch_values (type, &vl);
565                         }
566                         else
567                         {
568                                 DEBUG ("network plugin: parse_packet:"
569                                                 " NOT dispatching values");
570                         }
571                 }
572                 else if (ntohs (header->type) == TYPE_TIME)
573                 {
574                         uint64_t tmp = 0;
575                         status = parse_part_number (&buffer, &buffer_len, &tmp);
576                         if (status == 0)
577                         {
578                                 vl.time = (time_t) tmp;
579                                 n.time = (time_t) tmp;
580                         }
581                 }
582                 else if (ntohs (header->type) == TYPE_INTERVAL)
583                 {
584                         uint64_t tmp = 0;
585                         status = parse_part_number (&buffer, &buffer_len, &tmp);
586                         if (status == 0)
587                                 vl.interval = (int) tmp;
588                 }
589                 else if (ntohs (header->type) == TYPE_HOST)
590                 {
591                         status = parse_part_string (&buffer, &buffer_len,
592                                         vl.host, sizeof (vl.host));
593                         strncpy (n.host, vl.host, sizeof (n.host));
594                         n.host[sizeof (n.host) - 1] = '\0';
595                         DEBUG ("network plugin: parse_packet: vl.host = %s",
596                                         vl.host);
597                 }
598                 else if (ntohs (header->type) == TYPE_PLUGIN)
599                 {
600                         status = parse_part_string (&buffer, &buffer_len,
601                                         vl.plugin, sizeof (vl.plugin));
602                         strncpy (n.plugin, vl.plugin, sizeof (n.plugin));
603                         n.plugin[sizeof (n.plugin) - 1] = '\0';
604                         DEBUG ("network plugin: parse_packet: vl.plugin = %s",
605                                         vl.plugin);
606                 }
607                 else if (ntohs (header->type) == TYPE_PLUGIN_INSTANCE)
608                 {
609                         status = parse_part_string (&buffer, &buffer_len,
610                                         vl.plugin_instance,
611                                         sizeof (vl.plugin_instance));
612                         strncpy (n.plugin_instance, vl.plugin_instance,
613                                         sizeof (n.plugin_instance));
614                         n.plugin_instance[sizeof (n.plugin_instance) - 1] = '\0';
615                         DEBUG ("network plugin: parse_packet: "
616                                         "vl.plugin_instance = %s",
617                                         vl.plugin_instance);
618                 }
619                 else if (ntohs (header->type) == TYPE_TYPE)
620                 {
621                         status = parse_part_string (&buffer, &buffer_len,
622                                         type, sizeof (type));
623                         strncpy (n.type, type, sizeof (n.type));
624                         n.type[sizeof (n.type) - 1] = '\0';
625                         DEBUG ("network plugin: parse_packet: type = %s",
626                                         type);
627                 }
628                 else if (ntohs (header->type) == TYPE_TYPE_INSTANCE)
629                 {
630                         status = parse_part_string (&buffer, &buffer_len,
631                                         vl.type_instance,
632                                         sizeof (vl.type_instance));
633                         strncpy (n.type_instance, vl.type_instance,
634                                         sizeof (n.type_instance));
635                         n.type_instance[sizeof (n.type_instance) - 1] = '\0';
636                         DEBUG ("network plugin: parse_packet: "
637                                         "vl.type_instance = %s",
638                                         vl.type_instance);
639                 }
640                 else if (ntohs (header->type) == TYPE_MESSAGE)
641                 {
642                         status = parse_part_string (&buffer, &buffer_len,
643                                         n.message, sizeof (n.message));
644                         DEBUG ("network plugin: parse_packet: n.message = %s",
645                                         n.message);
646
647                         if ((n.severity != NOTIF_FAILURE)
648                                         && (n.severity != NOTIF_WARNING)
649                                         && (n.severity != NOTIF_OKAY))
650                         {
651                                 INFO ("network plugin: "
652                                                 "Ignoring notification with "
653                                                 "unknown severity %s.",
654                                                 n.severity);
655                         }
656                         else if (n.time <= 0)
657                         {
658                                 INFO ("network plugin: "
659                                                 "Ignoring notification with "
660                                                 "time == 0.");
661                         }
662                         else if (strlen (n.message) <= 0)
663                         {
664                                 INFO ("network plugin: "
665                                                 "Ignoring notification with "
666                                                 "an empty message.");
667                         }
668                         else
669                         {
670                                 /*
671                                  * TODO: Let this do a separate thread so that
672                                  * no packets are lost if this takes too long.
673                                  */
674                                 plugin_dispatch_notification (&n);
675                         }
676                 }
677                 else if (ntohs (header->type) == TYPE_SEVERITY)
678                 {
679                         uint64_t tmp = 0;
680                         status = parse_part_number (&buffer, &buffer_len, &tmp);
681                         if (status == 0)
682                                 n.severity = (int) tmp;
683                 }
684                 else
685                 {
686                         DEBUG ("network plugin: parse_packet: Unknown part"
687                                         " type: 0x%0hx", ntohs (header->type));
688                         buffer = ((char *) buffer) + ntohs (header->length);
689                 }
690         } /* while (buffer_len > sizeof (part_header_t)) */
691
692         return (0);
693 } /* int parse_packet */
694
695 static void free_sockent (sockent_t *se)
696 {
697         sockent_t *next;
698         while (se != NULL)
699         {
700                 next = se->next;
701                 free (se->addr);
702                 free (se);
703                 se = next;
704         }
705 } /* void free_sockent */
706
707 /*
708  * int network_set_ttl
709  *
710  * Set the `IP_MULTICAST_TTL', `IP_TTL', `IPV6_MULTICAST_HOPS' or
711  * `IPV6_UNICAST_HOPS', depending on which option is applicable.
712  *
713  * The `struct addrinfo' is used to destinguish between unicast and multicast
714  * sockets.
715  */
716 static int network_set_ttl (const sockent_t *se, const struct addrinfo *ai)
717 {
718         if ((network_config_ttl < 1) || (network_config_ttl > 255))
719                 return (-1);
720
721         DEBUG ("ttl = %i", network_config_ttl);
722
723         if (ai->ai_family == AF_INET)
724         {
725                 struct sockaddr_in *addr = (struct sockaddr_in *) ai->ai_addr;
726                 int optname;
727
728                 if (IN_MULTICAST (ntohl (addr->sin_addr.s_addr)))
729                         optname = IP_MULTICAST_TTL;
730                 else
731                         optname = IP_TTL;
732
733                 if (setsockopt (se->fd, IPPROTO_IP, optname,
734                                         &network_config_ttl,
735                                         sizeof (network_config_ttl)) == -1)
736                 {
737                         char errbuf[1024];
738                         ERROR ("setsockopt: %s",
739                                         sstrerror (errno, errbuf, sizeof (errbuf)));
740                         return (-1);
741                 }
742         }
743         else if (ai->ai_family == AF_INET6)
744         {
745                 /* Useful example: http://gsyc.escet.urjc.es/~eva/IPv6-web/examples/mcast.html */
746                 struct sockaddr_in6 *addr = (struct sockaddr_in6 *) ai->ai_addr;
747                 int optname;
748
749                 if (IN6_IS_ADDR_MULTICAST (&addr->sin6_addr))
750                         optname = IPV6_MULTICAST_HOPS;
751                 else
752                         optname = IPV6_UNICAST_HOPS;
753
754                 if (setsockopt (se->fd, IPPROTO_IPV6, optname,
755                                         &network_config_ttl,
756                                         sizeof (network_config_ttl)) == -1)
757                 {
758                         char errbuf[1024];
759                         ERROR ("setsockopt: %s",
760                                         sstrerror (errno, errbuf,
761                                                 sizeof (errbuf)));
762                         return (-1);
763                 }
764         }
765
766         return (0);
767 } /* int network_set_ttl */
768
769 static int network_bind_socket (const sockent_t *se, const struct addrinfo *ai)
770 {
771         int loop = 0;
772         int yes  = 1;
773
774         /* allow multiple sockets to use the same PORT number */
775         if (setsockopt(se->fd, SOL_SOCKET, SO_REUSEADDR,
776                                 &yes, sizeof(yes)) == -1) {
777                 char errbuf[1024];
778                 ERROR ("setsockopt: %s", 
779                                 sstrerror (errno, errbuf, sizeof (errbuf)));
780                 return (-1);
781         }
782
783         DEBUG ("fd = %i; calling `bind'", se->fd);
784
785         if (bind (se->fd, ai->ai_addr, ai->ai_addrlen) == -1)
786         {
787                 char errbuf[1024];
788                 ERROR ("bind: %s",
789                                 sstrerror (errno, errbuf, sizeof (errbuf)));
790                 return (-1);
791         }
792
793         if (ai->ai_family == AF_INET)
794         {
795                 struct sockaddr_in *addr = (struct sockaddr_in *) ai->ai_addr;
796                 if (IN_MULTICAST (ntohl (addr->sin_addr.s_addr)))
797                 {
798                         struct ip_mreq mreq;
799
800                         DEBUG ("fd = %i; IPv4 multicast address found", se->fd);
801
802                         mreq.imr_multiaddr.s_addr = addr->sin_addr.s_addr;
803                         mreq.imr_interface.s_addr = htonl (INADDR_ANY);
804
805                         if (setsockopt (se->fd, IPPROTO_IP, IP_MULTICAST_LOOP,
806                                                 &loop, sizeof (loop)) == -1)
807                         {
808                                 char errbuf[1024];
809                                 ERROR ("setsockopt: %s",
810                                                 sstrerror (errno, errbuf,
811                                                         sizeof (errbuf)));
812                                 return (-1);
813                         }
814
815                         if (setsockopt (se->fd, IPPROTO_IP, IP_ADD_MEMBERSHIP,
816                                                 &mreq, sizeof (mreq)) == -1)
817                         {
818                                 char errbuf[1024];
819                                 ERROR ("setsockopt: %s",
820                                                 sstrerror (errno, errbuf,
821                                                         sizeof (errbuf)));
822                                 return (-1);
823                         }
824                 }
825         }
826         else if (ai->ai_family == AF_INET6)
827         {
828                 /* Useful example: http://gsyc.escet.urjc.es/~eva/IPv6-web/examples/mcast.html */
829                 struct sockaddr_in6 *addr = (struct sockaddr_in6 *) ai->ai_addr;
830                 if (IN6_IS_ADDR_MULTICAST (&addr->sin6_addr))
831                 {
832                         struct ipv6_mreq mreq;
833
834                         DEBUG ("fd = %i; IPv6 multicast address found", se->fd);
835
836                         memcpy (&mreq.ipv6mr_multiaddr,
837                                         &addr->sin6_addr,
838                                         sizeof (addr->sin6_addr));
839
840                         /* http://developer.apple.com/documentation/Darwin/Reference/ManPages/man4/ip6.4.html
841                          * ipv6mr_interface may be set to zeroes to
842                          * choose the default multicast interface or to
843                          * the index of a particular multicast-capable
844                          * interface if the host is multihomed.
845                          * Membership is associ-associated with a
846                          * single interface; programs running on
847                          * multihomed hosts may need to join the same
848                          * group on more than one interface.*/
849                         mreq.ipv6mr_interface = 0;
850
851                         if (setsockopt (se->fd, IPPROTO_IPV6, IPV6_MULTICAST_LOOP,
852                                                 &loop, sizeof (loop)) == -1)
853                         {
854                                 char errbuf[1024];
855                                 ERROR ("setsockopt: %s",
856                                                 sstrerror (errno, errbuf,
857                                                         sizeof (errbuf)));
858                                 return (-1);
859                         }
860
861                         if (setsockopt (se->fd, IPPROTO_IPV6, IPV6_ADD_MEMBERSHIP,
862                                                 &mreq, sizeof (mreq)) == -1)
863                         {
864                                 char errbuf[1024];
865                                 ERROR ("setsockopt: %s",
866                                                 sstrerror (errno, errbuf,
867                                                         sizeof (errbuf)));
868                                 return (-1);
869                         }
870                 }
871         }
872
873         return (0);
874 } /* int network_bind_socket */
875
876 static sockent_t *network_create_socket (const char *node,
877                 const char *service,
878                 int listen)
879 {
880         struct addrinfo  ai_hints;
881         struct addrinfo *ai_list, *ai_ptr;
882         int              ai_return;
883
884         sockent_t *se_head = NULL;
885         sockent_t *se_tail = NULL;
886
887         DEBUG ("node = %s, service = %s", node, service);
888
889         memset (&ai_hints, '\0', sizeof (ai_hints));
890         ai_hints.ai_flags    = 0;
891 #ifdef AI_PASSIVE
892         ai_hints.ai_flags |= AI_PASSIVE;
893 #endif
894 #ifdef AI_ADDRCONFIG
895         ai_hints.ai_flags |= AI_ADDRCONFIG;
896 #endif
897         ai_hints.ai_family   = AF_UNSPEC;
898         ai_hints.ai_socktype = SOCK_DGRAM;
899         ai_hints.ai_protocol = IPPROTO_UDP;
900
901         ai_return = getaddrinfo (node, service, &ai_hints, &ai_list);
902         if (ai_return != 0)
903         {
904                 char errbuf[1024];
905                 ERROR ("getaddrinfo (%s, %s): %s",
906                                 (node == NULL) ? "(null)" : node,
907                                 (service == NULL) ? "(null)" : service,
908                                 (ai_return == EAI_SYSTEM)
909                                 ? sstrerror (errno, errbuf, sizeof (errbuf))
910                                 : gai_strerror (ai_return));
911                 return (NULL);
912         }
913
914         for (ai_ptr = ai_list; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
915         {
916                 sockent_t *se;
917
918                 if ((se = (sockent_t *) malloc (sizeof (sockent_t))) == NULL)
919                 {
920                         char errbuf[1024];
921                         ERROR ("malloc: %s",
922                                         sstrerror (errno, errbuf,
923                                                 sizeof (errbuf)));
924                         continue;
925                 }
926
927                 if ((se->addr = (struct sockaddr_storage *) malloc (sizeof (struct sockaddr_storage))) == NULL)
928                 {
929                         char errbuf[1024];
930                         ERROR ("malloc: %s",
931                                         sstrerror (errno, errbuf,
932                                                 sizeof (errbuf)));
933                         free (se);
934                         continue;
935                 }
936
937                 assert (sizeof (struct sockaddr_storage) >= ai_ptr->ai_addrlen);
938                 memset (se->addr, '\0', sizeof (struct sockaddr_storage));
939                 memcpy (se->addr, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
940                 se->addrlen = ai_ptr->ai_addrlen;
941
942                 se->fd   = socket (ai_ptr->ai_family,
943                                 ai_ptr->ai_socktype,
944                                 ai_ptr->ai_protocol);
945                 se->next = NULL;
946
947                 if (se->fd == -1)
948                 {
949                         char errbuf[1024];
950                         ERROR ("socket: %s",
951                                         sstrerror (errno, errbuf,
952                                                 sizeof (errbuf)));
953                         free (se->addr);
954                         free (se);
955                         continue;
956                 }
957
958                 if (listen != 0)
959                 {
960                         if (network_bind_socket (se, ai_ptr) != 0)
961                         {
962                                 close (se->fd);
963                                 free (se->addr);
964                                 free (se);
965                                 continue;
966                         }
967                 }
968                 else /* listen == 0 */
969                 {
970                         network_set_ttl (se, ai_ptr);
971                 }
972
973                 if (se_tail == NULL)
974                 {
975                         se_head = se;
976                         se_tail = se;
977                 }
978                 else
979                 {
980                         se_tail->next = se;
981                         se_tail = se;
982                 }
983
984                 /* We don't open more than one write-socket per node/service pair.. */
985                 if (listen == 0)
986                         break;
987         }
988
989         freeaddrinfo (ai_list);
990
991         return (se_head);
992 } /* sockent_t *network_create_socket */
993
994 static sockent_t *network_create_default_socket (int listen)
995 {
996         sockent_t *se_ptr  = NULL;
997         sockent_t *se_head = NULL;
998         sockent_t *se_tail = NULL;
999
1000         se_ptr = network_create_socket (NET_DEFAULT_V6_ADDR,
1001                         NET_DEFAULT_PORT, listen);
1002
1003         /* Don't send to the same machine in IPv6 and IPv4 if both are available. */
1004         if ((listen == 0) && (se_ptr != NULL))
1005                 return (se_ptr);
1006
1007         if (se_ptr != NULL)
1008         {
1009                 se_head = se_ptr;
1010                 se_tail = se_ptr;
1011                 while (se_tail->next != NULL)
1012                         se_tail = se_tail->next;
1013         }
1014
1015         se_ptr = network_create_socket (NET_DEFAULT_V4_ADDR, NET_DEFAULT_PORT, listen);
1016
1017         if (se_tail == NULL)
1018                 return (se_ptr);
1019
1020         se_tail->next = se_ptr;
1021         return (se_head);
1022 } /* sockent_t *network_create_default_socket */
1023
1024 static int network_add_listen_socket (const char *node, const char *service)
1025 {
1026         sockent_t *se;
1027         sockent_t *se_ptr;
1028         int se_num = 0;
1029
1030         if (service == NULL)
1031                 service = NET_DEFAULT_PORT;
1032
1033         if (node == NULL)
1034                 se = network_create_default_socket (1 /* listen == true */);
1035         else
1036                 se = network_create_socket (node, service, 1 /* listen == true */);
1037
1038         if (se == NULL)
1039                 return (-1);
1040
1041         for (se_ptr = se; se_ptr != NULL; se_ptr = se_ptr->next)
1042                 se_num++;
1043
1044         listen_sockets = (struct pollfd *) realloc (listen_sockets,
1045                         (listen_sockets_num + se_num)
1046                         * sizeof (struct pollfd));
1047
1048         for (se_ptr = se; se_ptr != NULL; se_ptr = se_ptr->next)
1049         {
1050                 listen_sockets[listen_sockets_num].fd = se_ptr->fd;
1051                 listen_sockets[listen_sockets_num].events = POLLIN | POLLPRI;
1052                 listen_sockets[listen_sockets_num].revents = 0;
1053                 listen_sockets_num++;
1054         } /* for (se) */
1055
1056         free_sockent (se);
1057         return (0);
1058 } /* int network_add_listen_socket */
1059
1060 static int network_add_sending_socket (const char *node, const char *service)
1061 {
1062         sockent_t *se;
1063         sockent_t *se_ptr;
1064
1065         if (service == NULL)
1066                 service = NET_DEFAULT_PORT;
1067
1068         if (node == NULL)
1069                 se = network_create_default_socket (0 /* listen == false */);
1070         else
1071                 se = network_create_socket (node, service, 0 /* listen == false */);
1072
1073         if (se == NULL)
1074                 return (-1);
1075
1076         if (sending_sockets == NULL)
1077         {
1078                 sending_sockets = se;
1079                 return (0);
1080         }
1081
1082         for (se_ptr = sending_sockets; se_ptr->next != NULL; se_ptr = se_ptr->next)
1083                 /* seek end */;
1084
1085         se_ptr->next = se;
1086         return (0);
1087 } /* int network_get_listen_socket */
1088
1089 static void *dispatch_thread (void *arg)
1090 {
1091   while (42)
1092   {
1093     receive_list_entry_t *ent;
1094
1095     /* Lock and wait for more data to come in */
1096     pthread_mutex_lock (&receive_list_lock);
1097     while ((listen_loop == 0)
1098         && (receive_list_head == NULL))
1099       pthread_cond_wait (&receive_list_cond, &receive_list_lock);
1100
1101     /* Remove the head entry and unlock */
1102     ent = receive_list_head;
1103     if (ent != NULL)
1104       receive_list_head = ent->next;
1105     pthread_mutex_unlock (&receive_list_lock);
1106
1107     /* Check whether we are supposed to exit. We do NOT check `listen_loop'
1108      * because we dispatch all missing packets before shutting down. */
1109     if (ent == NULL)
1110       break;
1111
1112     parse_packet (ent->data, ent->data_len);
1113
1114     sfree (ent);
1115   } /* while (42) */
1116
1117   return (NULL);
1118 } /* void *receive_thread */
1119
1120 static int network_receive (void)
1121 {
1122         char buffer[BUFF_SIZE];
1123         int  buffer_len;
1124
1125         int i;
1126         int status;
1127
1128         if (listen_sockets_num == 0)
1129                 network_add_listen_socket (NULL, NULL);
1130
1131         if (listen_sockets_num == 0)
1132         {
1133                 ERROR ("network: Failed to open a listening socket.");
1134                 return (-1);
1135         }
1136
1137         while (listen_loop == 0)
1138         {
1139                 status = poll (listen_sockets, listen_sockets_num, -1);
1140
1141                 if (status <= 0)
1142                 {
1143                         char errbuf[1024];
1144                         if (errno == EINTR)
1145                                 continue;
1146                         ERROR ("poll failed: %s",
1147                                         sstrerror (errno, errbuf, sizeof (errbuf)));
1148                         return (-1);
1149                 }
1150
1151                 for (i = 0; (i < listen_sockets_num) && (status > 0); i++)
1152                 {
1153                         receive_list_entry_t *ent;
1154
1155                         if ((listen_sockets[i].revents & (POLLIN | POLLPRI)) == 0)
1156                                 continue;
1157                         status--;
1158
1159                         buffer_len = recv (listen_sockets[i].fd,
1160                                         buffer, sizeof (buffer),
1161                                         0 /* no flags */);
1162                         if (buffer_len < 0)
1163                         {
1164                                 char errbuf[1024];
1165                                 ERROR ("recv failed: %s",
1166                                                 sstrerror (errno, errbuf,
1167                                                         sizeof (errbuf)));
1168                                 return (-1);
1169                         }
1170
1171                         ent = malloc (sizeof (receive_list_entry_t));
1172                         if (ent == NULL)
1173                         {
1174                                 ERROR ("network plugin: malloc failed.");
1175                                 return (-1);
1176                         }
1177                         memset (ent, '\0', sizeof (receive_list_entry_t));
1178
1179                         /* Hopefully this be optimized out by the compiler. It
1180                          * might help prevent stupid bugs in the future though.
1181                          */
1182                         assert (sizeof (ent->data) == sizeof (buffer));
1183
1184                         memcpy (ent->data, buffer, buffer_len);
1185                         ent->data_len = buffer_len;
1186
1187                         pthread_mutex_lock (&receive_list_lock);
1188                         if (receive_list_head == NULL)
1189                         {
1190                                 receive_list_head = ent;
1191                                 receive_list_tail = ent;
1192                         }
1193                         else
1194                         {
1195                                 receive_list_tail->next = ent;
1196                                 receive_list_tail = ent;
1197                         }
1198                         pthread_cond_signal (&receive_list_cond);
1199                         pthread_mutex_unlock (&receive_list_lock);
1200                 } /* for (listen_sockets) */
1201         } /* while (listen_loop == 0) */
1202
1203         return (0);
1204 }
1205
1206 static void *receive_thread (void *arg)
1207 {
1208         return (network_receive () ? (void *) 1 : (void *) 0);
1209 } /* void *receive_thread */
1210
1211 static void network_send_buffer (const char *buffer, int buffer_len)
1212 {
1213         sockent_t *se;
1214         int status;
1215
1216         DEBUG ("network plugin: network_send_buffer: buffer_len = %i", buffer_len);
1217
1218         for (se = sending_sockets; se != NULL; se = se->next)
1219         {
1220                 while (42)
1221                 {
1222                         status = sendto (se->fd, buffer, buffer_len, 0 /* no flags */,
1223                                         (struct sockaddr *) se->addr, se->addrlen);
1224                         if (status < 0)
1225                         {
1226                                 char errbuf[1024];
1227                                 if (errno == EINTR)
1228                                         continue;
1229                                 ERROR ("network plugin: sendto failed: %s",
1230                                                 sstrerror (errno, errbuf,
1231                                                         sizeof (errbuf)));
1232                                 break;
1233                         }
1234
1235                         break;
1236                 } /* while (42) */
1237         } /* for (sending_sockets) */
1238 } /* void network_send_buffer */
1239
1240 static int add_to_buffer (char *buffer, int buffer_size,
1241                 value_list_t *vl_def, char *type_def,
1242                 const data_set_t *ds, const value_list_t *vl)
1243 {
1244         char *buffer_orig = buffer;
1245
1246         if (strcmp (vl_def->host, vl->host) != 0)
1247         {
1248                 if (write_part_string (&buffer, &buffer_size, TYPE_HOST,
1249                                         vl->host, strlen (vl->host)) != 0)
1250                         return (-1);
1251                 strcpy (vl_def->host, vl->host);
1252         }
1253
1254         if (vl_def->time != vl->time)
1255         {
1256                 if (write_part_number (&buffer, &buffer_size, TYPE_TIME,
1257                                         (uint64_t) vl->time))
1258                         return (-1);
1259                 vl_def->time = vl->time;
1260         }
1261
1262         if (vl_def->interval != vl->interval)
1263         {
1264                 if (write_part_number (&buffer, &buffer_size, TYPE_INTERVAL,
1265                                         (uint64_t) vl->interval))
1266                         return (-1);
1267                 vl_def->interval = vl->interval;
1268         }
1269
1270         if (strcmp (vl_def->plugin, vl->plugin) != 0)
1271         {
1272                 if (write_part_string (&buffer, &buffer_size, TYPE_PLUGIN,
1273                                         vl->plugin, strlen (vl->plugin)) != 0)
1274                         return (-1);
1275                 strcpy (vl_def->plugin, vl->plugin);
1276         }
1277
1278         if (strcmp (vl_def->plugin_instance, vl->plugin_instance) != 0)
1279         {
1280                 if (write_part_string (&buffer, &buffer_size, TYPE_PLUGIN_INSTANCE,
1281                                         vl->plugin_instance,
1282                                         strlen (vl->plugin_instance)) != 0)
1283                         return (-1);
1284                 strcpy (vl_def->plugin_instance, vl->plugin_instance);
1285         }
1286
1287         if (strcmp (type_def, ds->type) != 0)
1288         {
1289                 if (write_part_string (&buffer, &buffer_size, TYPE_TYPE,
1290                                         ds->type, strlen (ds->type)) != 0)
1291                         return (-1);
1292                 strcpy (type_def, ds->type);
1293         }
1294
1295         if (strcmp (vl_def->type_instance, vl->type_instance) != 0)
1296         {
1297                 if (write_part_string (&buffer, &buffer_size, TYPE_TYPE_INSTANCE,
1298                                         vl->type_instance,
1299                                         strlen (vl->type_instance)) != 0)
1300                         return (-1);
1301                 strcpy (vl_def->type_instance, vl->type_instance);
1302         }
1303         
1304         if (write_part_values (&buffer, &buffer_size, ds, vl) != 0)
1305                 return (-1);
1306
1307         return (buffer - buffer_orig);
1308 } /* int add_to_buffer */
1309
1310 static void flush_buffer (void)
1311 {
1312         DEBUG ("network plugin: flush_buffer: send_buffer_fill = %i",
1313                         send_buffer_fill);
1314
1315         network_send_buffer (send_buffer, send_buffer_fill);
1316         send_buffer_ptr  = send_buffer;
1317         send_buffer_fill = 0;
1318         memset (&send_buffer_vl, '\0', sizeof (send_buffer_vl));
1319         memset (send_buffer_type, '\0', sizeof (send_buffer_type));
1320 }
1321
1322 static int network_write (const data_set_t *ds, const value_list_t *vl)
1323 {
1324         int status;
1325
1326         /* If the value is already in the cache, we have received it via the
1327          * network. We write it again if forwarding is activated. It's then in
1328          * the cache and should we receive it again we will ignore it. */
1329         status = cache_check (ds->type, vl);
1330         if ((network_config_forward == 0)
1331                         && (status != 0))
1332                 return (0);
1333
1334         pthread_mutex_lock (&send_buffer_lock);
1335
1336         status = add_to_buffer (send_buffer_ptr,
1337                         sizeof (send_buffer) - send_buffer_fill,
1338                         &send_buffer_vl, send_buffer_type,
1339                         ds, vl);
1340         if (status >= 0)
1341         {
1342                 /* status == bytes added to the buffer */
1343                 send_buffer_fill += status;
1344                 send_buffer_ptr  += status;
1345         }
1346         else
1347         {
1348                 flush_buffer ();
1349
1350                 status = add_to_buffer (send_buffer_ptr,
1351                                 sizeof (send_buffer) - send_buffer_fill,
1352                                 &send_buffer_vl, send_buffer_type,
1353                                 ds, vl);
1354
1355                 if (status >= 0)
1356                 {
1357                         send_buffer_fill += status;
1358                         send_buffer_ptr  += status;
1359                 }
1360         }
1361
1362         if (status < 0)
1363         {
1364                 ERROR ("network plugin: Unable to append to the "
1365                                 "buffer for some weird reason");
1366         }
1367         else if ((sizeof (send_buffer) - send_buffer_fill) < 15)
1368         {
1369                 flush_buffer ();
1370         }
1371
1372         pthread_mutex_unlock (&send_buffer_lock);
1373
1374         return ((status < 0) ? -1 : 0);
1375 } /* int network_write */
1376
1377 static int network_config (const char *key, const char *val)
1378 {
1379         char *node;
1380         char *service;
1381
1382         char *fields[3];
1383         int   fields_num;
1384
1385         if ((strcasecmp ("Listen", key) == 0)
1386                         || (strcasecmp ("Server", key) == 0))
1387         {
1388                 char *val_cpy = strdup (val);
1389                 if (val_cpy == NULL)
1390                         return (1);
1391
1392                 service = NET_DEFAULT_PORT;
1393                 fields_num = strsplit (val_cpy, fields, 3);
1394                 if ((fields_num != 1)
1395                                 && (fields_num != 2))
1396                         return (1);
1397                 else if (fields_num == 2)
1398                 {
1399                         if ((service = strchr (fields[1], '.')) != NULL)
1400                                 *service = '\0';
1401                         service = fields[1];
1402                 }
1403                 node = fields[0];
1404
1405                 if (strcasecmp ("Listen", key) == 0)
1406                         network_add_listen_socket (node, service);
1407                 else
1408                         network_add_sending_socket (node, service);
1409         }
1410         else if (strcasecmp ("TimeToLive", key) == 0)
1411         {
1412                 int tmp = atoi (val);
1413                 if ((tmp > 0) && (tmp < 256))
1414                         network_config_ttl = tmp;
1415                 else
1416                         return (1);
1417         }
1418         else if (strcasecmp ("Forward", key) == 0)
1419         {
1420                 if ((strcasecmp ("true", val) == 0)
1421                                 || (strcasecmp ("yes", val) == 0)
1422                                 || (strcasecmp ("on", val) == 0))
1423                         network_config_forward = 1;
1424                 else
1425                         network_config_forward = 0;
1426         }
1427         else if (strcasecmp ("CacheFlush", key) == 0)
1428         {
1429                 int tmp = atoi (val);
1430                 if (tmp > 0)
1431                         cache_flush_interval = tmp;
1432                 else return (1);
1433         }
1434         else
1435         {
1436                 return (-1);
1437         }
1438         return (0);
1439 } /* int network_config */
1440
1441 static int network_notification (const notification_t *n)
1442 {
1443   char  buffer[BUFF_SIZE];
1444   char *buffer_ptr = buffer;
1445   int   buffer_free = sizeof (buffer);
1446   int   status;
1447
1448   memset (buffer, '\0', sizeof (buffer));
1449
1450
1451   status = write_part_number (&buffer_ptr, &buffer_free, TYPE_TIME,
1452       (uint64_t) n->time);
1453   if (status != 0)
1454     return (-1);
1455
1456   status = write_part_number (&buffer_ptr, &buffer_free, TYPE_SEVERITY,
1457       (uint64_t) n->severity);
1458   if (status != 0)
1459     return (-1);
1460
1461   if (strlen (n->host) > 0)
1462   {
1463     status = write_part_string (&buffer_ptr, &buffer_free, TYPE_HOST,
1464         n->host, strlen (n->host));
1465     if (status != 0)
1466       return (-1);
1467   }
1468
1469   if (strlen (n->plugin) > 0)
1470   {
1471     status = write_part_string (&buffer_ptr, &buffer_free, TYPE_PLUGIN,
1472         n->plugin, strlen (n->plugin));
1473     if (status != 0)
1474       return (-1);
1475   }
1476
1477   if (strlen (n->plugin_instance) > 0)
1478   {
1479     status = write_part_string (&buffer_ptr, &buffer_free,
1480         TYPE_PLUGIN_INSTANCE,
1481         n->plugin_instance, strlen (n->plugin_instance));
1482     if (status != 0)
1483       return (-1);
1484   }
1485
1486   if (strlen (n->type) > 0)
1487   {
1488     status = write_part_string (&buffer_ptr, &buffer_free, TYPE_TYPE,
1489         n->type, strlen (n->type));
1490     if (status != 0)
1491       return (-1);
1492   }
1493
1494   if (strlen (n->type_instance) > 0)
1495   {
1496     status = write_part_string (&buffer_ptr, &buffer_free, TYPE_TYPE_INSTANCE,
1497         n->type_instance, strlen (n->type_instance));
1498     if (status != 0)
1499       return (-1);
1500   }
1501
1502   status = write_part_string (&buffer_ptr, &buffer_free, TYPE_MESSAGE,
1503       n->message, strlen (n->message));
1504   if (status != 0)
1505     return (-1);
1506
1507   network_send_buffer (buffer, sizeof (buffer) - buffer_free);
1508
1509   return (0);
1510 } /* int network_notification */
1511
1512 static int network_shutdown (void)
1513 {
1514         listen_loop++;
1515
1516         /* Kill the listening thread */
1517         if (receive_thread_id != (pthread_t) 0)
1518         {
1519                 pthread_kill (receive_thread_id, SIGTERM);
1520                 pthread_join (receive_thread_id, NULL /* no return value */);
1521                 receive_thread_id = (pthread_t) 0;
1522         }
1523
1524         /* Shutdown the dispatching thread */
1525         if (dispatch_thread_id != (pthread_t) 0)
1526                 pthread_cond_broadcast (&receive_list_cond);
1527
1528         if (send_buffer_fill > 0)
1529                 flush_buffer ();
1530
1531         if (cache_tree != NULL)
1532         {
1533                 void *key;
1534                 void *value;
1535
1536                 while (c_avl_pick (cache_tree, &key, &value) == 0)
1537                 {
1538                         sfree (key);
1539                         sfree (value);
1540                 }
1541                 c_avl_destroy (cache_tree);
1542                 cache_tree = NULL;
1543         }
1544
1545         /* TODO: Close `sending_sockets' */
1546
1547         plugin_unregister_config ("network");
1548         plugin_unregister_init ("network");
1549         plugin_unregister_write ("network");
1550         plugin_unregister_shutdown ("network");
1551
1552         return (0);
1553 } /* int network_shutdown */
1554
1555 static int network_init (void)
1556 {
1557         plugin_register_shutdown ("network", network_shutdown);
1558
1559         send_buffer_ptr  = send_buffer;
1560         send_buffer_fill = 0;
1561         memset (&send_buffer_vl, '\0', sizeof (send_buffer_vl));
1562         memset (send_buffer_type, '\0', sizeof (send_buffer_type));
1563
1564         cache_tree = c_avl_create ((int (*) (const void *, const void *)) strcmp);
1565         cache_flush_last = time (NULL);
1566
1567         /* setup socket(s) and so on */
1568         if (sending_sockets != NULL)
1569         {
1570                 plugin_register_write ("network", network_write);
1571                 plugin_register_notification ("network", network_notification);
1572         }
1573
1574         if ((listen_sockets_num != 0) && (receive_thread_id == 0))
1575         {
1576                 int status;
1577
1578                 status = pthread_create (&dispatch_thread_id,
1579                                 NULL /* no attributes */,
1580                                 dispatch_thread,
1581                                 NULL /* no argument */);
1582                 if (status != 0)
1583                 {
1584                         char errbuf[1024];
1585                         ERROR ("network: pthread_create failed: %s",
1586                                         sstrerror (errno, errbuf,
1587                                                 sizeof (errbuf)));
1588                 }
1589
1590                 status = pthread_create (&receive_thread_id,
1591                                 NULL /* no attributes */,
1592                                 receive_thread,
1593                                 NULL /* no argument */);
1594                 if (status != 0)
1595                 {
1596                         char errbuf[1024];
1597                         ERROR ("network: pthread_create failed: %s",
1598                                         sstrerror (errno, errbuf,
1599                                                 sizeof (errbuf)));
1600                 }
1601         }
1602         return (0);
1603 } /* int network_init */
1604
1605 static int network_flush (int timeout)
1606 {
1607         pthread_mutex_lock (&send_buffer_lock);
1608
1609         if (((time (NULL) - cache_flush_last) >= timeout)
1610                         && (send_buffer_fill > 0))
1611         {
1612                 flush_buffer ();
1613         }
1614
1615         pthread_mutex_unlock (&send_buffer_lock);
1616
1617         return (0);
1618 } /* int network_flush */
1619
1620 void module_register (void)
1621 {
1622         plugin_register_config ("network", network_config,
1623                         config_keys, config_keys_num);
1624         plugin_register_init   ("network", network_init);
1625         plugin_register_flush   ("network", network_flush);
1626 } /* void module_register */