Merge branch 'collectd-4.3'
[collectd.git] / src / network.c
1 /**
2  * collectd - src/network.c
3  * Copyright (C) 2005-2008  Florian octo Forster
4  *
5  * This program is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License as published by the
7  * Free Software Foundation; only version 2 of the License is applicable.
8  *
9  * This program is distributed in the hope that it will be useful, but
10  * WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License along
15  * with this program; if not, write to the Free Software Foundation, Inc.,
16  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
17  *
18  * Authors:
19  *   Florian octo Forster <octo at verplant.org>
20  **/
21
22 #include "collectd.h"
23 #include "plugin.h"
24 #include "common.h"
25 #include "configfile.h"
26 #include "utils_avltree.h"
27
28 #include "network.h"
29
30 #if HAVE_PTHREAD_H
31 # include <pthread.h>
32 #endif
33 #if HAVE_SYS_SOCKET_H
34 # include <sys/socket.h>
35 #endif
36 #if HAVE_NETDB_H
37 # include <netdb.h>
38 #endif
39 #if HAVE_NETINET_IN_H
40 # include <netinet/in.h>
41 #endif
42 #if HAVE_ARPA_INET_H
43 # include <arpa/inet.h>
44 #endif
45 #if HAVE_POLL_H
46 # include <poll.h>
47 #endif
48
49 /* 1500 - 40 - 8  =  Ethernet packet - IPv6 header - UDP header */
50 /* #define BUFF_SIZE 1452 */
51
52 #ifndef IPV6_ADD_MEMBERSHIP
53 # ifdef IPV6_JOIN_GROUP
54 #  define IPV6_ADD_MEMBERSHIP IPV6_JOIN_GROUP
55 # else
56 #  error "Neither IP_ADD_MEMBERSHIP nor IPV6_JOIN_GROUP is defined"
57 # endif
58 #endif /* !IP_ADD_MEMBERSHIP */
59
60 #define BUFF_SIZE 1024
61
62 /*
63  * Private data types
64  */
65 typedef struct sockent
66 {
67         int                      fd;
68         struct sockaddr_storage *addr;
69         socklen_t                addrlen;
70         struct sockent          *next;
71 } sockent_t;
72
73 /*                      1 1 1 1 1 1 1 1 1 1 2 2 2 2 2 2 2 2 2 2 3 3
74  *  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
75  * +-------+-----------------------+-------------------------------+
76  * ! Ver.  !                       ! Length                        !
77  * +-------+-----------------------+-------------------------------+
78  */
79 struct part_header_s
80 {
81         uint16_t type;
82         uint16_t length;
83 };
84 typedef struct part_header_s part_header_t;
85
86 /*                      1 1 1 1 1 1 1 1 1 1 2 2 2 2 2 2 2 2 2 2 3 3
87  *  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
88  * +-------------------------------+-------------------------------+
89  * ! Type                          ! Length                        !
90  * +-------------------------------+-------------------------------+
91  * : (Length - 4) Bytes                                            :
92  * +---------------------------------------------------------------+
93  */
94 struct part_string_s
95 {
96         part_header_t *head;
97         char *value;
98 };
99 typedef struct part_string_s part_string_t;
100
101 /*                      1 1 1 1 1 1 1 1 1 1 2 2 2 2 2 2 2 2 2 2 3 3
102  *  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
103  * +-------------------------------+-------------------------------+
104  * ! Type                          ! Length                        !
105  * +-------------------------------+-------------------------------+
106  * : (Length - 4 == 2 || 4 || 8) Bytes                             :
107  * +---------------------------------------------------------------+
108  */
109 struct part_number_s
110 {
111         part_header_t *head;
112         uint64_t *value;
113 };
114 typedef struct part_number_s part_number_t;
115
116 /*                      1 1 1 1 1 1 1 1 1 1 2 2 2 2 2 2 2 2 2 2 3 3
117  *  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
118  * +-------------------------------+-------------------------------+
119  * ! Type                          ! Length                        !
120  * +-------------------------------+---------------+---------------+
121  * ! Num of values                 ! Type0         ! Type1         !
122  * +-------------------------------+---------------+---------------+
123  * ! Value0                                                        !
124  * !                                                               !
125  * +---------------------------------------------------------------+
126  * ! Value1                                                        !
127  * !                                                               !
128  * +---------------------------------------------------------------+
129  */
130 struct part_values_s
131 {
132         part_header_t *head;
133         uint16_t *num_values;
134         uint8_t  *values_types;
135         value_t  *values;
136 };
137 typedef struct part_values_s part_values_t;
138
139 struct receive_list_entry_s
140 {
141   char data[BUFF_SIZE];
142   int  data_len;
143   struct receive_list_entry_s *next;
144 };
145 typedef struct receive_list_entry_s receive_list_entry_t;
146
147 /*
148  * Private variables
149  */
150 static const char *config_keys[] =
151 {
152         "CacheFlush",
153         "Listen",
154         "Server",
155         "TimeToLive",
156         "Forward"
157 };
158 static int config_keys_num = STATIC_ARRAY_SIZE (config_keys);
159
160 static int network_config_ttl = 0;
161 static int network_config_forward = 0;
162
163 static sockent_t *sending_sockets = NULL;
164
165 static receive_list_entry_t *receive_list_head = NULL;
166 static receive_list_entry_t *receive_list_tail = NULL;
167 static pthread_mutex_t       receive_list_lock = PTHREAD_MUTEX_INITIALIZER;
168 static pthread_cond_t        receive_list_cond = PTHREAD_COND_INITIALIZER;
169
170 static struct pollfd *listen_sockets = NULL;
171 static int listen_sockets_num = 0;
172
173 static int listen_loop = 0;
174 static pthread_t receive_thread_id = 0;
175 static pthread_t dispatch_thread_id = 0;
176
177 static char         send_buffer[BUFF_SIZE];
178 static char        *send_buffer_ptr;
179 static int          send_buffer_fill;
180 static value_list_t send_buffer_vl = VALUE_LIST_STATIC;
181 static char         send_buffer_type[DATA_MAX_NAME_LEN];
182 static pthread_mutex_t send_buffer_lock = PTHREAD_MUTEX_INITIALIZER;
183
184 static c_avl_tree_t      *cache_tree = NULL;
185 static pthread_mutex_t  cache_lock = PTHREAD_MUTEX_INITIALIZER;
186 static time_t           cache_flush_last;
187 static int              cache_flush_interval = 1800;
188
189 /*
190  * Private functions
191  */
192 static int cache_flush (void)
193 {
194         char **keys = NULL;
195         int    keys_num = 0;
196
197         char **tmp;
198         int    i;
199
200         char   *key;
201         time_t *value;
202         c_avl_iterator_t *iter;
203
204         time_t curtime = time (NULL);
205
206         iter = c_avl_get_iterator (cache_tree);
207         while (c_avl_iterator_next (iter, (void *) &key, (void *) &value) == 0)
208         {
209                 if ((curtime - *value) <= cache_flush_interval)
210                         continue;
211                 tmp = (char **) realloc (keys,
212                                 (keys_num + 1) * sizeof (char *));
213                 if (tmp == NULL)
214                 {
215                         sfree (keys);
216                         c_avl_iterator_destroy (iter);
217                         ERROR ("network plugin: cache_flush: realloc"
218                                         " failed.");
219                         return (-1);
220                 }
221                 keys = tmp;
222                 keys[keys_num] = key;
223                 keys_num++;
224         } /* while (c_avl_iterator_next) */
225         c_avl_iterator_destroy (iter);
226
227         for (i = 0; i < keys_num; i++)
228         {
229                 if (c_avl_remove (cache_tree, keys[i], (void *) &key,
230                                         (void *) &value) != 0)
231                 {
232                         WARNING ("network plugin: cache_flush: c_avl_remove"
233                                         " (%s) failed.", keys[i]);
234                         continue;
235                 }
236
237                 sfree (key);
238                 sfree (value);
239         }
240
241         sfree (keys);
242
243         DEBUG ("network plugin: cache_flush: Removed %i %s",
244                         keys_num, (keys_num == 1) ? "entry" : "entries");
245         cache_flush_last = curtime;
246         return (0);
247 } /* int cache_flush */
248
249 static int cache_check (const char *type, const value_list_t *vl)
250 {
251         char key[1024];
252         time_t *value = NULL;
253         int retval = -1;
254
255         if (cache_tree == NULL)
256                 return (-1);
257
258         if (format_name (key, sizeof (key), vl->host, vl->plugin,
259                                 vl->plugin_instance, type, vl->type_instance))
260                 return (-1);
261
262         pthread_mutex_lock (&cache_lock);
263
264         if (c_avl_get (cache_tree, key, (void *) &value) == 0)
265         {
266                 if (*value < vl->time)
267                 {
268                         *value = vl->time;
269                         retval = 0;
270                 }
271                 else
272                 {
273                         DEBUG ("network plugin: cache_check: *value = %i >= vl->time = %i",
274                                         (int) *value, (int) vl->time);
275                         retval = 1;
276                 }
277         }
278         else
279         {
280                 char *key_copy = strdup (key);
281                 value = malloc (sizeof (time_t));
282                 if ((key_copy != NULL) && (value != NULL))
283                 {
284                         *value = vl->time;
285                         c_avl_insert (cache_tree, key_copy, value);
286                         retval = 0;
287                 }
288                 else
289                 {
290                         sfree (key_copy);
291                         sfree (value);
292                 }
293         }
294
295         if ((time (NULL) - cache_flush_last) > cache_flush_interval)
296                 cache_flush ();
297
298         pthread_mutex_unlock (&cache_lock);
299
300         DEBUG ("network plugin: cache_check: key = %s; time = %i; retval = %i",
301                         key, (int) vl->time, retval);
302
303         return (retval);
304 } /* int cache_check */
305
306 static int write_part_values (char **ret_buffer, int *ret_buffer_len,
307                 const data_set_t *ds, const value_list_t *vl)
308 {
309         char *packet_ptr;
310         int packet_len;
311         int num_values;
312
313         part_header_t pkg_ph;
314         uint16_t      pkg_num_values;
315         uint8_t      *pkg_values_types;
316         value_t      *pkg_values;
317
318         int offset;
319         int i;
320
321         num_values = vl->values_len;
322         packet_len = sizeof (part_header_t) + sizeof (uint16_t)
323                 + (num_values * sizeof (uint8_t))
324                 + (num_values * sizeof (value_t));
325
326         if (*ret_buffer_len < packet_len)
327                 return (-1);
328
329         pkg_values_types = (uint8_t *) malloc (num_values * sizeof (uint8_t));
330         if (pkg_values_types == NULL)
331         {
332                 ERROR ("network plugin: write_part_values: malloc failed.");
333                 return (-1);
334         }
335
336         pkg_values = (value_t *) malloc (num_values * sizeof (value_t));
337         if (pkg_values == NULL)
338         {
339                 free (pkg_values_types);
340                 ERROR ("network plugin: write_part_values: malloc failed.");
341                 return (-1);
342         }
343
344         pkg_ph.type = htons (TYPE_VALUES);
345         pkg_ph.length = htons (packet_len);
346
347         pkg_num_values = htons ((uint16_t) vl->values_len);
348
349         for (i = 0; i < num_values; i++)
350         {
351                 if (ds->ds[i].type == DS_TYPE_COUNTER)
352                 {
353                         pkg_values_types[i] = DS_TYPE_COUNTER;
354                         pkg_values[i].counter = htonll (vl->values[i].counter);
355                 }
356                 else
357                 {
358                         pkg_values_types[i] = DS_TYPE_GAUGE;
359                         pkg_values[i].gauge = vl->values[i].gauge;
360                 }
361         }
362
363         /*
364          * Use `memcpy' to write everything to the buffer, because the pointer
365          * may be unaligned and some architectures, such as SPARC, can't handle
366          * that.
367          */
368         packet_ptr = *ret_buffer;
369         offset = 0;
370         memcpy (packet_ptr + offset, &pkg_ph, sizeof (pkg_ph));
371         offset += sizeof (pkg_ph);
372         memcpy (packet_ptr + offset, &pkg_num_values, sizeof (pkg_num_values));
373         offset += sizeof (pkg_num_values);
374         memcpy (packet_ptr + offset, pkg_values_types, num_values * sizeof (uint8_t));
375         offset += num_values * sizeof (uint8_t);
376         memcpy (packet_ptr + offset, pkg_values, num_values * sizeof (value_t));
377         offset += num_values * sizeof (value_t);
378
379         assert (offset == packet_len);
380
381         *ret_buffer = packet_ptr + packet_len;
382         *ret_buffer_len -= packet_len;
383
384         free (pkg_values_types);
385         free (pkg_values);
386
387         return (0);
388 } /* int write_part_values */
389
390 static int write_part_number (char **ret_buffer, int *ret_buffer_len,
391                 int type, uint64_t value)
392 {
393         char *packet_ptr;
394         int packet_len;
395
396         part_header_t pkg_head;
397         uint64_t pkg_value;
398         
399         int offset;
400
401         packet_len = sizeof (pkg_head) + sizeof (pkg_value);
402
403         if (*ret_buffer_len < packet_len)
404                 return (-1);
405
406         pkg_head.type = htons (type);
407         pkg_head.length = htons (packet_len);
408         pkg_value = htonll (value);
409
410         packet_ptr = *ret_buffer;
411         offset = 0;
412         memcpy (packet_ptr + offset, &pkg_head, sizeof (pkg_head));
413         offset += sizeof (pkg_head);
414         memcpy (packet_ptr + offset, &pkg_value, sizeof (pkg_value));
415         offset += sizeof (pkg_value);
416
417         assert (offset == packet_len);
418
419         *ret_buffer = packet_ptr + packet_len;
420         *ret_buffer_len -= packet_len;
421
422         return (0);
423 } /* int write_part_number */
424
425 static int write_part_string (char **ret_buffer, int *ret_buffer_len,
426                 int type, const char *str, int str_len)
427 {
428         char *packet_ptr;
429         int packet_len;
430
431         part_header_t pkg_head;
432
433         int offset;
434
435         packet_len = sizeof (pkg_head) + str_len + 1;
436         if (*ret_buffer_len < packet_len)
437                 return (-1);
438
439         pkg_head.type = htons (type);
440         pkg_head.length = htons (packet_len);
441
442         packet_ptr = *ret_buffer;
443         offset = 0;
444         memcpy (packet_ptr + offset, &pkg_head, sizeof (pkg_head));
445         offset += sizeof (pkg_head);
446         memcpy (packet_ptr + offset, str, str_len);
447         offset += str_len;
448         memset (packet_ptr + offset, '\0', 1);
449         offset += 1;
450
451         assert (offset == packet_len);
452
453         *ret_buffer = packet_ptr + packet_len;
454         *ret_buffer_len -= packet_len;
455
456         return (0);
457 } /* int write_part_string */
458
459 static int parse_part_values (void **ret_buffer, int *ret_buffer_len,
460                 value_t **ret_values, int *ret_num_values)
461 {
462         char *buffer = *ret_buffer;
463         int   buffer_len = *ret_buffer_len;
464         part_values_t pv;
465         int   i;
466
467         uint16_t h_length;
468         uint16_t h_type;
469         uint16_t h_num;
470
471         if (buffer_len < (15))
472         {
473                 DEBUG ("network plugin: packet is too short: buffer_len = %i",
474                                 buffer_len);
475                 return (-1);
476         }
477
478         pv.head = (part_header_t *) buffer;
479         h_length = ntohs (pv.head->length);
480         h_type = ntohs (pv.head->type);
481
482         assert (h_type == TYPE_VALUES);
483
484         pv.num_values = (uint16_t *) (pv.head + 1);
485         h_num = ntohs (*pv.num_values);
486
487         if (h_num != ((h_length - 6) / 9))
488         {
489                 DEBUG ("`length' and `num of values' don't match");
490                 return (-1);
491         }
492
493         pv.values_types = (uint8_t *) (pv.num_values + 1);
494         pv.values = (value_t *) (pv.values_types + h_num);
495
496         for (i = 0; i < h_num; i++)
497                 if (pv.values_types[i] == DS_TYPE_COUNTER)
498                         pv.values[i].counter = ntohll (pv.values[i].counter);
499
500         *ret_buffer     = (void *) (pv.values + h_num);
501         *ret_buffer_len = buffer_len - h_length;
502         *ret_num_values = h_num;
503         *ret_values     = pv.values;
504
505         return (0);
506 } /* int parse_part_values */
507
508 static int parse_part_number (void **ret_buffer, int *ret_buffer_len,
509                 uint64_t *value)
510 {
511         part_number_t pn;
512         uint16_t len;
513
514         pn.head = (part_header_t *) *ret_buffer;
515         pn.value = (uint64_t *) (pn.head + 1);
516
517         len = ntohs (pn.head->length);
518         if (len != 12)
519                 return (-1);
520         if (len > *ret_buffer_len)
521                 return (-1);
522         *value = ntohll (*pn.value);
523
524         *ret_buffer = (void *) (pn.value + 1);
525         *ret_buffer_len -= len;
526
527         return (0);
528 } /* int parse_part_number */
529
530 static int parse_part_string (void **ret_buffer, int *ret_buffer_len,
531                 char *output, int output_len)
532 {
533         char *buffer = *ret_buffer;
534         int   buffer_len = *ret_buffer_len;
535         part_string_t ps;
536
537         uint16_t h_length;
538         uint16_t h_type;
539
540         DEBUG ("network plugin: parse_part_string: ret_buffer = %p;"
541                         " ret_buffer_len = %i; output = %p; output_len = %i;",
542                         *ret_buffer, *ret_buffer_len,
543                         (void *) output, output_len);
544
545         ps.head = (part_header_t *) buffer;
546
547         h_length = ntohs (ps.head->length);
548         h_type = ntohs (ps.head->type);
549
550         DEBUG ("network plugin: parse_part_string: length = %hu; type = %hu;",
551                         h_length, h_type);
552
553         if (buffer_len < h_length)
554         {
555                 DEBUG ("packet is too short");
556                 return (-1);
557         }
558         assert ((h_type == TYPE_HOST)
559                         || (h_type == TYPE_PLUGIN)
560                         || (h_type == TYPE_PLUGIN_INSTANCE)
561                         || (h_type == TYPE_TYPE)
562                         || (h_type == TYPE_TYPE_INSTANCE)
563                         || (h_type == TYPE_MESSAGE));
564
565         ps.value = buffer + 4;
566         if (ps.value[h_length - 5] != '\0')
567         {
568                 DEBUG ("String does not end with a nullbyte");
569                 return (-1);
570         }
571
572         if (output_len < (h_length - 4))
573         {
574                 DEBUG ("output buffer is too small");
575                 return (-1);
576         }
577         strcpy (output, ps.value);
578
579         DEBUG ("network plugin: parse_part_string: output = %s", output);
580
581         *ret_buffer = (void *) (buffer + h_length);
582         *ret_buffer_len = buffer_len - h_length;
583
584         return (0);
585 } /* int parse_part_string */
586
587 static int parse_packet (void *buffer, int buffer_len)
588 {
589         part_header_t *header;
590         int status;
591
592         value_list_t vl = VALUE_LIST_INIT;
593         char type[DATA_MAX_NAME_LEN];
594         notification_t n;
595
596         DEBUG ("network plugin: parse_packet: buffer = %p; buffer_len = %i;",
597                         buffer, buffer_len);
598
599         memset (&vl, '\0', sizeof (vl));
600         memset (&type, '\0', sizeof (type));
601         memset (&n, '\0', sizeof (n));
602         status = 0;
603
604         while ((status == 0) && (0 < buffer_len)
605                         && ((unsigned int) buffer_len > sizeof (part_header_t)))
606         {
607                 header = (part_header_t *) buffer;
608
609                 if (ntohs (header->length) > buffer_len)
610                         break;
611                 /* Assure that this loop terminates eventually */
612                 if (ntohs (header->length) < 4)
613                         break;
614
615                 if (ntohs (header->type) == TYPE_VALUES)
616                 {
617                         status = parse_part_values (&buffer, &buffer_len,
618                                         &vl.values, &vl.values_len);
619
620                         if (status != 0)
621                         {
622                                 DEBUG ("parse_part_values failed.");
623                                 break;
624                         }
625
626                         if ((vl.time > 0)
627                                         && (strlen (vl.host) > 0)
628                                         && (strlen (vl.plugin) > 0)
629                                         && (strlen (type) > 0)
630                                         && (cache_check (type, &vl) == 0))
631                         {
632                                 DEBUG ("network plugin: parse_packet:"
633                                                 " dispatching values");
634                                 plugin_dispatch_values (type, &vl);
635                         }
636                         else
637                         {
638                                 DEBUG ("network plugin: parse_packet:"
639                                                 " NOT dispatching values");
640                         }
641                 }
642                 else if (ntohs (header->type) == TYPE_TIME)
643                 {
644                         uint64_t tmp = 0;
645                         status = parse_part_number (&buffer, &buffer_len, &tmp);
646                         if (status == 0)
647                         {
648                                 vl.time = (time_t) tmp;
649                                 n.time = (time_t) tmp;
650                         }
651                 }
652                 else if (ntohs (header->type) == TYPE_INTERVAL)
653                 {
654                         uint64_t tmp = 0;
655                         status = parse_part_number (&buffer, &buffer_len, &tmp);
656                         if (status == 0)
657                                 vl.interval = (int) tmp;
658                 }
659                 else if (ntohs (header->type) == TYPE_HOST)
660                 {
661                         status = parse_part_string (&buffer, &buffer_len,
662                                         vl.host, sizeof (vl.host));
663                         strncpy (n.host, vl.host, sizeof (n.host));
664                         n.host[sizeof (n.host) - 1] = '\0';
665                         DEBUG ("network plugin: parse_packet: vl.host = %s",
666                                         vl.host);
667                 }
668                 else if (ntohs (header->type) == TYPE_PLUGIN)
669                 {
670                         status = parse_part_string (&buffer, &buffer_len,
671                                         vl.plugin, sizeof (vl.plugin));
672                         strncpy (n.plugin, vl.plugin, sizeof (n.plugin));
673                         n.plugin[sizeof (n.plugin) - 1] = '\0';
674                         DEBUG ("network plugin: parse_packet: vl.plugin = %s",
675                                         vl.plugin);
676                 }
677                 else if (ntohs (header->type) == TYPE_PLUGIN_INSTANCE)
678                 {
679                         status = parse_part_string (&buffer, &buffer_len,
680                                         vl.plugin_instance,
681                                         sizeof (vl.plugin_instance));
682                         strncpy (n.plugin_instance, vl.plugin_instance,
683                                         sizeof (n.plugin_instance));
684                         n.plugin_instance[sizeof (n.plugin_instance) - 1] = '\0';
685                         DEBUG ("network plugin: parse_packet: "
686                                         "vl.plugin_instance = %s",
687                                         vl.plugin_instance);
688                 }
689                 else if (ntohs (header->type) == TYPE_TYPE)
690                 {
691                         status = parse_part_string (&buffer, &buffer_len,
692                                         type, sizeof (type));
693                         strncpy (n.type, type, sizeof (n.type));
694                         n.type[sizeof (n.type) - 1] = '\0';
695                         DEBUG ("network plugin: parse_packet: type = %s",
696                                         type);
697                 }
698                 else if (ntohs (header->type) == TYPE_TYPE_INSTANCE)
699                 {
700                         status = parse_part_string (&buffer, &buffer_len,
701                                         vl.type_instance,
702                                         sizeof (vl.type_instance));
703                         strncpy (n.type_instance, vl.type_instance,
704                                         sizeof (n.type_instance));
705                         n.type_instance[sizeof (n.type_instance) - 1] = '\0';
706                         DEBUG ("network plugin: parse_packet: "
707                                         "vl.type_instance = %s",
708                                         vl.type_instance);
709                 }
710                 else if (ntohs (header->type) == TYPE_MESSAGE)
711                 {
712                         status = parse_part_string (&buffer, &buffer_len,
713                                         n.message, sizeof (n.message));
714                         DEBUG ("network plugin: parse_packet: n.message = %s",
715                                         n.message);
716
717                         if ((n.severity != NOTIF_FAILURE)
718                                         && (n.severity != NOTIF_WARNING)
719                                         && (n.severity != NOTIF_OKAY))
720                         {
721                                 INFO ("network plugin: "
722                                                 "Ignoring notification with "
723                                                 "unknown severity %s.",
724                                                 n.severity);
725                         }
726                         else if (n.time <= 0)
727                         {
728                                 INFO ("network plugin: "
729                                                 "Ignoring notification with "
730                                                 "time == 0.");
731                         }
732                         else if (strlen (n.message) <= 0)
733                         {
734                                 INFO ("network plugin: "
735                                                 "Ignoring notification with "
736                                                 "an empty message.");
737                         }
738                         else
739                         {
740                                 /*
741                                  * TODO: Let this do a separate thread so that
742                                  * no packets are lost if this takes too long.
743                                  */
744                                 plugin_dispatch_notification (&n);
745                         }
746                 }
747                 else if (ntohs (header->type) == TYPE_SEVERITY)
748                 {
749                         uint64_t tmp = 0;
750                         status = parse_part_number (&buffer, &buffer_len, &tmp);
751                         if (status == 0)
752                                 n.severity = (int) tmp;
753                 }
754                 else
755                 {
756                         DEBUG ("network plugin: parse_packet: Unknown part"
757                                         " type: 0x%0hx", ntohs (header->type));
758                         buffer = ((char *) buffer) + ntohs (header->length);
759                 }
760         } /* while (buffer_len > sizeof (part_header_t)) */
761
762         return (0);
763 } /* int parse_packet */
764
765 static void free_sockent (sockent_t *se)
766 {
767         sockent_t *next;
768         while (se != NULL)
769         {
770                 next = se->next;
771                 free (se->addr);
772                 free (se);
773                 se = next;
774         }
775 } /* void free_sockent */
776
777 /*
778  * int network_set_ttl
779  *
780  * Set the `IP_MULTICAST_TTL', `IP_TTL', `IPV6_MULTICAST_HOPS' or
781  * `IPV6_UNICAST_HOPS', depending on which option is applicable.
782  *
783  * The `struct addrinfo' is used to destinguish between unicast and multicast
784  * sockets.
785  */
786 static int network_set_ttl (const sockent_t *se, const struct addrinfo *ai)
787 {
788         if ((network_config_ttl < 1) || (network_config_ttl > 255))
789                 return (-1);
790
791         DEBUG ("ttl = %i", network_config_ttl);
792
793         if (ai->ai_family == AF_INET)
794         {
795                 struct sockaddr_in *addr = (struct sockaddr_in *) ai->ai_addr;
796                 int optname;
797
798                 if (IN_MULTICAST (ntohl (addr->sin_addr.s_addr)))
799                         optname = IP_MULTICAST_TTL;
800                 else
801                         optname = IP_TTL;
802
803                 if (setsockopt (se->fd, IPPROTO_IP, optname,
804                                         &network_config_ttl,
805                                         sizeof (network_config_ttl)) == -1)
806                 {
807                         char errbuf[1024];
808                         ERROR ("setsockopt: %s",
809                                         sstrerror (errno, errbuf, sizeof (errbuf)));
810                         return (-1);
811                 }
812         }
813         else if (ai->ai_family == AF_INET6)
814         {
815                 /* Useful example: http://gsyc.escet.urjc.es/~eva/IPv6-web/examples/mcast.html */
816                 struct sockaddr_in6 *addr = (struct sockaddr_in6 *) ai->ai_addr;
817                 int optname;
818
819                 if (IN6_IS_ADDR_MULTICAST (&addr->sin6_addr))
820                         optname = IPV6_MULTICAST_HOPS;
821                 else
822                         optname = IPV6_UNICAST_HOPS;
823
824                 if (setsockopt (se->fd, IPPROTO_IPV6, optname,
825                                         &network_config_ttl,
826                                         sizeof (network_config_ttl)) == -1)
827                 {
828                         char errbuf[1024];
829                         ERROR ("setsockopt: %s",
830                                         sstrerror (errno, errbuf,
831                                                 sizeof (errbuf)));
832                         return (-1);
833                 }
834         }
835
836         return (0);
837 } /* int network_set_ttl */
838
839 static int network_bind_socket (const sockent_t *se, const struct addrinfo *ai)
840 {
841         int loop = 0;
842         int yes  = 1;
843
844         /* allow multiple sockets to use the same PORT number */
845         if (setsockopt(se->fd, SOL_SOCKET, SO_REUSEADDR,
846                                 &yes, sizeof(yes)) == -1) {
847                 char errbuf[1024];
848                 ERROR ("setsockopt: %s", 
849                                 sstrerror (errno, errbuf, sizeof (errbuf)));
850                 return (-1);
851         }
852
853         DEBUG ("fd = %i; calling `bind'", se->fd);
854
855         if (bind (se->fd, ai->ai_addr, ai->ai_addrlen) == -1)
856         {
857                 char errbuf[1024];
858                 ERROR ("bind: %s",
859                                 sstrerror (errno, errbuf, sizeof (errbuf)));
860                 return (-1);
861         }
862
863         if (ai->ai_family == AF_INET)
864         {
865                 struct sockaddr_in *addr = (struct sockaddr_in *) ai->ai_addr;
866                 if (IN_MULTICAST (ntohl (addr->sin_addr.s_addr)))
867                 {
868                         struct ip_mreq mreq;
869
870                         DEBUG ("fd = %i; IPv4 multicast address found", se->fd);
871
872                         mreq.imr_multiaddr.s_addr = addr->sin_addr.s_addr;
873                         mreq.imr_interface.s_addr = htonl (INADDR_ANY);
874
875                         if (setsockopt (se->fd, IPPROTO_IP, IP_MULTICAST_LOOP,
876                                                 &loop, sizeof (loop)) == -1)
877                         {
878                                 char errbuf[1024];
879                                 ERROR ("setsockopt: %s",
880                                                 sstrerror (errno, errbuf,
881                                                         sizeof (errbuf)));
882                                 return (-1);
883                         }
884
885                         if (setsockopt (se->fd, IPPROTO_IP, IP_ADD_MEMBERSHIP,
886                                                 &mreq, sizeof (mreq)) == -1)
887                         {
888                                 char errbuf[1024];
889                                 ERROR ("setsockopt: %s",
890                                                 sstrerror (errno, errbuf,
891                                                         sizeof (errbuf)));
892                                 return (-1);
893                         }
894                 }
895         }
896         else if (ai->ai_family == AF_INET6)
897         {
898                 /* Useful example: http://gsyc.escet.urjc.es/~eva/IPv6-web/examples/mcast.html */
899                 struct sockaddr_in6 *addr = (struct sockaddr_in6 *) ai->ai_addr;
900                 if (IN6_IS_ADDR_MULTICAST (&addr->sin6_addr))
901                 {
902                         struct ipv6_mreq mreq;
903
904                         DEBUG ("fd = %i; IPv6 multicast address found", se->fd);
905
906                         memcpy (&mreq.ipv6mr_multiaddr,
907                                         &addr->sin6_addr,
908                                         sizeof (addr->sin6_addr));
909
910                         /* http://developer.apple.com/documentation/Darwin/Reference/ManPages/man4/ip6.4.html
911                          * ipv6mr_interface may be set to zeroes to
912                          * choose the default multicast interface or to
913                          * the index of a particular multicast-capable
914                          * interface if the host is multihomed.
915                          * Membership is associ-associated with a
916                          * single interface; programs running on
917                          * multihomed hosts may need to join the same
918                          * group on more than one interface.*/
919                         mreq.ipv6mr_interface = 0;
920
921                         if (setsockopt (se->fd, IPPROTO_IPV6, IPV6_MULTICAST_LOOP,
922                                                 &loop, sizeof (loop)) == -1)
923                         {
924                                 char errbuf[1024];
925                                 ERROR ("setsockopt: %s",
926                                                 sstrerror (errno, errbuf,
927                                                         sizeof (errbuf)));
928                                 return (-1);
929                         }
930
931                         if (setsockopt (se->fd, IPPROTO_IPV6, IPV6_ADD_MEMBERSHIP,
932                                                 &mreq, sizeof (mreq)) == -1)
933                         {
934                                 char errbuf[1024];
935                                 ERROR ("setsockopt: %s",
936                                                 sstrerror (errno, errbuf,
937                                                         sizeof (errbuf)));
938                                 return (-1);
939                         }
940                 }
941         }
942
943         return (0);
944 } /* int network_bind_socket */
945
946 static sockent_t *network_create_socket (const char *node,
947                 const char *service,
948                 int listen)
949 {
950         struct addrinfo  ai_hints;
951         struct addrinfo *ai_list, *ai_ptr;
952         int              ai_return;
953
954         sockent_t *se_head = NULL;
955         sockent_t *se_tail = NULL;
956
957         DEBUG ("node = %s, service = %s", node, service);
958
959         memset (&ai_hints, '\0', sizeof (ai_hints));
960         ai_hints.ai_flags    = 0;
961 #ifdef AI_PASSIVE
962         ai_hints.ai_flags |= AI_PASSIVE;
963 #endif
964 #ifdef AI_ADDRCONFIG
965         ai_hints.ai_flags |= AI_ADDRCONFIG;
966 #endif
967         ai_hints.ai_family   = AF_UNSPEC;
968         ai_hints.ai_socktype = SOCK_DGRAM;
969         ai_hints.ai_protocol = IPPROTO_UDP;
970
971         ai_return = getaddrinfo (node, service, &ai_hints, &ai_list);
972         if (ai_return != 0)
973         {
974                 char errbuf[1024];
975                 ERROR ("getaddrinfo (%s, %s): %s",
976                                 (node == NULL) ? "(null)" : node,
977                                 (service == NULL) ? "(null)" : service,
978                                 (ai_return == EAI_SYSTEM)
979                                 ? sstrerror (errno, errbuf, sizeof (errbuf))
980                                 : gai_strerror (ai_return));
981                 return (NULL);
982         }
983
984         for (ai_ptr = ai_list; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
985         {
986                 sockent_t *se;
987
988                 if ((se = (sockent_t *) malloc (sizeof (sockent_t))) == NULL)
989                 {
990                         char errbuf[1024];
991                         ERROR ("malloc: %s",
992                                         sstrerror (errno, errbuf,
993                                                 sizeof (errbuf)));
994                         continue;
995                 }
996
997                 if ((se->addr = (struct sockaddr_storage *) malloc (sizeof (struct sockaddr_storage))) == NULL)
998                 {
999                         char errbuf[1024];
1000                         ERROR ("malloc: %s",
1001                                         sstrerror (errno, errbuf,
1002                                                 sizeof (errbuf)));
1003                         free (se);
1004                         continue;
1005                 }
1006
1007                 assert (sizeof (struct sockaddr_storage) >= ai_ptr->ai_addrlen);
1008                 memset (se->addr, '\0', sizeof (struct sockaddr_storage));
1009                 memcpy (se->addr, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
1010                 se->addrlen = ai_ptr->ai_addrlen;
1011
1012                 se->fd   = socket (ai_ptr->ai_family,
1013                                 ai_ptr->ai_socktype,
1014                                 ai_ptr->ai_protocol);
1015                 se->next = NULL;
1016
1017                 if (se->fd == -1)
1018                 {
1019                         char errbuf[1024];
1020                         ERROR ("socket: %s",
1021                                         sstrerror (errno, errbuf,
1022                                                 sizeof (errbuf)));
1023                         free (se->addr);
1024                         free (se);
1025                         continue;
1026                 }
1027
1028                 if (listen != 0)
1029                 {
1030                         if (network_bind_socket (se, ai_ptr) != 0)
1031                         {
1032                                 close (se->fd);
1033                                 free (se->addr);
1034                                 free (se);
1035                                 continue;
1036                         }
1037                 }
1038                 else /* listen == 0 */
1039                 {
1040                         network_set_ttl (se, ai_ptr);
1041                 }
1042
1043                 if (se_tail == NULL)
1044                 {
1045                         se_head = se;
1046                         se_tail = se;
1047                 }
1048                 else
1049                 {
1050                         se_tail->next = se;
1051                         se_tail = se;
1052                 }
1053
1054                 /* We don't open more than one write-socket per node/service pair.. */
1055                 if (listen == 0)
1056                         break;
1057         }
1058
1059         freeaddrinfo (ai_list);
1060
1061         return (se_head);
1062 } /* sockent_t *network_create_socket */
1063
1064 static sockent_t *network_create_default_socket (int listen)
1065 {
1066         sockent_t *se_ptr  = NULL;
1067         sockent_t *se_head = NULL;
1068         sockent_t *se_tail = NULL;
1069
1070         se_ptr = network_create_socket (NET_DEFAULT_V6_ADDR,
1071                         NET_DEFAULT_PORT, listen);
1072
1073         /* Don't send to the same machine in IPv6 and IPv4 if both are available. */
1074         if ((listen == 0) && (se_ptr != NULL))
1075                 return (se_ptr);
1076
1077         if (se_ptr != NULL)
1078         {
1079                 se_head = se_ptr;
1080                 se_tail = se_ptr;
1081                 while (se_tail->next != NULL)
1082                         se_tail = se_tail->next;
1083         }
1084
1085         se_ptr = network_create_socket (NET_DEFAULT_V4_ADDR, NET_DEFAULT_PORT, listen);
1086
1087         if (se_tail == NULL)
1088                 return (se_ptr);
1089
1090         se_tail->next = se_ptr;
1091         return (se_head);
1092 } /* sockent_t *network_create_default_socket */
1093
1094 static int network_add_listen_socket (const char *node, const char *service)
1095 {
1096         sockent_t *se;
1097         sockent_t *se_ptr;
1098         int se_num = 0;
1099
1100         if (service == NULL)
1101                 service = NET_DEFAULT_PORT;
1102
1103         if (node == NULL)
1104                 se = network_create_default_socket (1 /* listen == true */);
1105         else
1106                 se = network_create_socket (node, service, 1 /* listen == true */);
1107
1108         if (se == NULL)
1109                 return (-1);
1110
1111         for (se_ptr = se; se_ptr != NULL; se_ptr = se_ptr->next)
1112                 se_num++;
1113
1114         listen_sockets = (struct pollfd *) realloc (listen_sockets,
1115                         (listen_sockets_num + se_num)
1116                         * sizeof (struct pollfd));
1117
1118         for (se_ptr = se; se_ptr != NULL; se_ptr = se_ptr->next)
1119         {
1120                 listen_sockets[listen_sockets_num].fd = se_ptr->fd;
1121                 listen_sockets[listen_sockets_num].events = POLLIN | POLLPRI;
1122                 listen_sockets[listen_sockets_num].revents = 0;
1123                 listen_sockets_num++;
1124         } /* for (se) */
1125
1126         free_sockent (se);
1127         return (0);
1128 } /* int network_add_listen_socket */
1129
1130 static int network_add_sending_socket (const char *node, const char *service)
1131 {
1132         sockent_t *se;
1133         sockent_t *se_ptr;
1134
1135         if (service == NULL)
1136                 service = NET_DEFAULT_PORT;
1137
1138         if (node == NULL)
1139                 se = network_create_default_socket (0 /* listen == false */);
1140         else
1141                 se = network_create_socket (node, service, 0 /* listen == false */);
1142
1143         if (se == NULL)
1144                 return (-1);
1145
1146         if (sending_sockets == NULL)
1147         {
1148                 sending_sockets = se;
1149                 return (0);
1150         }
1151
1152         for (se_ptr = sending_sockets; se_ptr->next != NULL; se_ptr = se_ptr->next)
1153                 /* seek end */;
1154
1155         se_ptr->next = se;
1156         return (0);
1157 } /* int network_get_listen_socket */
1158
1159 static void *dispatch_thread (void *arg)
1160 {
1161   while (42)
1162   {
1163     receive_list_entry_t *ent;
1164
1165     /* Lock and wait for more data to come in */
1166     pthread_mutex_lock (&receive_list_lock);
1167     while ((listen_loop == 0)
1168         && (receive_list_head == NULL))
1169       pthread_cond_wait (&receive_list_cond, &receive_list_lock);
1170
1171     /* Remove the head entry and unlock */
1172     ent = receive_list_head;
1173     if (ent != NULL)
1174       receive_list_head = ent->next;
1175     pthread_mutex_unlock (&receive_list_lock);
1176
1177     /* Check whether we are supposed to exit. We do NOT check `listen_loop'
1178      * because we dispatch all missing packets before shutting down. */
1179     if (ent == NULL)
1180       break;
1181
1182     parse_packet (ent->data, ent->data_len);
1183
1184     sfree (ent);
1185   } /* while (42) */
1186
1187   return (NULL);
1188 } /* void *receive_thread */
1189
1190 static int network_receive (void)
1191 {
1192         char buffer[BUFF_SIZE];
1193         int  buffer_len;
1194
1195         int i;
1196         int status;
1197
1198         if (listen_sockets_num == 0)
1199                 network_add_listen_socket (NULL, NULL);
1200
1201         if (listen_sockets_num == 0)
1202         {
1203                 ERROR ("network: Failed to open a listening socket.");
1204                 return (-1);
1205         }
1206
1207         while (listen_loop == 0)
1208         {
1209                 status = poll (listen_sockets, listen_sockets_num, -1);
1210
1211                 if (status <= 0)
1212                 {
1213                         char errbuf[1024];
1214                         if (errno == EINTR)
1215                                 continue;
1216                         ERROR ("poll failed: %s",
1217                                         sstrerror (errno, errbuf, sizeof (errbuf)));
1218                         return (-1);
1219                 }
1220
1221                 for (i = 0; (i < listen_sockets_num) && (status > 0); i++)
1222                 {
1223                         receive_list_entry_t *ent;
1224
1225                         if ((listen_sockets[i].revents & (POLLIN | POLLPRI)) == 0)
1226                                 continue;
1227                         status--;
1228
1229                         buffer_len = recv (listen_sockets[i].fd,
1230                                         buffer, sizeof (buffer),
1231                                         0 /* no flags */);
1232                         if (buffer_len < 0)
1233                         {
1234                                 char errbuf[1024];
1235                                 ERROR ("recv failed: %s",
1236                                                 sstrerror (errno, errbuf,
1237                                                         sizeof (errbuf)));
1238                                 return (-1);
1239                         }
1240
1241                         ent = malloc (sizeof (receive_list_entry_t));
1242                         if (ent == NULL)
1243                         {
1244                                 ERROR ("network plugin: malloc failed.");
1245                                 return (-1);
1246                         }
1247                         memset (ent, '\0', sizeof (receive_list_entry_t));
1248
1249                         /* Hopefully this be optimized out by the compiler. It
1250                          * might help prevent stupid bugs in the future though.
1251                          */
1252                         assert (sizeof (ent->data) == sizeof (buffer));
1253
1254                         memcpy (ent->data, buffer, buffer_len);
1255                         ent->data_len = buffer_len;
1256
1257                         pthread_mutex_lock (&receive_list_lock);
1258                         if (receive_list_head == NULL)
1259                         {
1260                                 receive_list_head = ent;
1261                                 receive_list_tail = ent;
1262                         }
1263                         else
1264                         {
1265                                 receive_list_tail->next = ent;
1266                                 receive_list_tail = ent;
1267                         }
1268                         pthread_cond_signal (&receive_list_cond);
1269                         pthread_mutex_unlock (&receive_list_lock);
1270                 } /* for (listen_sockets) */
1271         } /* while (listen_loop == 0) */
1272
1273         return (0);
1274 }
1275
1276 static void *receive_thread (void *arg)
1277 {
1278         return (network_receive () ? (void *) 1 : (void *) 0);
1279 } /* void *receive_thread */
1280
1281 static void network_send_buffer (const char *buffer, int buffer_len)
1282 {
1283         sockent_t *se;
1284         int status;
1285
1286         DEBUG ("network plugin: network_send_buffer: buffer_len = %i", buffer_len);
1287
1288         for (se = sending_sockets; se != NULL; se = se->next)
1289         {
1290                 while (42)
1291                 {
1292                         status = sendto (se->fd, buffer, buffer_len, 0 /* no flags */,
1293                                         (struct sockaddr *) se->addr, se->addrlen);
1294                         if (status < 0)
1295                         {
1296                                 char errbuf[1024];
1297                                 if (errno == EINTR)
1298                                         continue;
1299                                 ERROR ("network plugin: sendto failed: %s",
1300                                                 sstrerror (errno, errbuf,
1301                                                         sizeof (errbuf)));
1302                                 break;
1303                         }
1304
1305                         break;
1306                 } /* while (42) */
1307         } /* for (sending_sockets) */
1308 } /* void network_send_buffer */
1309
1310 static int add_to_buffer (char *buffer, int buffer_size,
1311                 value_list_t *vl_def, char *type_def,
1312                 const data_set_t *ds, const value_list_t *vl)
1313 {
1314         char *buffer_orig = buffer;
1315
1316         if (strcmp (vl_def->host, vl->host) != 0)
1317         {
1318                 if (write_part_string (&buffer, &buffer_size, TYPE_HOST,
1319                                         vl->host, strlen (vl->host)) != 0)
1320                         return (-1);
1321                 strcpy (vl_def->host, vl->host);
1322         }
1323
1324         if (vl_def->time != vl->time)
1325         {
1326                 if (write_part_number (&buffer, &buffer_size, TYPE_TIME,
1327                                         (uint64_t) vl->time))
1328                         return (-1);
1329                 vl_def->time = vl->time;
1330         }
1331
1332         if (vl_def->interval != vl->interval)
1333         {
1334                 if (write_part_number (&buffer, &buffer_size, TYPE_INTERVAL,
1335                                         (uint64_t) vl->interval))
1336                         return (-1);
1337                 vl_def->interval = vl->interval;
1338         }
1339
1340         if (strcmp (vl_def->plugin, vl->plugin) != 0)
1341         {
1342                 if (write_part_string (&buffer, &buffer_size, TYPE_PLUGIN,
1343                                         vl->plugin, strlen (vl->plugin)) != 0)
1344                         return (-1);
1345                 strcpy (vl_def->plugin, vl->plugin);
1346         }
1347
1348         if (strcmp (vl_def->plugin_instance, vl->plugin_instance) != 0)
1349         {
1350                 if (write_part_string (&buffer, &buffer_size, TYPE_PLUGIN_INSTANCE,
1351                                         vl->plugin_instance,
1352                                         strlen (vl->plugin_instance)) != 0)
1353                         return (-1);
1354                 strcpy (vl_def->plugin_instance, vl->plugin_instance);
1355         }
1356
1357         if (strcmp (type_def, ds->type) != 0)
1358         {
1359                 if (write_part_string (&buffer, &buffer_size, TYPE_TYPE,
1360                                         ds->type, strlen (ds->type)) != 0)
1361                         return (-1);
1362                 strcpy (type_def, ds->type);
1363         }
1364
1365         if (strcmp (vl_def->type_instance, vl->type_instance) != 0)
1366         {
1367                 if (write_part_string (&buffer, &buffer_size, TYPE_TYPE_INSTANCE,
1368                                         vl->type_instance,
1369                                         strlen (vl->type_instance)) != 0)
1370                         return (-1);
1371                 strcpy (vl_def->type_instance, vl->type_instance);
1372         }
1373         
1374         if (write_part_values (&buffer, &buffer_size, ds, vl) != 0)
1375                 return (-1);
1376
1377         return (buffer - buffer_orig);
1378 } /* int add_to_buffer */
1379
1380 static void flush_buffer (void)
1381 {
1382         DEBUG ("network plugin: flush_buffer: send_buffer_fill = %i",
1383                         send_buffer_fill);
1384
1385         network_send_buffer (send_buffer, send_buffer_fill);
1386         send_buffer_ptr  = send_buffer;
1387         send_buffer_fill = 0;
1388         memset (&send_buffer_vl, '\0', sizeof (send_buffer_vl));
1389         memset (send_buffer_type, '\0', sizeof (send_buffer_type));
1390 }
1391
1392 static int network_write (const data_set_t *ds, const value_list_t *vl)
1393 {
1394         int status;
1395
1396         /* If the value is already in the cache, we have received it via the
1397          * network. We write it again if forwarding is activated. It's then in
1398          * the cache and should we receive it again we will ignore it. */
1399         status = cache_check (ds->type, vl);
1400         if ((network_config_forward == 0)
1401                         && (status != 0))
1402                 return (0);
1403
1404         pthread_mutex_lock (&send_buffer_lock);
1405
1406         status = add_to_buffer (send_buffer_ptr,
1407                         sizeof (send_buffer) - send_buffer_fill,
1408                         &send_buffer_vl, send_buffer_type,
1409                         ds, vl);
1410         if (status >= 0)
1411         {
1412                 /* status == bytes added to the buffer */
1413                 send_buffer_fill += status;
1414                 send_buffer_ptr  += status;
1415         }
1416         else
1417         {
1418                 flush_buffer ();
1419
1420                 status = add_to_buffer (send_buffer_ptr,
1421                                 sizeof (send_buffer) - send_buffer_fill,
1422                                 &send_buffer_vl, send_buffer_type,
1423                                 ds, vl);
1424
1425                 if (status >= 0)
1426                 {
1427                         send_buffer_fill += status;
1428                         send_buffer_ptr  += status;
1429                 }
1430         }
1431
1432         if (status < 0)
1433         {
1434                 ERROR ("network plugin: Unable to append to the "
1435                                 "buffer for some weird reason");
1436         }
1437         else if ((sizeof (send_buffer) - send_buffer_fill) < 15)
1438         {
1439                 flush_buffer ();
1440         }
1441
1442         pthread_mutex_unlock (&send_buffer_lock);
1443
1444         return ((status < 0) ? -1 : 0);
1445 } /* int network_write */
1446
1447 static int network_config (const char *key, const char *val)
1448 {
1449         char *node;
1450         char *service;
1451
1452         char *fields[3];
1453         int   fields_num;
1454
1455         if ((strcasecmp ("Listen", key) == 0)
1456                         || (strcasecmp ("Server", key) == 0))
1457         {
1458                 char *val_cpy = strdup (val);
1459                 if (val_cpy == NULL)
1460                         return (1);
1461
1462                 service = NET_DEFAULT_PORT;
1463                 fields_num = strsplit (val_cpy, fields, 3);
1464                 if ((fields_num != 1)
1465                                 && (fields_num != 2))
1466                         return (1);
1467                 else if (fields_num == 2)
1468                 {
1469                         if ((service = strchr (fields[1], '.')) != NULL)
1470                                 *service = '\0';
1471                         service = fields[1];
1472                 }
1473                 node = fields[0];
1474
1475                 if (strcasecmp ("Listen", key) == 0)
1476                         network_add_listen_socket (node, service);
1477                 else
1478                         network_add_sending_socket (node, service);
1479         }
1480         else if (strcasecmp ("TimeToLive", key) == 0)
1481         {
1482                 int tmp = atoi (val);
1483                 if ((tmp > 0) && (tmp < 256))
1484                         network_config_ttl = tmp;
1485                 else
1486                         return (1);
1487         }
1488         else if (strcasecmp ("Forward", key) == 0)
1489         {
1490                 if ((strcasecmp ("true", val) == 0)
1491                                 || (strcasecmp ("yes", val) == 0)
1492                                 || (strcasecmp ("on", val) == 0))
1493                         network_config_forward = 1;
1494                 else
1495                         network_config_forward = 0;
1496         }
1497         else if (strcasecmp ("CacheFlush", key) == 0)
1498         {
1499                 int tmp = atoi (val);
1500                 if (tmp > 0)
1501                         cache_flush_interval = tmp;
1502                 else return (1);
1503         }
1504         else
1505         {
1506                 return (-1);
1507         }
1508         return (0);
1509 } /* int network_config */
1510
1511 static int network_notification (const notification_t *n)
1512 {
1513   char  buffer[BUFF_SIZE];
1514   char *buffer_ptr = buffer;
1515   int   buffer_free = sizeof (buffer);
1516   int   status;
1517
1518   memset (buffer, '\0', sizeof (buffer));
1519
1520
1521   status = write_part_number (&buffer_ptr, &buffer_free, TYPE_TIME,
1522       (uint64_t) n->time);
1523   if (status != 0)
1524     return (-1);
1525
1526   status = write_part_number (&buffer_ptr, &buffer_free, TYPE_SEVERITY,
1527       (uint64_t) n->severity);
1528   if (status != 0)
1529     return (-1);
1530
1531   if (strlen (n->host) > 0)
1532   {
1533     status = write_part_string (&buffer_ptr, &buffer_free, TYPE_HOST,
1534         n->host, strlen (n->host));
1535     if (status != 0)
1536       return (-1);
1537   }
1538
1539   if (strlen (n->plugin) > 0)
1540   {
1541     status = write_part_string (&buffer_ptr, &buffer_free, TYPE_PLUGIN,
1542         n->plugin, strlen (n->plugin));
1543     if (status != 0)
1544       return (-1);
1545   }
1546
1547   if (strlen (n->plugin_instance) > 0)
1548   {
1549     status = write_part_string (&buffer_ptr, &buffer_free,
1550         TYPE_PLUGIN_INSTANCE,
1551         n->plugin_instance, strlen (n->plugin_instance));
1552     if (status != 0)
1553       return (-1);
1554   }
1555
1556   if (strlen (n->type) > 0)
1557   {
1558     status = write_part_string (&buffer_ptr, &buffer_free, TYPE_TYPE,
1559         n->type, strlen (n->type));
1560     if (status != 0)
1561       return (-1);
1562   }
1563
1564   if (strlen (n->type_instance) > 0)
1565   {
1566     status = write_part_string (&buffer_ptr, &buffer_free, TYPE_TYPE_INSTANCE,
1567         n->type_instance, strlen (n->type_instance));
1568     if (status != 0)
1569       return (-1);
1570   }
1571
1572   status = write_part_string (&buffer_ptr, &buffer_free, TYPE_MESSAGE,
1573       n->message, strlen (n->message));
1574   if (status != 0)
1575     return (-1);
1576
1577   network_send_buffer (buffer, sizeof (buffer) - buffer_free);
1578
1579   return (0);
1580 } /* int network_notification */
1581
1582 static int network_shutdown (void)
1583 {
1584         listen_loop++;
1585
1586         /* Kill the listening thread */
1587         if (receive_thread_id != (pthread_t) 0)
1588         {
1589                 pthread_kill (receive_thread_id, SIGTERM);
1590                 pthread_join (receive_thread_id, NULL /* no return value */);
1591                 receive_thread_id = (pthread_t) 0;
1592         }
1593
1594         /* Shutdown the dispatching thread */
1595         if (dispatch_thread_id != (pthread_t) 0)
1596                 pthread_cond_broadcast (&receive_list_cond);
1597
1598         if (send_buffer_fill > 0)
1599                 flush_buffer ();
1600
1601         if (cache_tree != NULL)
1602         {
1603                 void *key;
1604                 void *value;
1605
1606                 while (c_avl_pick (cache_tree, &key, &value) == 0)
1607                 {
1608                         sfree (key);
1609                         sfree (value);
1610                 }
1611                 c_avl_destroy (cache_tree);
1612                 cache_tree = NULL;
1613         }
1614
1615         /* TODO: Close `sending_sockets' */
1616
1617         plugin_unregister_config ("network");
1618         plugin_unregister_init ("network");
1619         plugin_unregister_write ("network");
1620         plugin_unregister_shutdown ("network");
1621
1622         return (0);
1623 } /* int network_shutdown */
1624
1625 static int network_init (void)
1626 {
1627         plugin_register_shutdown ("network", network_shutdown);
1628
1629         send_buffer_ptr  = send_buffer;
1630         send_buffer_fill = 0;
1631         memset (&send_buffer_vl, '\0', sizeof (send_buffer_vl));
1632         memset (send_buffer_type, '\0', sizeof (send_buffer_type));
1633
1634         cache_tree = c_avl_create ((int (*) (const void *, const void *)) strcmp);
1635         cache_flush_last = time (NULL);
1636
1637         /* setup socket(s) and so on */
1638         if (sending_sockets != NULL)
1639         {
1640                 plugin_register_write ("network", network_write);
1641                 plugin_register_notification ("network", network_notification);
1642         }
1643
1644         if ((listen_sockets_num != 0) && (receive_thread_id == 0))
1645         {
1646                 int status;
1647
1648                 status = pthread_create (&dispatch_thread_id,
1649                                 NULL /* no attributes */,
1650                                 dispatch_thread,
1651                                 NULL /* no argument */);
1652                 if (status != 0)
1653                 {
1654                         char errbuf[1024];
1655                         ERROR ("network: pthread_create failed: %s",
1656                                         sstrerror (errno, errbuf,
1657                                                 sizeof (errbuf)));
1658                 }
1659
1660                 status = pthread_create (&receive_thread_id,
1661                                 NULL /* no attributes */,
1662                                 receive_thread,
1663                                 NULL /* no argument */);
1664                 if (status != 0)
1665                 {
1666                         char errbuf[1024];
1667                         ERROR ("network: pthread_create failed: %s",
1668                                         sstrerror (errno, errbuf,
1669                                                 sizeof (errbuf)));
1670                 }
1671         }
1672         return (0);
1673 } /* int network_init */
1674
1675 static int network_flush (int timeout)
1676 {
1677         pthread_mutex_lock (&send_buffer_lock);
1678
1679         if (((time (NULL) - cache_flush_last) >= timeout)
1680                         && (send_buffer_fill > 0))
1681         {
1682                 flush_buffer ();
1683         }
1684
1685         pthread_mutex_unlock (&send_buffer_lock);
1686
1687         return (0);
1688 } /* int network_flush */
1689
1690 void module_register (void)
1691 {
1692         plugin_register_config ("network", network_config,
1693                         config_keys, config_keys_num);
1694         plugin_register_init   ("network", network_init);
1695         plugin_register_flush   ("network", network_flush);
1696 } /* void module_register */