6c6e0f60a69fc1478efab4ba46a3cdb03f0971b1
[collectd.git] / src / network.c
1 /**
2  * collectd - src/network.c
3  * Copyright (C) 2005-2007  Florian octo Forster
4  *
5  * This program is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License as published by the
7  * Free Software Foundation; only version 2 of the License is applicable.
8  *
9  * This program is distributed in the hope that it will be useful, but
10  * WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License along
15  * with this program; if not, write to the Free Software Foundation, Inc.,
16  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
17  *
18  * Authors:
19  *   Florian octo Forster <octo at verplant.org>
20  **/
21
22 #include "collectd.h"
23 #include "plugin.h"
24 #include "common.h"
25 #include "configfile.h"
26
27 #include "network.h"
28
29 #if HAVE_PTHREAD_H
30 # include <pthread.h>
31 #endif
32 #if HAVE_SYS_SOCKET_H
33 # include <sys/socket.h>
34 #endif
35 #if HAVE_NETDB_H
36 # include <netdb.h>
37 #endif
38 #if HAVE_NETINET_IN_H
39 # include <netinet/in.h>
40 #endif
41 #if HAVE_ARPA_INET_H
42 # include <arpa/inet.h>
43 #endif
44 #if HAVE_POLL_H
45 # include <poll.h>
46 #endif
47
48 /* 1500 - 40 - 8  =  Ethernet packet - IPv6 header - UDP header */
49 /* #define BUFF_SIZE 1452 */
50
51 #ifndef IPV6_ADD_MEMBERSHIP
52 # ifdef IPV6_JOIN_GROUP
53 #  define IPV6_ADD_MEMBERSHIP IPV6_JOIN_GROUP
54 # else
55 #  error "Neither IP_ADD_MEMBERSHIP nor IPV6_JOIN_GROUP is defined"
56 # endif
57 #endif /* !IP_ADD_MEMBERSHIP */
58
59 #define BUFF_SIZE 1024
60
61 /*
62  * Private data types
63  */
64 typedef struct sockent
65 {
66         int                      fd;
67         struct sockaddr_storage *addr;
68         socklen_t                addrlen;
69         struct sockent          *next;
70 } sockent_t;
71
72 /*                      1 1 1 1 1 1 1 1 1 1 2 2 2 2 2 2 2 2 2 2 3 3
73  *  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
74  * +-------+-----------------------+-------------------------------+
75  * ! Ver.  !                       ! Length                        !
76  * +-------+-----------------------+-------------------------------+
77  */
78 struct part_header_s
79 {
80         uint16_t type;
81         uint16_t length;
82 };
83 typedef struct part_header_s part_header_t;
84
85 /*                      1 1 1 1 1 1 1 1 1 1 2 2 2 2 2 2 2 2 2 2 3 3
86  *  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
87  * +-------------------------------+-------------------------------+
88  * ! Type                          ! Length                        !
89  * +-------------------------------+-------------------------------+
90  * : (Length - 4) Bytes                                            :
91  * +---------------------------------------------------------------+
92  */
93 struct part_string_s
94 {
95         part_header_t *head;
96         char *value;
97 };
98 typedef struct part_string_s part_string_t;
99
100 /*                      1 1 1 1 1 1 1 1 1 1 2 2 2 2 2 2 2 2 2 2 3 3
101  *  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
102  * +-------------------------------+-------------------------------+
103  * ! Type                          ! Length                        !
104  * +-------------------------------+-------------------------------+
105  * : (Length - 4 == 2 || 4 || 8) Bytes                             :
106  * +---------------------------------------------------------------+
107  */
108 struct part_number_s
109 {
110         part_header_t *head;
111         uint64_t *value;
112 };
113 typedef struct part_number_s part_number_t;
114
115 /*                      1 1 1 1 1 1 1 1 1 1 2 2 2 2 2 2 2 2 2 2 3 3
116  *  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
117  * +-------------------------------+-------------------------------+
118  * ! Type                          ! Length                        !
119  * +-------------------------------+---------------+---------------+
120  * ! Num of values                 ! Type0         ! Type1         !
121  * +-------------------------------+---------------+---------------+
122  * ! Value0                                                        !
123  * !                                                               !
124  * +---------------------------------------------------------------+
125  * ! Value1                                                        !
126  * !                                                               !
127  * +---------------------------------------------------------------+
128  */
129 struct part_values_s
130 {
131         part_header_t *head;
132         uint16_t *num_values;
133         uint8_t  *values_types;
134         value_t  *values;
135 };
136 typedef struct part_values_s part_values_t;
137
138 /*
139  * Private variables
140  */
141 static const char *config_keys[] =
142 {
143         "Listen",
144         "Server",
145         "TimeToLive",
146         NULL
147 };
148 static int config_keys_num = 3;
149
150 static int network_config_ttl = 0;
151
152 static sockent_t *sending_sockets = NULL;
153
154 static struct pollfd *listen_sockets = NULL;
155 static int listen_sockets_num = 0;
156 static pthread_t listen_thread = 0;
157 static int listen_loop = 0;
158
159 static char         send_buffer[BUFF_SIZE];
160 static char        *send_buffer_ptr;
161 static int          send_buffer_fill;
162 static value_list_t send_buffer_vl = VALUE_LIST_INIT;
163 static char         send_buffer_type[DATA_MAX_NAME_LEN];
164
165 /*
166  * Private functions
167  */
168 static int write_part_values (char **ret_buffer, int *ret_buffer_len,
169                 const data_set_t *ds, const value_list_t *vl)
170 {
171         part_values_t pv;
172         int i;
173
174         i = 6 + (9 * vl->values_len);
175         if (*ret_buffer_len < i)
176                 return (-1);
177         *ret_buffer_len -= i;
178
179         pv.head = (part_header_t *) *ret_buffer;
180         pv.num_values = (uint16_t *) (pv.head + 1);
181         pv.values_types = (uint8_t *) (pv.num_values + 1);
182         pv.values = (value_t *) (pv.values_types + vl->values_len);
183         *ret_buffer = (void *) (pv.values + vl->values_len);
184
185         pv.head->type = htons (TYPE_VALUES);
186         pv.head->length = htons (6 + (9 * vl->values_len));
187         *pv.num_values = htons ((uint16_t) vl->values_len);
188         
189         for (i = 0; i < vl->values_len; i++)
190         {
191                 if (ds->ds[i].type == DS_TYPE_COUNTER)
192                 {
193                         pv.values_types[i] = DS_TYPE_COUNTER;
194                         pv.values[i].counter = htonll (vl->values[i].counter);
195                 }
196                 else
197                 {
198                         pv.values_types[i] = DS_TYPE_GAUGE;
199                         pv.values[i].gauge = vl->values[i].gauge;
200                 }
201         } /* for (values) */
202
203         return (0);
204 } /* int write_part_values */
205
206 static int write_part_number (char **ret_buffer, int *ret_buffer_len,
207                 int type, uint64_t value)
208 {
209         part_number_t pn;
210
211         if (*ret_buffer_len < 12)
212                 return (-1);
213
214         pn.head = (part_header_t *) *ret_buffer;
215         pn.value = (uint64_t *) (pn.head + 1);
216
217         pn.head->type = htons (type);
218         pn.head->length = htons (12);
219         *pn.value = htonll (value);
220
221         *ret_buffer = (char *) (pn.value + 1);
222         *ret_buffer_len -= 12;
223
224         return (0);
225 } /* int write_part_number */
226
227 static int write_part_string (char **ret_buffer, int *ret_buffer_len,
228                 int type, const char *str, int str_len)
229 {
230         part_string_t ps;
231         int len;
232
233         len = 4 + str_len + 1;
234         if (*ret_buffer_len < len)
235                 return (-1);
236         *ret_buffer_len -= len;
237
238         ps.head = (part_header_t *) *ret_buffer;
239         ps.value = (char *) (ps.head + 1);
240
241         ps.head->type = htons ((uint16_t) type);
242         ps.head->length = htons ((uint16_t) str_len + 5);
243         if (str_len > 0)
244                 memcpy (ps.value, str, str_len);
245         ps.value[str_len] = '\0';
246         *ret_buffer = (void *) (ps.value + (str_len + 1));
247
248         return (0);
249 } /* int write_part_string */
250
251 static int parse_part_values (void **ret_buffer, int *ret_buffer_len,
252                 value_t **ret_values, int *ret_num_values)
253 {
254         char *buffer = *ret_buffer;
255         int   buffer_len = *ret_buffer_len;
256         part_values_t pv;
257         int   i;
258
259         uint16_t h_length;
260         uint16_t h_type;
261         uint16_t h_num;
262
263         if (buffer_len < (15))
264         {
265                 DEBUG ("packet is too short: buffer_len = %i", buffer_len);
266                 return (-1);
267         }
268
269         pv.head = (part_header_t *) buffer;
270         h_length = ntohs (pv.head->length);
271         h_type = ntohs (pv.head->type);
272
273         assert (h_type == TYPE_VALUES);
274
275         pv.num_values = (uint16_t *) (pv.head + 1);
276         h_num = ntohs (*pv.num_values);
277
278         if (h_num != ((h_length - 6) / 9))
279         {
280                 DEBUG ("`length' and `num of values' don't match");
281                 return (-1);
282         }
283
284         pv.values_types = (uint8_t *) (pv.num_values + 1);
285         pv.values = (value_t *) (pv.values_types + h_num);
286
287         for (i = 0; i < h_num; i++)
288                 if (pv.values_types[i] == DS_TYPE_COUNTER)
289                         pv.values[i].counter = ntohll (pv.values[i].counter);
290
291         *ret_buffer     = (void *) (pv.values + h_num);
292         *ret_buffer_len = buffer_len - h_length;
293         *ret_num_values = h_num;
294         *ret_values     = pv.values;
295
296         return (0);
297 } /* int parse_part_values */
298
299 static int parse_part_number (void **ret_buffer, int *ret_buffer_len,
300                 uint64_t *value)
301 {
302         part_number_t pn;
303         uint16_t len;
304
305         pn.head = (part_header_t *) *ret_buffer;
306         pn.value = (uint64_t *) (pn.head + 1);
307
308         len = ntohs (pn.head->length);
309         if (len != 12)
310                 return (-1);
311         if (len > *ret_buffer_len)
312                 return (-1);
313         *value = ntohll (*pn.value);
314
315         *ret_buffer = (void *) (pn.value + 1);
316         *ret_buffer_len -= len;
317
318         return (0);
319 } /* int parse_part_number */
320
321 static int parse_part_string (void **ret_buffer, int *ret_buffer_len,
322                 char *output, int output_len)
323 {
324         char *buffer = *ret_buffer;
325         int   buffer_len = *ret_buffer_len;
326         part_string_t ps;
327
328         uint16_t h_length;
329         uint16_t h_type;
330
331         DEBUG ("ret_buffer = %p; ret_buffer_len = %i; output = %p; output_len = %i;",
332                         *ret_buffer, *ret_buffer_len,
333                         (void *) output, output_len);
334
335         ps.head = (part_header_t *) buffer;
336
337         h_length = ntohs (ps.head->length);
338         h_type = ntohs (ps.head->type);
339
340         DEBUG ("length = %hu; type = %hu;", h_length, h_type);
341
342         if (buffer_len < h_length)
343         {
344                 DEBUG ("packet is too short");
345                 return (-1);
346         }
347         assert ((h_type == TYPE_HOST)
348                         || (h_type == TYPE_PLUGIN)
349                         || (h_type == TYPE_PLUGIN_INSTANCE)
350                         || (h_type == TYPE_TYPE)
351                         || (h_type == TYPE_TYPE_INSTANCE));
352
353         ps.value = buffer + 4;
354         if (ps.value[h_length - 5] != '\0')
355         {
356                 DEBUG ("String does not end with a nullbyte");
357                 return (-1);
358         }
359
360         if (output_len < (h_length - 4))
361         {
362                 DEBUG ("output buffer is too small");
363                 return (-1);
364         }
365         strcpy (output, ps.value);
366
367         DEBUG ("output = %s", output);
368
369         *ret_buffer = (void *) (buffer + h_length);
370         *ret_buffer_len = buffer_len - h_length;
371
372         return (0);
373 } /* int parse_part_string */
374
375 static int parse_packet (void *buffer, int buffer_len)
376 {
377         part_header_t *header;
378         int status;
379
380         value_list_t vl = VALUE_LIST_INIT;
381         char type[DATA_MAX_NAME_LEN];
382
383         DEBUG ("buffer = %p; buffer_len = %i;", buffer, buffer_len);
384
385         memset (&vl, '\0', sizeof (vl));
386         memset (&type, '\0', sizeof (type));
387         status = 0;
388
389         while ((status == 0) && (buffer_len > sizeof (part_header_t)))
390         {
391                 header = (part_header_t *) buffer;
392
393                 if (ntohs (header->length) > buffer_len)
394                         break;
395
396                 if (header->type == htons (TYPE_VALUES))
397                 {
398                         status = parse_part_values (&buffer, &buffer_len,
399                                         &vl.values, &vl.values_len);
400
401                         if (status != 0)
402                         {
403                                 DEBUG ("parse_part_values failed.");
404                                 break;
405                         }
406
407                         if ((vl.time > 0)
408                                         && (strlen (vl.host) > 0)
409                                         && (strlen (vl.plugin) > 0)
410                                         && (strlen (type) > 0))
411                         {
412                                 DEBUG ("dispatching values");
413                                 plugin_dispatch_values (type, &vl);
414                         }
415                         else
416                         {
417                                 DEBUG ("NOT dispatching values");
418                         }
419                 }
420                 else if (header->type == ntohs (TYPE_TIME))
421                 {
422                         uint64_t tmp = 0;
423                         status = parse_part_number (&buffer, &buffer_len, &tmp);
424                         if (status == 0)
425                                 vl.time = (time_t) tmp;
426                 }
427                 else if (header->type == ntohs (TYPE_HOST))
428                 {
429                         status = parse_part_string (&buffer, &buffer_len,
430                                         vl.host, sizeof (vl.host));
431                 }
432                 else if (header->type == ntohs (TYPE_PLUGIN))
433                 {
434                         status = parse_part_string (&buffer, &buffer_len,
435                                         vl.plugin, sizeof (vl.plugin));
436                 }
437                 else if (header->type == ntohs (TYPE_PLUGIN_INSTANCE))
438                 {
439                         status = parse_part_string (&buffer, &buffer_len,
440                                         vl.plugin_instance, sizeof (vl.plugin_instance));
441                 }
442                 else if (header->type == ntohs (TYPE_TYPE))
443                 {
444                         status = parse_part_string (&buffer, &buffer_len,
445                                         type, sizeof (type));
446                 }
447                 else if (header->type == ntohs (TYPE_TYPE_INSTANCE))
448                 {
449                         status = parse_part_string (&buffer, &buffer_len,
450                                         vl.type_instance, sizeof (vl.type_instance));
451                 }
452                 else
453                 {
454                         DEBUG ("Unknown part type: 0x%0hx", header->type);
455                         buffer = ((char *) buffer) + header->length;
456                 }
457         } /* while (buffer_len > sizeof (part_header_t)) */
458
459         return (0);
460 } /* int parse_packet */
461
462 static void free_sockent (sockent_t *se)
463 {
464         sockent_t *next;
465         while (se != NULL)
466         {
467                 next = se->next;
468                 free (se->addr);
469                 free (se);
470                 se = next;
471         }
472 } /* void free_sockent */
473
474 /*
475  * int network_set_ttl
476  *
477  * Set the `IP_MULTICAST_TTL', `IP_TTL', `IPV6_MULTICAST_HOPS' or
478  * `IPV6_UNICAST_HOPS', depending on which option is applicable.
479  *
480  * The `struct addrinfo' is used to destinguish between unicast and multicast
481  * sockets.
482  */
483 static int network_set_ttl (const sockent_t *se, const struct addrinfo *ai)
484 {
485         if ((network_config_ttl < 1) || (network_config_ttl > 255))
486                 return (-1);
487
488         DEBUG ("ttl = %i", network_config_ttl);
489
490         if (ai->ai_family == AF_INET)
491         {
492                 struct sockaddr_in *addr = (struct sockaddr_in *) ai->ai_addr;
493                 int optname;
494
495                 if (IN_MULTICAST (ntohl (addr->sin_addr.s_addr)))
496                         optname = IP_MULTICAST_TTL;
497                 else
498                         optname = IP_TTL;
499
500                 if (setsockopt (se->fd, IPPROTO_IP, optname,
501                                         &network_config_ttl,
502                                         sizeof (network_config_ttl)) == -1)
503                 {
504                         ERROR ("setsockopt: %s", strerror (errno));
505                         return (-1);
506                 }
507         }
508         else if (ai->ai_family == AF_INET6)
509         {
510                 /* Useful example: http://gsyc.escet.urjc.es/~eva/IPv6-web/examples/mcast.html */
511                 struct sockaddr_in6 *addr = (struct sockaddr_in6 *) ai->ai_addr;
512                 int optname;
513
514                 if (IN6_IS_ADDR_MULTICAST (&addr->sin6_addr))
515                         optname = IPV6_MULTICAST_HOPS;
516                 else
517                         optname = IPV6_UNICAST_HOPS;
518
519                 if (setsockopt (se->fd, IPPROTO_IPV6, optname,
520                                         &network_config_ttl,
521                                         sizeof (network_config_ttl)) == -1)
522                 {
523                         ERROR ("setsockopt: %s", strerror (errno));
524                         return (-1);
525                 }
526         }
527
528         return (0);
529 } /* int network_set_ttl */
530
531 static int network_bind_socket (const sockent_t *se, const struct addrinfo *ai)
532 {
533         int loop = 1;
534
535         DEBUG ("fd = %i; calling `bind'", se->fd);
536
537         if (bind (se->fd, ai->ai_addr, ai->ai_addrlen) == -1)
538         {
539                 ERROR ("bind: %s", strerror (errno));
540                 return (-1);
541         }
542
543         if (ai->ai_family == AF_INET)
544         {
545                 struct sockaddr_in *addr = (struct sockaddr_in *) ai->ai_addr;
546                 if (IN_MULTICAST (ntohl (addr->sin_addr.s_addr)))
547                 {
548                         struct ip_mreq mreq;
549
550                         DEBUG ("fd = %i; IPv4 multicast address found", se->fd);
551
552                         mreq.imr_multiaddr.s_addr = addr->sin_addr.s_addr;
553                         mreq.imr_interface.s_addr = htonl (INADDR_ANY);
554
555                         if (setsockopt (se->fd, IPPROTO_IP, IP_MULTICAST_LOOP,
556                                                 &loop, sizeof (loop)) == -1)
557                         {
558                                 ERROR ("setsockopt: %s", strerror (errno));
559                                 return (-1);
560                         }
561
562                         if (setsockopt (se->fd, IPPROTO_IP, IP_ADD_MEMBERSHIP,
563                                                 &mreq, sizeof (mreq)) == -1)
564                         {
565                                 ERROR ("setsockopt: %s", strerror (errno));
566                                 return (-1);
567                         }
568                 }
569         }
570         else if (ai->ai_family == AF_INET6)
571         {
572                 /* Useful example: http://gsyc.escet.urjc.es/~eva/IPv6-web/examples/mcast.html */
573                 struct sockaddr_in6 *addr = (struct sockaddr_in6 *) ai->ai_addr;
574                 if (IN6_IS_ADDR_MULTICAST (&addr->sin6_addr))
575                 {
576                         struct ipv6_mreq mreq;
577
578                         DEBUG ("fd = %i; IPv6 multicast address found", se->fd);
579
580                         memcpy (&mreq.ipv6mr_multiaddr,
581                                         &addr->sin6_addr,
582                                         sizeof (addr->sin6_addr));
583
584                         /* http://developer.apple.com/documentation/Darwin/Reference/ManPages/man4/ip6.4.html
585                          * ipv6mr_interface may be set to zeroes to
586                          * choose the default multicast interface or to
587                          * the index of a particular multicast-capable
588                          * interface if the host is multihomed.
589                          * Membership is associ-associated with a
590                          * single interface; programs running on
591                          * multihomed hosts may need to join the same
592                          * group on more than one interface.*/
593                         mreq.ipv6mr_interface = 0;
594
595                         if (setsockopt (se->fd, IPPROTO_IPV6, IPV6_MULTICAST_LOOP,
596                                                 &loop, sizeof (loop)) == -1)
597                         {
598                                 ERROR ("setsockopt: %s", strerror (errno));
599                                 return (-1);
600                         }
601
602                         if (setsockopt (se->fd, IPPROTO_IPV6, IPV6_ADD_MEMBERSHIP,
603                                                 &mreq, sizeof (mreq)) == -1)
604                         {
605                                 ERROR ("setsockopt: %s", strerror (errno));
606                                 return (-1);
607                         }
608                 }
609         }
610
611         return (0);
612 } /* int network_bind_socket */
613
614 static sockent_t *network_create_socket (const char *node,
615                 const char *service,
616                 int listen)
617 {
618         struct addrinfo  ai_hints;
619         struct addrinfo *ai_list, *ai_ptr;
620         int              ai_return;
621
622         sockent_t *se_head = NULL;
623         sockent_t *se_tail = NULL;
624
625         DEBUG ("node = %s, service = %s", node, service);
626
627         memset (&ai_hints, '\0', sizeof (ai_hints));
628         ai_hints.ai_flags    = 0;
629 #ifdef AI_PASSIVE
630         ai_hints.ai_flags |= AI_PASSIVE;
631 #endif
632 #ifdef AI_ADDRCONFIG
633         ai_hints.ai_flags |= AI_ADDRCONFIG;
634 #endif
635         ai_hints.ai_family   = AF_UNSPEC;
636         ai_hints.ai_socktype = SOCK_DGRAM;
637         ai_hints.ai_protocol = IPPROTO_UDP;
638
639         ai_return = getaddrinfo (node, service, &ai_hints, &ai_list);
640         if (ai_return != 0)
641         {
642                 ERROR ("getaddrinfo (%s, %s): %s",
643                                 (node == NULL) ? "(null)" : node,
644                                 (service == NULL) ? "(null)" : service,
645                                 (ai_return == EAI_SYSTEM)
646                                 ? strerror (errno)
647                                 : gai_strerror (ai_return));
648                 return (NULL);
649         }
650
651         for (ai_ptr = ai_list; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
652         {
653                 sockent_t *se;
654
655                 if ((se = (sockent_t *) malloc (sizeof (sockent_t))) == NULL)
656                 {
657                         ERROR ("malloc: %s", strerror (errno));
658                         continue;
659                 }
660
661                 if ((se->addr = (struct sockaddr_storage *) malloc (sizeof (struct sockaddr_storage))) == NULL)
662                 {
663                         ERROR ("malloc: %s", strerror (errno));
664                         free (se);
665                         continue;
666                 }
667
668                 assert (sizeof (struct sockaddr_storage) >= ai_ptr->ai_addrlen);
669                 memset (se->addr, '\0', sizeof (struct sockaddr_storage));
670                 memcpy (se->addr, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
671                 se->addrlen = ai_ptr->ai_addrlen;
672
673                 se->fd   = socket (ai_ptr->ai_family,
674                                 ai_ptr->ai_socktype,
675                                 ai_ptr->ai_protocol);
676                 se->next = NULL;
677
678                 if (se->fd == -1)
679                 {
680                         ERROR ("socket: %s", strerror (errno));
681                         free (se->addr);
682                         free (se);
683                         continue;
684                 }
685
686                 if (listen != 0)
687                 {
688                         if (network_bind_socket (se, ai_ptr) != 0)
689                         {
690                                 close (se->fd);
691                                 free (se->addr);
692                                 free (se);
693                                 continue;
694                         }
695                 }
696                 else /* listen == 0 */
697                 {
698                         network_set_ttl (se, ai_ptr);
699                 }
700
701                 if (se_tail == NULL)
702                 {
703                         se_head = se;
704                         se_tail = se;
705                 }
706                 else
707                 {
708                         se_tail->next = se;
709                         se_tail = se;
710                 }
711
712                 /* We don't open more than one write-socket per node/service pair.. */
713                 if (listen == 0)
714                         break;
715         }
716
717         freeaddrinfo (ai_list);
718
719         return (se_head);
720 } /* sockent_t *network_create_socket */
721
722 static sockent_t *network_create_default_socket (int listen)
723 {
724         sockent_t *se_ptr  = NULL;
725         sockent_t *se_head = NULL;
726         sockent_t *se_tail = NULL;
727
728         se_ptr = network_create_socket (NET_DEFAULT_V6_ADDR,
729                         NET_DEFAULT_PORT, listen);
730
731         /* Don't send to the same machine in IPv6 and IPv4 if both are available. */
732         if ((listen == 0) && (se_ptr != NULL))
733                 return (se_ptr);
734
735         if (se_ptr != NULL)
736         {
737                 se_head = se_ptr;
738                 se_tail = se_ptr;
739                 while (se_tail->next != NULL)
740                         se_tail = se_tail->next;
741         }
742
743         se_ptr = network_create_socket (NET_DEFAULT_V4_ADDR, NET_DEFAULT_PORT, listen);
744
745         if (se_tail == NULL)
746                 return (se_ptr);
747
748         se_tail->next = se_ptr;
749         return (se_head);
750 } /* sockent_t *network_create_default_socket */
751
752 static int network_add_listen_socket (const char *node, const char *service)
753 {
754         sockent_t *se;
755         sockent_t *se_ptr;
756         int se_num = 0;
757
758         if (service == NULL)
759                 service = NET_DEFAULT_PORT;
760
761         if (node == NULL)
762                 se = network_create_default_socket (1 /* listen == true */);
763         else
764                 se = network_create_socket (node, service, 1 /* listen == true */);
765
766         if (se == NULL)
767                 return (-1);
768
769         for (se_ptr = se; se_ptr != NULL; se_ptr = se_ptr->next)
770                 se_num++;
771
772         listen_sockets = (struct pollfd *) realloc (listen_sockets,
773                         (listen_sockets_num + se_num)
774                         * sizeof (struct pollfd));
775
776         for (se_ptr = se; se_ptr != NULL; se_ptr = se_ptr->next)
777         {
778                 listen_sockets[listen_sockets_num].fd = se_ptr->fd;
779                 listen_sockets[listen_sockets_num].events = POLLIN | POLLPRI;
780                 listen_sockets[listen_sockets_num].revents = 0;
781                 listen_sockets_num++;
782         } /* for (se) */
783
784         free_sockent (se);
785         return (0);
786 } /* int network_add_listen_socket */
787
788 static int network_add_sending_socket (const char *node, const char *service)
789 {
790         sockent_t *se;
791         sockent_t *se_ptr;
792
793         if (service == NULL)
794                 service = NET_DEFAULT_PORT;
795
796         if (node == NULL)
797                 se = network_create_default_socket (0 /* listen == false */);
798         else
799                 se = network_create_socket (node, service, 0 /* listen == false */);
800
801         if (se == NULL)
802                 return (-1);
803
804         if (sending_sockets == NULL)
805         {
806                 sending_sockets = se;
807                 return (0);
808         }
809
810         for (se_ptr = sending_sockets; se_ptr->next != NULL; se_ptr = se_ptr->next)
811                 /* seek end */;
812
813         se_ptr->next = se;
814         return (0);
815 } /* int network_get_listen_socket */
816
817 int network_receive (void)
818 {
819         char buffer[BUFF_SIZE];
820         int  buffer_len;
821
822         int i;
823         int status;
824
825         if (listen_sockets_num == 0)
826                 network_add_listen_socket (NULL, NULL);
827
828         if (listen_sockets_num == 0)
829         {
830                 ERROR ("network: Failed to open a listening socket.");
831                 return (-1);
832         }
833
834         while (listen_loop == 0)
835         {
836                 status = poll (listen_sockets, listen_sockets_num, -1);
837
838                 if (status <= 0)
839                 {
840                         if (errno == EINTR)
841                                 continue;
842                         ERROR ("poll failed: %s",
843                                         strerror (errno));
844                         return (-1);
845                 }
846
847                 for (i = 0; (i < listen_sockets_num) && (status > 0); i++)
848                 {
849                         if ((listen_sockets[i].revents & (POLLIN | POLLPRI)) == 0)
850                                 continue;
851                         status--;
852
853                         buffer_len = recv (listen_sockets[i].fd,
854                                         buffer, sizeof (buffer),
855                                         0 /* no flags */);
856                         if (buffer_len < 0)
857                         {
858                                 ERROR ("recv failed: %s", strerror (errno));
859                                 return (-1);
860                         }
861
862                         parse_packet (buffer, buffer_len);
863                 } /* for (listen_sockets) */
864         } /* while (listen_loop == 0) */
865
866         return (0);
867 }
868
869 static void *receive_thread (void *arg)
870 {
871         return (network_receive () ? (void *) 1 : (void *) 0);
872 } /* void *receive_thread */
873
874 static void network_send_buffer (const char *buffer, int buffer_len)
875 {
876         sockent_t *se;
877         int status;
878
879         DEBUG ("buffer_len = %i", buffer_len);
880
881         for (se = sending_sockets; se != NULL; se = se->next)
882         {
883                 while (42)
884                 {
885                         status = sendto (se->fd, buffer, buffer_len, 0 /* no flags */,
886                                         (struct sockaddr *) se->addr, se->addrlen);
887                         if (status < 0)
888                         {
889                                 if (errno == EINTR)
890                                         continue;
891                                 ERROR ("network plugin: sendto failed: %s",
892                                                 strerror (errno));
893                                 break;
894                         }
895
896                         break;
897                 } /* while (42) */
898         } /* for (sending_sockets) */
899 } /* void network_send_buffer */
900
901 static int add_to_buffer (char *buffer, int buffer_size,
902                 value_list_t *vl_def, char *type_def,
903                 const data_set_t *ds, const value_list_t *vl)
904 {
905         if (strcmp (vl_def->host, vl->host) != 0)
906         {
907                 if (write_part_string (&buffer, &buffer_size, TYPE_HOST,
908                                         vl->host, strlen (vl->host)) != 0)
909                         return (-1);
910                 strcpy (vl_def->host, vl->host);
911                 DEBUG ("host = %s", vl->host);
912         }
913
914         if (vl_def->time != vl->time)
915         {
916                 if (write_part_number (&buffer, &buffer_size, TYPE_TIME,
917                                         (uint64_t) vl->time))
918                         return (-1);
919                 vl_def->time = vl->time;
920                 DEBUG ("time = %u", (unsigned int) vl->time);
921         }
922
923         if (strcmp (vl_def->plugin, vl->plugin) != 0)
924         {
925                 if (write_part_string (&buffer, &buffer_size, TYPE_PLUGIN,
926                                         vl->plugin, strlen (vl->plugin)) != 0)
927                         return (-1);
928                 strcpy (vl_def->plugin, vl->plugin);
929                 DEBUG ("plugin = %s", vl->plugin);
930         }
931
932         if (strcmp (vl_def->plugin_instance, vl->plugin_instance) != 0)
933         {
934                 if (write_part_string (&buffer, &buffer_size, TYPE_PLUGIN_INSTANCE,
935                                         vl->plugin_instance,
936                                         strlen (vl->plugin_instance)) != 0)
937                         return (-1);
938                 strcpy (vl_def->plugin_instance, vl->plugin_instance);
939                 DEBUG ("plugin_instance = %s", vl->plugin_instance);
940         }
941
942         if (strcmp (type_def, ds->type) != 0)
943         {
944                 if (write_part_string (&buffer, &buffer_size, TYPE_TYPE,
945                                         ds->type, strlen (ds->type)) != 0)
946                         return (-1);
947                 strcpy (type_def, ds->type);
948                 DEBUG ("type = %s", ds->type);
949         }
950
951         if (strcmp (vl_def->type_instance, vl->type_instance) != 0)
952         {
953                 if (write_part_string (&buffer, &buffer_size, TYPE_PLUGIN_INSTANCE,
954                                         vl->type_instance,
955                                         strlen (vl->type_instance)) != 0)
956                         return (-1);
957                 strcpy (vl_def->type_instance, vl->type_instance);
958                 DEBUG ("type_instance = %s", vl->type_instance);
959         }
960         
961         if (write_part_values (&buffer, &buffer_size, ds, vl) != 0)
962                 return (-1);
963
964         return (buffer_size);
965 } /* int add_to_buffer */
966
967 static void flush_buffer (void)
968 {
969         network_send_buffer (send_buffer, send_buffer_fill);
970         send_buffer_ptr  = send_buffer;
971         send_buffer_fill = 0;
972         memset (&send_buffer_vl, '\0', sizeof (send_buffer_vl));
973         memset (send_buffer_type, '\0', sizeof (send_buffer_type));
974 }
975
976 static int network_write (const data_set_t *ds, const value_list_t *vl)
977 {
978         int status;
979         /* TODO: lock buffer */
980         status = add_to_buffer (send_buffer_ptr,
981                         sizeof (send_buffer) - send_buffer_fill,
982                         &send_buffer_vl, send_buffer_type,
983                         ds, vl);
984         if (status >= 0)
985         {
986                 send_buffer_fill += status;
987                 send_buffer_ptr  += status;
988         }
989         else
990         {
991                 flush_buffer ();
992
993                 status = add_to_buffer (send_buffer_ptr,
994                                 sizeof (send_buffer) - send_buffer_fill,
995                                 &send_buffer_vl, send_buffer_type,
996                                 ds, vl);
997
998                 if (status >= 0)
999                 {
1000                         send_buffer_fill += status;
1001                         send_buffer_ptr  += status;
1002                 }
1003         }
1004
1005         if (status < 0)
1006         {
1007                 ERROR ("network plugin: Unable to append to the "
1008                                 "buffer for some weird reason");
1009         }
1010         else if ((sizeof (send_buffer) - send_buffer_fill) < 15)
1011         {
1012                 flush_buffer ();
1013         }
1014         /* TODO: unlock buffer */
1015
1016         return ((status < 0) ? -1 : 0);
1017 } /* int network_write */
1018
1019 static int network_config (const char *key, const char *val)
1020 {
1021         char *node;
1022         char *service;
1023
1024         char *fields[3];
1025         int   fields_num;
1026
1027         if ((strcasecmp ("Listen", key) == 0)
1028                         || (strcasecmp ("Server", key) == 0))
1029         {
1030                 char *val_cpy = strdup (val);
1031                 if (val_cpy == NULL)
1032                         return (1);
1033
1034                 service = NET_DEFAULT_PORT;
1035                 fields_num = strsplit (val_cpy, fields, 3);
1036                 if ((fields_num != 1)
1037                                 && (fields_num != 2))
1038                         return (1);
1039                 else if (fields_num == 2)
1040                         service = fields[1];
1041                 node = fields[0];
1042
1043                 if (strcasecmp ("Listen", key) == 0)
1044                         network_add_listen_socket (node, service);
1045                 else
1046                         network_add_sending_socket (node, service);
1047         }
1048         else if (strcasecmp ("TimeToLive", key) == 0)
1049         {
1050                 int tmp = atoi (val);
1051                 if ((tmp > 0) && (tmp < 256))
1052                         network_config_ttl = tmp;
1053                 else
1054                         return (1);
1055         }
1056         else
1057         {
1058                 return (-1);
1059         }
1060         return (0);
1061 }
1062
1063 static int network_shutdown (void)
1064 {
1065         DEBUG ("Shutting down.");
1066
1067         listen_loop++;
1068
1069         pthread_kill (listen_thread, SIGTERM);
1070         pthread_join (listen_thread, NULL /* no return value */);
1071
1072         listen_thread = 0;
1073
1074         return (0);
1075 }
1076
1077 static int network_init (void)
1078 {
1079         plugin_register_shutdown ("network", network_shutdown);
1080
1081         send_buffer_ptr  = send_buffer;
1082         send_buffer_fill = 0;
1083         memset (&send_buffer_vl, '\0', sizeof (send_buffer_vl));
1084         memset (send_buffer_type, '\0', sizeof (send_buffer_type));
1085
1086         /* setup socket(s) and so on */
1087         if (sending_sockets != NULL)
1088                 plugin_register_write ("network", network_write);
1089
1090         if ((listen_sockets_num != 0) && (listen_thread == 0))
1091         {
1092                 int status;
1093
1094                 status = pthread_create (&listen_thread, NULL /* no attributes */,
1095                                 receive_thread, NULL /* no argument */);
1096
1097                 if (status != 0)
1098                         ERROR ("network: pthread_create failed: %s",
1099                                         strerror (errno));
1100         }
1101         return (0);
1102 } /* int network_init */
1103
1104 void module_register (void)
1105 {
1106         plugin_register_config ("network", network_config,
1107                         config_keys, config_keys_num);
1108         plugin_register_init   ("network", network_init);
1109 }