08c5199566a6a0134f1bf39a0042205d7ecdd588
[collectd.git] / src / daemon / plugin.c
1 /**
2  * collectd - src/plugin.c
3  * Copyright (C) 2005-2014  Florian octo Forster
4  *
5  * Permission is hereby granted, free of charge, to any person obtaining a
6  * copy of this software and associated documentation files (the "Software"),
7  * to deal in the Software without restriction, including without limitation
8  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
9  * and/or sell copies of the Software, and to permit persons to whom the
10  * Software is furnished to do so, subject to the following conditions:
11  *
12  * The above copyright notice and this permission notice shall be included in
13  * all copies or substantial portions of the Software.
14  *
15  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
21  * DEALINGS IN THE SOFTWARE.
22  *
23  * Authors:
24  *   Florian octo Forster <octo at collectd.org>
25  *   Sebastian Harl <sh at tokkee.org>
26  **/
27
28 /* _GNU_SOURCE is needed in Linux to use pthread_setname_np */
29 #define _GNU_SOURCE
30
31 #include "collectd.h"
32
33 #include "common.h"
34 #include "plugin.h"
35 #include "configfile.h"
36 #include "filter_chain.h"
37 #include "utils_avltree.h"
38 #include "utils_cache.h"
39 #include "utils_complain.h"
40 #include "utils_llist.h"
41 #include "utils_heap.h"
42 #include "utils_time.h"
43 #include "utils_random.h"
44
45 #include <ltdl.h>
46
47 /*
48  * Private structures
49  */
50 struct callback_func_s
51 {
52         void *cf_callback;
53         user_data_t cf_udata;
54         plugin_ctx_t cf_ctx;
55 };
56 typedef struct callback_func_s callback_func_t;
57
58 #define RF_SIMPLE  0
59 #define RF_COMPLEX 1
60 #define RF_REMOVE  65535
61 struct read_func_s
62 {
63         /* `read_func_t' "inherits" from `callback_func_t'.
64          * The `rf_super' member MUST be the first one in this structure! */
65 #define rf_callback rf_super.cf_callback
66 #define rf_udata rf_super.cf_udata
67 #define rf_ctx rf_super.cf_ctx
68         callback_func_t rf_super;
69         char rf_group[DATA_MAX_NAME_LEN];
70         char *rf_name;
71         int rf_type;
72         cdtime_t rf_interval;
73         cdtime_t rf_effective_interval;
74         cdtime_t rf_next_read;
75 };
76 typedef struct read_func_s read_func_t;
77
78 struct write_queue_s;
79 typedef struct write_queue_s write_queue_t;
80 struct write_queue_s
81 {
82         value_list_t *vl;
83         plugin_ctx_t ctx;
84         write_queue_t *next;
85 };
86
87 struct flush_callback_s {
88         char *name;
89         cdtime_t timeout;
90 };
91 typedef struct flush_callback_s flush_callback_t;
92
93 /*
94  * Private variables
95  */
96 static c_avl_tree_t *plugins_loaded = NULL;
97
98 static llist_t *list_init;
99 static llist_t *list_write;
100 static llist_t *list_flush;
101 static llist_t *list_missing;
102 static llist_t *list_shutdown;
103 static llist_t *list_log;
104 static llist_t *list_notification;
105
106 static fc_chain_t *pre_cache_chain = NULL;
107 static fc_chain_t *post_cache_chain = NULL;
108
109 static c_avl_tree_t *data_sets;
110
111 static char *plugindir = NULL;
112
113 #ifndef DEFAULT_MAX_READ_INTERVAL
114 # define DEFAULT_MAX_READ_INTERVAL TIME_T_TO_CDTIME_T_STATIC (86400)
115 #endif
116 static c_heap_t       *read_heap = NULL;
117 static llist_t        *read_list;
118 static int             read_loop = 1;
119 static pthread_mutex_t read_lock = PTHREAD_MUTEX_INITIALIZER;
120 static pthread_cond_t  read_cond = PTHREAD_COND_INITIALIZER;
121 static pthread_t      *read_threads = NULL;
122 static int             read_threads_num = 0;
123 static cdtime_t        max_read_interval = DEFAULT_MAX_READ_INTERVAL;
124
125 static write_queue_t  *write_queue_head;
126 static write_queue_t  *write_queue_tail;
127 static long            write_queue_length = 0;
128 static _Bool           write_loop = 1;
129 static pthread_mutex_t write_lock = PTHREAD_MUTEX_INITIALIZER;
130 static pthread_cond_t  write_cond = PTHREAD_COND_INITIALIZER;
131 static pthread_t      *write_threads = NULL;
132 static size_t          write_threads_num = 0;
133
134 static pthread_key_t   plugin_ctx_key;
135 static _Bool           plugin_ctx_key_initialized = 0;
136
137 static long            write_limit_high = 0;
138 static long            write_limit_low = 0;
139
140 static derive_t        stats_values_dropped = 0;
141 static _Bool           record_statistics = 0;
142
143 /*
144  * Static functions
145  */
146 static int plugin_dispatch_values_internal (value_list_t *vl);
147
148 static const char *plugin_get_dir (void)
149 {
150         if (plugindir == NULL)
151                 return (PLUGINDIR);
152         else
153                 return (plugindir);
154 }
155
156 static void plugin_update_internal_statistics (void) { /* {{{ */
157
158         gauge_t copy_write_queue_length = (gauge_t) write_queue_length;
159
160         /* Initialize `vl' */
161         value_list_t vl = VALUE_LIST_INIT;
162         sstrncpy (vl.host, hostname_g, sizeof (vl.host));
163         sstrncpy (vl.plugin, "collectd", sizeof (vl.plugin));
164
165         /* Write queue */
166         sstrncpy (vl.plugin_instance, "write_queue",
167                         sizeof (vl.plugin_instance));
168
169         /* Write queue : queue length */
170         vl.values = &(value_t) { .gauge = copy_write_queue_length };
171         vl.values_len = 1;
172         sstrncpy (vl.type, "queue_length", sizeof (vl.type));
173         vl.type_instance[0] = 0;
174         plugin_dispatch_values (&vl);
175
176         /* Write queue : Values dropped (queue length > low limit) */
177         vl.values = &(value_t) { .gauge = (gauge_t) stats_values_dropped };
178         vl.values_len = 1;
179         sstrncpy (vl.type, "derive", sizeof (vl.type));
180         sstrncpy (vl.type_instance, "dropped", sizeof (vl.type_instance));
181         plugin_dispatch_values (&vl);
182
183         /* Cache */
184         sstrncpy (vl.plugin_instance, "cache",
185                         sizeof (vl.plugin_instance));
186
187         /* Cache : Nb entry in cache tree */
188         vl.values = &(value_t) { .gauge = (gauge_t) uc_get_size() };
189         vl.values_len = 1;
190         sstrncpy (vl.type, "cache_size", sizeof (vl.type));
191         vl.type_instance[0] = 0;
192         plugin_dispatch_values (&vl);
193
194         return;
195 } /* }}} void plugin_update_internal_statistics */
196
197 static void destroy_callback (callback_func_t *cf) /* {{{ */
198 {
199         if (cf == NULL)
200                 return;
201
202         if ((cf->cf_udata.data != NULL) && (cf->cf_udata.free_func != NULL))
203         {
204                 cf->cf_udata.free_func (cf->cf_udata.data);
205                 cf->cf_udata.data = NULL;
206                 cf->cf_udata.free_func = NULL;
207         }
208         sfree (cf);
209 } /* }}} void destroy_callback */
210
211 static void destroy_all_callbacks (llist_t **list) /* {{{ */
212 {
213         llentry_t *le;
214
215         if (*list == NULL)
216                 return;
217
218         le = llist_head (*list);
219         while (le != NULL)
220         {
221                 llentry_t *le_next;
222
223                 le_next = le->next;
224
225                 sfree (le->key);
226                 destroy_callback (le->value);
227                 le->value = NULL;
228
229                 le = le_next;
230         }
231
232         llist_destroy (*list);
233         *list = NULL;
234 } /* }}} void destroy_all_callbacks */
235
236 static void destroy_read_heap (void) /* {{{ */
237 {
238         if (read_heap == NULL)
239                 return;
240
241         while (42)
242         {
243                 read_func_t *rf;
244
245                 rf = c_heap_get_root (read_heap);
246                 if (rf == NULL)
247                         break;
248                 sfree (rf->rf_name);
249                 destroy_callback ((callback_func_t *) rf);
250         }
251
252         c_heap_destroy (read_heap);
253         read_heap = NULL;
254 } /* }}} void destroy_read_heap */
255
256 static int register_callback (llist_t **list, /* {{{ */
257                 const char *name, callback_func_t *cf)
258 {
259         llentry_t *le;
260         char *key;
261
262         if (*list == NULL)
263         {
264                 *list = llist_create ();
265                 if (*list == NULL)
266                 {
267                         ERROR ("plugin: register_callback: "
268                                         "llist_create failed.");
269                         destroy_callback (cf);
270                         return (-1);
271                 }
272         }
273
274         key = strdup (name);
275         if (key == NULL)
276         {
277                 ERROR ("plugin: register_callback: strdup failed.");
278                 destroy_callback (cf);
279                 return (-1);
280         }
281
282         le = llist_search (*list, name);
283         if (le == NULL)
284         {
285                 le = llentry_create (key, cf);
286                 if (le == NULL)
287                 {
288                         ERROR ("plugin: register_callback: "
289                                         "llentry_create failed.");
290                         sfree (key);
291                         destroy_callback (cf);
292                         return (-1);
293                 }
294
295                 llist_append (*list, le);
296         }
297         else
298         {
299                 callback_func_t *old_cf;
300
301                 old_cf = le->value;
302                 le->value = cf;
303
304                 WARNING ("plugin: register_callback: "
305                                 "a callback named `%s' already exists - "
306                                 "overwriting the old entry!", name);
307
308                 destroy_callback (old_cf);
309                 sfree (key);
310         }
311
312         return (0);
313 } /* }}} int register_callback */
314
315 static void log_list_callbacks (llist_t **list, /* {{{ */
316                                 const char *comment)
317 {
318         char *str;
319         int len;
320         int i;
321         llentry_t *le;
322         int n;
323         char **keys;
324
325         n = llist_size(*list);
326         if (n == 0)
327         {
328                 INFO("%s [none]", comment);
329                 return;
330         }
331
332         keys = calloc(n, sizeof(char*));
333
334         if (keys == NULL)
335         {
336                 ERROR("%s: failed to allocate memory for list of callbacks",
337                       comment);
338
339                 return;
340         }
341
342         for (le = llist_head (*list), i = 0, len = 0;
343              le != NULL;
344              le = le->next, i++)
345         {
346                 keys[i] = le->key;
347                 len += strlen(le->key) + 6;
348         }
349         str = malloc(len + 10);
350         if (str == NULL)
351         {
352                 ERROR("%s: failed to allocate memory for list of callbacks",
353                       comment);
354         }
355         else
356         {
357                 *str = '\0';
358                 strjoin(str, len, keys, n, "', '");
359                 INFO("%s ['%s']", comment, str);
360                 sfree (str);
361         }
362         sfree (keys);
363 } /* }}} void log_list_callbacks */
364
365 static int create_register_callback (llist_t **list, /* {{{ */
366                 const char *name, void *callback, user_data_t const *ud)
367 {
368         callback_func_t *cf;
369
370         cf = calloc (1, sizeof (*cf));
371         if (cf == NULL)
372         {
373                 ERROR ("plugin: create_register_callback: calloc failed.");
374                 return (-1);
375         }
376
377         cf->cf_callback = callback;
378         if (ud == NULL)
379         {
380                 cf->cf_udata.data = NULL;
381                 cf->cf_udata.free_func = NULL;
382         }
383         else
384         {
385                 cf->cf_udata = *ud;
386         }
387
388         cf->cf_ctx = plugin_get_ctx ();
389
390         return (register_callback (list, name, cf));
391 } /* }}} int create_register_callback */
392
393 static int plugin_unregister (llist_t *list, const char *name) /* {{{ */
394 {
395         llentry_t *e;
396
397         if (list == NULL)
398                 return (-1);
399
400         e = llist_search (list, name);
401         if (e == NULL)
402                 return (-1);
403
404         llist_remove (list, e);
405
406         sfree (e->key);
407         destroy_callback (e->value);
408
409         llentry_destroy (e);
410
411         return (0);
412 } /* }}} int plugin_unregister */
413
414 /*
415  * (Try to) load the shared object `file'. Won't complain if it isn't a shared
416  * object, but it will bitch about a shared object not having a
417  * ``module_register'' symbol..
418  */
419 static int plugin_load_file (char *file, uint32_t flags)
420 {
421         lt_dlhandle dlh;
422         void (*reg_handle) (void);
423
424         lt_dlinit ();
425         lt_dlerror (); /* clear errors */
426
427 #if LIBTOOL_VERSION == 2
428         if (flags & PLUGIN_FLAGS_GLOBAL) {
429                 lt_dladvise advise;
430                 lt_dladvise_init(&advise);
431                 lt_dladvise_global(&advise);
432                 dlh = lt_dlopenadvise(file, advise);
433                 lt_dladvise_destroy(&advise);
434         } else {
435                 dlh = lt_dlopen (file);
436         }
437 #else /* if LIBTOOL_VERSION == 1 */
438         if (flags & PLUGIN_FLAGS_GLOBAL)
439                 WARNING ("plugin_load_file: The global flag is not supported, "
440                                 "libtool 2 is required for this.");
441         dlh = lt_dlopen (file);
442 #endif
443
444         if (dlh == NULL)
445         {
446                 char errbuf[1024] = "";
447
448                 ssnprintf (errbuf, sizeof (errbuf),
449                                 "lt_dlopen (\"%s\") failed: %s. "
450                                 "The most common cause for this problem is "
451                                 "missing dependencies. Use ldd(1) to check "
452                                 "the dependencies of the plugin "
453                                 "/ shared object.",
454                                 file, lt_dlerror ());
455
456                 ERROR ("%s", errbuf);
457                 /* Make sure this is printed to STDERR in any case, but also
458                  * make sure it's printed only once. */
459                 if (list_log != NULL)
460                         fprintf (stderr, "ERROR: %s\n", errbuf);
461
462                 return (1);
463         }
464
465         if ((reg_handle = (void (*) (void)) lt_dlsym (dlh, "module_register")) == NULL)
466         {
467                 WARNING ("Couldn't find symbol \"module_register\" in \"%s\": %s\n",
468                                 file, lt_dlerror ());
469                 lt_dlclose (dlh);
470                 return (-1);
471         }
472
473         (*reg_handle) ();
474
475         return (0);
476 }
477
478 static void *plugin_read_thread (void __attribute__((unused)) *args)
479 {
480         while (read_loop != 0)
481         {
482                 read_func_t *rf;
483                 plugin_ctx_t old_ctx;
484                 cdtime_t start;
485                 cdtime_t now;
486                 cdtime_t elapsed;
487                 int status;
488                 int rf_type;
489                 int rc;
490
491                 /* Get the read function that needs to be read next.
492                  * We don't need to hold "read_lock" for the heap, but we need
493                  * to call c_heap_get_root() and pthread_cond_wait() in the
494                  * same protected block. */
495                 pthread_mutex_lock (&read_lock);
496                 rf = c_heap_get_root (read_heap);
497                 if (rf == NULL)
498                 {
499                         pthread_cond_wait (&read_cond, &read_lock);
500                         pthread_mutex_unlock (&read_lock);
501                         continue;
502                 }
503                 pthread_mutex_unlock (&read_lock);
504
505                 if (rf->rf_interval == 0)
506                 {
507                         /* this should not happen, because the interval is set
508                          * for each plugin when loading it
509                          * XXX: issue a warning? */
510                         rf->rf_interval = plugin_get_interval ();
511                         rf->rf_effective_interval = rf->rf_interval;
512
513                         rf->rf_next_read = cdtime ();
514                 }
515
516                 /* sleep until this entry is due,
517                  * using pthread_cond_timedwait */
518                 pthread_mutex_lock (&read_lock);
519                 /* In pthread_cond_timedwait, spurious wakeups are possible
520                  * (and really happen, at least on NetBSD with > 1 CPU), thus
521                  * we need to re-evaluate the condition every time
522                  * pthread_cond_timedwait returns. */
523                 rc = 0;
524                 while ((read_loop != 0)
525                                 && (cdtime () < rf->rf_next_read)
526                                 && rc == 0)
527                 {
528                         rc = pthread_cond_timedwait (&read_cond, &read_lock,
529                                 &CDTIME_T_TO_TIMESPEC (rf->rf_next_read));
530                 }
531
532                 /* Must hold `read_lock' when accessing `rf->rf_type'. */
533                 rf_type = rf->rf_type;
534                 pthread_mutex_unlock (&read_lock);
535
536                 /* Check if we're supposed to stop.. This may have interrupted
537                  * the sleep, too. */
538                 if (read_loop == 0)
539                 {
540                         /* Insert `rf' again, so it can be free'd correctly */
541                         c_heap_insert (read_heap, rf);
542                         break;
543                 }
544
545                 /* The entry has been marked for deletion. The linked list
546                  * entry has already been removed by `plugin_unregister_read'.
547                  * All we have to do here is free the `read_func_t' and
548                  * continue. */
549                 if (rf_type == RF_REMOVE)
550                 {
551                         DEBUG ("plugin_read_thread: Destroying the `%s' "
552                                         "callback.", rf->rf_name);
553                         sfree (rf->rf_name);
554                         destroy_callback ((callback_func_t *) rf);
555                         rf = NULL;
556                         continue;
557                 }
558
559                 DEBUG ("plugin_read_thread: Handling `%s'.", rf->rf_name);
560
561                 start = cdtime ();
562
563                 old_ctx = plugin_set_ctx (rf->rf_ctx);
564
565                 if (rf_type == RF_SIMPLE)
566                 {
567                         int (*callback) (void);
568
569                         callback = rf->rf_callback;
570                         status = (*callback) ();
571                 }
572                 else
573                 {
574                         plugin_read_cb callback;
575
576                         assert (rf_type == RF_COMPLEX);
577
578                         callback = rf->rf_callback;
579                         status = (*callback) (&rf->rf_udata);
580                 }
581
582                 plugin_set_ctx (old_ctx);
583
584                 /* If the function signals failure, we will increase the
585                  * intervals in which it will be called. */
586                 if (status != 0)
587                 {
588                         rf->rf_effective_interval *= 2;
589                         if (rf->rf_effective_interval > max_read_interval)
590                                 rf->rf_effective_interval = max_read_interval;
591
592                         NOTICE ("read-function of plugin `%s' failed. "
593                                         "Will suspend it for %.3f seconds.",
594                                         rf->rf_name,
595                                         CDTIME_T_TO_DOUBLE (rf->rf_effective_interval));
596                 }
597                 else
598                 {
599                         /* Success: Restore the interval, if it was changed. */
600                         rf->rf_effective_interval = rf->rf_interval;
601                 }
602
603                 /* update the ``next read due'' field */
604                 now = cdtime ();
605
606                 /* calculate the time spent in the read function */
607                 elapsed = (now - start);
608
609                 if (elapsed > rf->rf_effective_interval)
610                         WARNING ("plugin_read_thread: read-function of the `%s' plugin took %.3f "
611                                 "seconds, which is above its read interval (%.3f seconds). You might "
612                                 "want to adjust the `Interval' or `ReadThreads' settings.",
613                                 rf->rf_name, CDTIME_T_TO_DOUBLE(elapsed),
614                                 CDTIME_T_TO_DOUBLE(rf->rf_effective_interval));
615
616                 DEBUG ("plugin_read_thread: read-function of the `%s' plugin took "
617                                 "%.6f seconds.",
618                                 rf->rf_name, CDTIME_T_TO_DOUBLE(elapsed));
619
620                 DEBUG ("plugin_read_thread: Effective interval of the "
621                                 "`%s' plugin is %.3f seconds.",
622                                 rf->rf_name,
623                                 CDTIME_T_TO_DOUBLE (rf->rf_effective_interval));
624
625                 /* Calculate the next (absolute) time at which this function
626                  * should be called. */
627                 rf->rf_next_read += rf->rf_effective_interval;
628
629                 /* Check, if `rf_next_read' is in the past. */
630                 if (rf->rf_next_read < now)
631                 {
632                         /* `rf_next_read' is in the past. Insert `now'
633                          * so this value doesn't trail off into the
634                          * past too much. */
635                         rf->rf_next_read = now;
636                 }
637
638                 DEBUG ("plugin_read_thread: Next read of the `%s' plugin at %.3f.",
639                                 rf->rf_name,
640                                 CDTIME_T_TO_DOUBLE (rf->rf_next_read));
641
642                 /* Re-insert this read function into the heap again. */
643                 c_heap_insert (read_heap, rf);
644         } /* while (read_loop) */
645
646         pthread_exit (NULL);
647         return ((void *) 0);
648 } /* void *plugin_read_thread */
649
650 static void start_read_threads (int num)
651 {
652         if (read_threads != NULL)
653                 return;
654
655         read_threads = (pthread_t *) calloc (num, sizeof (pthread_t));
656         if (read_threads == NULL)
657         {
658                 ERROR ("plugin: start_read_threads: calloc failed.");
659                 return;
660         }
661
662         read_threads_num = 0;
663         for (int i = 0; i < num; i++)
664         {
665                 if (pthread_create (read_threads + read_threads_num, NULL,
666                                         plugin_read_thread, NULL) == 0)
667                 {
668 #if defined(HAVE_PTHREAD_SETNAME_NP) || defined(HAVE_PTHREAD_SET_NAME_NP)
669                         char thread_name[16];
670                         sstrncpy (thread_name, "plugin reader", sizeof(thread_name));
671 # if defined(HAVE_PTHREAD_SETNAME_NP)
672                         pthread_setname_np (*(read_threads + read_threads_num),
673                                 thread_name);
674 # elif defined(HAVE_PTHREAD_SET_NAME_NP)
675                         pthread_set_name_np (*(read_threads + read_threads_num),
676                                 thread_name);
677 # endif
678 #endif
679                         read_threads_num++;
680                 }
681                 else
682                 {
683                         ERROR ("plugin: start_read_threads: pthread_create failed.");
684                         return;
685                 }
686         } /* for (i) */
687 } /* void start_read_threads */
688
689 static void stop_read_threads (void)
690 {
691         if (read_threads == NULL)
692                 return;
693
694         INFO ("collectd: Stopping %i read threads.", read_threads_num);
695
696         pthread_mutex_lock (&read_lock);
697         read_loop = 0;
698         DEBUG ("plugin: stop_read_threads: Signalling `read_cond'");
699         pthread_cond_broadcast (&read_cond);
700         pthread_mutex_unlock (&read_lock);
701
702         for (int i = 0; i < read_threads_num; i++)
703         {
704                 if (pthread_join (read_threads[i], NULL) != 0)
705                 {
706                         ERROR ("plugin: stop_read_threads: pthread_join failed.");
707                 }
708                 read_threads[i] = (pthread_t) 0;
709         }
710         sfree (read_threads);
711         read_threads_num = 0;
712 } /* void stop_read_threads */
713
714 static void plugin_value_list_free (value_list_t *vl) /* {{{ */
715 {
716         if (vl == NULL)
717                 return;
718
719         meta_data_destroy (vl->meta);
720         sfree (vl->values);
721         sfree (vl);
722 } /* }}} void plugin_value_list_free */
723
724 static value_list_t *plugin_value_list_clone (value_list_t const *vl_orig) /* {{{ */
725 {
726         value_list_t *vl;
727
728         if (vl_orig == NULL)
729                 return (NULL);
730
731         vl = malloc (sizeof (*vl));
732         if (vl == NULL)
733                 return (NULL);
734         memcpy (vl, vl_orig, sizeof (*vl));
735
736         if (vl->host[0] == 0)
737                 sstrncpy (vl->host, hostname_g, sizeof (vl->host));
738
739         vl->values = calloc (vl_orig->values_len, sizeof (*vl->values));
740         if (vl->values == NULL)
741         {
742                 plugin_value_list_free (vl);
743                 return (NULL);
744         }
745         memcpy (vl->values, vl_orig->values,
746                         vl_orig->values_len * sizeof (*vl->values));
747
748         vl->meta = meta_data_clone (vl->meta);
749         if ((vl_orig->meta != NULL) && (vl->meta == NULL))
750         {
751                 plugin_value_list_free (vl);
752                 return (NULL);
753         }
754
755         if (vl->time == 0)
756                 vl->time = cdtime ();
757
758         /* Fill in the interval from the thread context, if it is zero. */
759         if (vl->interval == 0)
760         {
761                 plugin_ctx_t ctx = plugin_get_ctx ();
762
763                 if (ctx.interval != 0)
764                         vl->interval = ctx.interval;
765                 else
766                 {
767                         char name[6 * DATA_MAX_NAME_LEN];
768                         FORMAT_VL (name, sizeof (name), vl);
769                         ERROR ("plugin_value_list_clone: Unable to determine "
770                                         "interval from context for "
771                                         "value list \"%s\". "
772                                         "This indicates a broken plugin. "
773                                         "Please report this problem to the "
774                                         "collectd mailing list or at "
775                                         "<http://collectd.org/bugs/>.", name);
776                         vl->interval = cf_get_default_interval ();
777                 }
778         }
779
780         return (vl);
781 } /* }}} value_list_t *plugin_value_list_clone */
782
783 static int plugin_write_enqueue (value_list_t const *vl) /* {{{ */
784 {
785         write_queue_t *q;
786
787         q = malloc (sizeof (*q));
788         if (q == NULL)
789                 return (ENOMEM);
790         q->next = NULL;
791
792         q->vl = plugin_value_list_clone (vl);
793         if (q->vl == NULL)
794         {
795                 sfree (q);
796                 return (ENOMEM);
797         }
798
799         /* Store context of caller (read plugin); otherwise, it would not be
800          * available to the write plugins when actually dispatching the
801          * value-list later on. */
802         q->ctx = plugin_get_ctx ();
803
804         pthread_mutex_lock (&write_lock);
805
806         if (write_queue_tail == NULL)
807         {
808                 write_queue_head = q;
809                 write_queue_tail = q;
810                 write_queue_length = 1;
811         }
812         else
813         {
814                 write_queue_tail->next = q;
815                 write_queue_tail = q;
816                 write_queue_length += 1;
817         }
818
819         pthread_cond_signal (&write_cond);
820         pthread_mutex_unlock (&write_lock);
821
822         return (0);
823 } /* }}} int plugin_write_enqueue */
824
825 static value_list_t *plugin_write_dequeue (void) /* {{{ */
826 {
827         write_queue_t *q;
828         value_list_t *vl;
829
830         pthread_mutex_lock (&write_lock);
831
832         while (write_loop && (write_queue_head == NULL))
833                 pthread_cond_wait (&write_cond, &write_lock);
834
835         if (write_queue_head == NULL)
836         {
837                 pthread_mutex_unlock (&write_lock);
838                 return (NULL);
839         }
840
841         q = write_queue_head;
842         write_queue_head = q->next;
843         write_queue_length -= 1;
844         if (write_queue_head == NULL) {
845                 write_queue_tail = NULL;
846                 assert(0 == write_queue_length);
847                 }
848
849         pthread_mutex_unlock (&write_lock);
850
851         (void) plugin_set_ctx (q->ctx);
852
853         vl = q->vl;
854         sfree (q);
855         return (vl);
856 } /* }}} value_list_t *plugin_write_dequeue */
857
858 static void *plugin_write_thread (void __attribute__((unused)) *args) /* {{{ */
859 {
860         while (write_loop)
861         {
862                 value_list_t *vl = plugin_write_dequeue ();
863                 if (vl == NULL)
864                         continue;
865
866                 plugin_dispatch_values_internal (vl);
867
868                 plugin_value_list_free (vl);
869         }
870
871         pthread_exit (NULL);
872         return ((void *) 0);
873 } /* }}} void *plugin_write_thread */
874
875 static void start_write_threads (size_t num) /* {{{ */
876 {
877         if (write_threads != NULL)
878                 return;
879
880         write_threads = (pthread_t *) calloc (num, sizeof (pthread_t));
881         if (write_threads == NULL)
882         {
883                 ERROR ("plugin: start_write_threads: calloc failed.");
884                 return;
885         }
886
887         write_threads_num = 0;
888         for (size_t i = 0; i < num; i++)
889         {
890                 int status;
891
892                 status = pthread_create (write_threads + write_threads_num,
893                                 /* attr = */ NULL,
894                                 plugin_write_thread,
895                                 /* arg = */ NULL);
896                 if (status != 0)
897                 {
898                         char errbuf[1024];
899                         ERROR ("plugin: start_write_threads: pthread_create failed "
900                                         "with status %i (%s).", status,
901                                         sstrerror (status, errbuf, sizeof (errbuf)));
902                         return;
903                 } else {
904 #if defined(HAVE_PTHREAD_SETNAME_NP) || defined(HAVE_PTHREAD_SET_NAME_NP)
905                         char thread_name[16];
906                         sstrncpy (thread_name, "plugin writer", sizeof(thread_name));
907 # if defined(HAVE_PTHREAD_SETNAME_NP)
908                         pthread_setname_np (*(write_threads + write_threads_num),
909                                 thread_name);
910 # elif defined(HAVE_PTHREAD_SET_NAME_NP)
911                         pthread_set_name_np (*(write_threads + write_threads_num),
912                                 thread_name);
913 # endif
914 #endif
915                         write_threads_num++;
916                 }
917         } /* for (i) */
918 } /* }}} void start_write_threads */
919
920 static void stop_write_threads (void) /* {{{ */
921 {
922         write_queue_t *q;
923         size_t i;
924
925         if (write_threads == NULL)
926                 return;
927
928         INFO ("collectd: Stopping %zu write threads.", write_threads_num);
929
930         pthread_mutex_lock (&write_lock);
931         write_loop = 0;
932         DEBUG ("plugin: stop_write_threads: Signalling `write_cond'");
933         pthread_cond_broadcast (&write_cond);
934         pthread_mutex_unlock (&write_lock);
935
936         for (i = 0; i < write_threads_num; i++)
937         {
938                 if (pthread_join (write_threads[i], NULL) != 0)
939                 {
940                         ERROR ("plugin: stop_write_threads: pthread_join failed.");
941                 }
942                 write_threads[i] = (pthread_t) 0;
943         }
944         sfree (write_threads);
945         write_threads_num = 0;
946
947         pthread_mutex_lock (&write_lock);
948         i = 0;
949         for (q = write_queue_head; q != NULL; )
950         {
951                 write_queue_t *q1 = q;
952                 plugin_value_list_free (q->vl);
953                 q = q->next;
954                 sfree (q1);
955                 i++;
956         }
957         write_queue_head = NULL;
958         write_queue_tail = NULL;
959         write_queue_length = 0;
960         pthread_mutex_unlock (&write_lock);
961
962         if (i > 0)
963         {
964                 WARNING ("plugin: %zu value list%s left after shutting down "
965                                 "the write threads.",
966                                 i, (i == 1) ? " was" : "s were");
967         }
968 } /* }}} void stop_write_threads */
969
970 /*
971  * Public functions
972  */
973 void plugin_set_dir (const char *dir)
974 {
975         sfree (plugindir);
976
977         if (dir == NULL)
978         {
979                 plugindir = NULL;
980                 return;
981         }
982
983         plugindir = strdup (dir);
984         if (plugindir == NULL)
985                 ERROR ("plugin_set_dir: strdup(\"%s\") failed", dir);
986 }
987
988 static _Bool plugin_is_loaded (char const *name)
989 {
990         int status;
991
992         if (plugins_loaded == NULL)
993                 plugins_loaded = c_avl_create ((int (*) (const void *, const void *)) strcasecmp);
994         assert (plugins_loaded != NULL);
995
996         status = c_avl_get (plugins_loaded, name, /* ret_value = */ NULL);
997         return (status == 0);
998 }
999
1000 static int plugin_mark_loaded (char const *name)
1001 {
1002         char *name_copy;
1003         int status;
1004
1005         name_copy = strdup (name);
1006         if (name_copy == NULL)
1007                 return (ENOMEM);
1008
1009         status = c_avl_insert (plugins_loaded,
1010                         /* key = */ name_copy, /* value = */ NULL);
1011         return (status);
1012 }
1013
1014 static void plugin_free_loaded (void)
1015 {
1016         void *key;
1017         void *value;
1018
1019         if (plugins_loaded == NULL)
1020                 return;
1021
1022         while (c_avl_pick (plugins_loaded, &key, &value) == 0)
1023         {
1024                 sfree (key);
1025                 assert (value == NULL);
1026         }
1027
1028         c_avl_destroy (plugins_loaded);
1029         plugins_loaded = NULL;
1030 }
1031
1032 #define BUFSIZE 512
1033 int plugin_load (char const *plugin_name, uint32_t flags)
1034 {
1035         DIR  *dh;
1036         const char *dir;
1037         char  filename[BUFSIZE] = "";
1038         char  typename[BUFSIZE];
1039         int   ret;
1040         struct stat    statbuf;
1041         struct dirent *de;
1042         int status;
1043
1044         if (plugin_name == NULL)
1045                 return (EINVAL);
1046
1047         /* Check if plugin is already loaded and don't do anything in this
1048          * case. */
1049         if (plugin_is_loaded (plugin_name))
1050                 return (0);
1051
1052         dir = plugin_get_dir ();
1053         ret = 1;
1054
1055         /*
1056          * XXX: Magic at work:
1057          *
1058          * Some of the language bindings, for example the Python and Perl
1059          * plugins, need to be able to export symbols to the scripts they run.
1060          * For this to happen, the "Globals" flag needs to be set.
1061          * Unfortunately, this technical detail is hard to explain to the
1062          * average user and she shouldn't have to worry about this, ideally.
1063          * So in order to save everyone's sanity use a different default for a
1064          * handful of special plugins. --octo
1065          */
1066         if ((strcasecmp ("perl", plugin_name) == 0)
1067                         || (strcasecmp ("python", plugin_name) == 0))
1068                 flags |= PLUGIN_FLAGS_GLOBAL;
1069
1070         /* `cpu' should not match `cpufreq'. To solve this we add `.so' to the
1071          * type when matching the filename */
1072         status = ssnprintf (typename, sizeof (typename), "%s.so", plugin_name);
1073         if ((status < 0) || ((size_t) status >= sizeof (typename)))
1074         {
1075                 WARNING ("plugin_load: Filename too long: \"%s.so\"", plugin_name);
1076                 return (-1);
1077         }
1078
1079         if ((dh = opendir (dir)) == NULL)
1080         {
1081                 char errbuf[1024];
1082                 ERROR ("plugin_load: opendir (%s) failed: %s", dir,
1083                                 sstrerror (errno, errbuf, sizeof (errbuf)));
1084                 return (-1);
1085         }
1086
1087         while ((de = readdir (dh)) != NULL)
1088         {
1089                 if (strcasecmp (de->d_name, typename))
1090                         continue;
1091
1092                 status = ssnprintf (filename, sizeof (filename),
1093                                 "%s/%s", dir, de->d_name);
1094                 if ((status < 0) || ((size_t) status >= sizeof (filename)))
1095                 {
1096                         WARNING ("plugin_load: Filename too long: \"%s/%s\"",
1097                                         dir, de->d_name);
1098                         continue;
1099                 }
1100
1101                 if (lstat (filename, &statbuf) == -1)
1102                 {
1103                         char errbuf[1024];
1104                         WARNING ("plugin_load: stat (\"%s\") failed: %s",
1105                                         filename,
1106                                         sstrerror (errno, errbuf, sizeof (errbuf)));
1107                         continue;
1108                 }
1109                 else if (!S_ISREG (statbuf.st_mode))
1110                 {
1111                         /* don't follow symlinks */
1112                         WARNING ("plugin_load: %s is not a regular file.",
1113                                         filename);
1114                         continue;
1115                 }
1116
1117                 status = plugin_load_file (filename, flags);
1118                 if (status == 0)
1119                 {
1120                         /* success */
1121                         plugin_mark_loaded (plugin_name);
1122                         ret = 0;
1123                         INFO ("plugin_load: plugin \"%s\" successfully loaded.", plugin_name);
1124                         break;
1125                 }
1126                 else
1127                 {
1128                         ERROR ("plugin_load: Load plugin \"%s\" failed with "
1129                                         "status %i.", plugin_name, status);
1130                 }
1131         }
1132
1133         closedir (dh);
1134
1135         if (filename[0] == 0)
1136                 ERROR ("plugin_load: Could not find plugin \"%s\" in %s",
1137                                 plugin_name, dir);
1138
1139         return (ret);
1140 }
1141
1142 /*
1143  * The `register_*' functions follow
1144  */
1145 int plugin_register_config (const char *name,
1146                 int (*callback) (const char *key, const char *val),
1147                 const char **keys, int keys_num)
1148 {
1149         cf_register (name, callback, keys, keys_num);
1150         return (0);
1151 } /* int plugin_register_config */
1152
1153 int plugin_register_complex_config (const char *type,
1154                 int (*callback) (oconfig_item_t *))
1155 {
1156         return (cf_register_complex (type, callback));
1157 } /* int plugin_register_complex_config */
1158
1159 int plugin_register_init (const char *name,
1160                 int (*callback) (void))
1161 {
1162         return (create_register_callback (&list_init, name, (void *) callback,
1163                                 /* user_data = */ NULL));
1164 } /* plugin_register_init */
1165
1166 static int plugin_compare_read_func (const void *arg0, const void *arg1)
1167 {
1168         const read_func_t *rf0;
1169         const read_func_t *rf1;
1170
1171         rf0 = arg0;
1172         rf1 = arg1;
1173
1174         if (rf0->rf_next_read < rf1->rf_next_read)
1175                 return (-1);
1176         else if (rf0->rf_next_read > rf1->rf_next_read)
1177                 return (1);
1178         else
1179                 return (0);
1180 } /* int plugin_compare_read_func */
1181
1182 /* Add a read function to both, the heap and a linked list. The linked list if
1183  * used to look-up read functions, especially for the remove function. The heap
1184  * is used to determine which plugin to read next. */
1185 static int plugin_insert_read (read_func_t *rf)
1186 {
1187         int status;
1188         llentry_t *le;
1189
1190         rf->rf_next_read = cdtime ();
1191         rf->rf_effective_interval = rf->rf_interval;
1192
1193         pthread_mutex_lock (&read_lock);
1194
1195         if (read_list == NULL)
1196         {
1197                 read_list = llist_create ();
1198                 if (read_list == NULL)
1199                 {
1200                         pthread_mutex_unlock (&read_lock);
1201                         ERROR ("plugin_insert_read: read_list failed.");
1202                         return (-1);
1203                 }
1204         }
1205
1206         if (read_heap == NULL)
1207         {
1208                 read_heap = c_heap_create (plugin_compare_read_func);
1209                 if (read_heap == NULL)
1210                 {
1211                         pthread_mutex_unlock (&read_lock);
1212                         ERROR ("plugin_insert_read: c_heap_create failed.");
1213                         return (-1);
1214                 }
1215         }
1216
1217         le = llist_search (read_list, rf->rf_name);
1218         if (le != NULL)
1219         {
1220                 pthread_mutex_unlock (&read_lock);
1221                 WARNING ("The read function \"%s\" is already registered. "
1222                                 "Check for duplicate \"LoadPlugin\" lines "
1223                                 "in your configuration!",
1224                                 rf->rf_name);
1225                 return (EINVAL);
1226         }
1227
1228         le = llentry_create (rf->rf_name, rf);
1229         if (le == NULL)
1230         {
1231                 pthread_mutex_unlock (&read_lock);
1232                 ERROR ("plugin_insert_read: llentry_create failed.");
1233                 return (-1);
1234         }
1235
1236         status = c_heap_insert (read_heap, rf);
1237         if (status != 0)
1238         {
1239                 pthread_mutex_unlock (&read_lock);
1240                 ERROR ("plugin_insert_read: c_heap_insert failed.");
1241                 llentry_destroy (le);
1242                 return (-1);
1243         }
1244
1245         /* This does not fail. */
1246         llist_append (read_list, le);
1247
1248         /* Wake up all the read threads. */
1249         pthread_cond_broadcast (&read_cond);
1250         pthread_mutex_unlock (&read_lock);
1251         return (0);
1252 } /* int plugin_insert_read */
1253
1254 int plugin_register_read (const char *name,
1255                 int (*callback) (void))
1256 {
1257         read_func_t *rf;
1258         int status;
1259
1260         rf = calloc (1, sizeof (*rf));
1261         if (rf == NULL)
1262         {
1263                 ERROR ("plugin_register_read: calloc failed.");
1264                 return (ENOMEM);
1265         }
1266
1267         rf->rf_callback = (void *) callback;
1268         rf->rf_udata.data = NULL;
1269         rf->rf_udata.free_func = NULL;
1270         rf->rf_ctx = plugin_get_ctx ();
1271         rf->rf_group[0] = '\0';
1272         rf->rf_name = strdup (name);
1273         rf->rf_type = RF_SIMPLE;
1274         rf->rf_interval = plugin_get_interval ();
1275
1276         status = plugin_insert_read (rf);
1277         if (status != 0) {
1278                 sfree (rf->rf_name);
1279                 sfree (rf);
1280         }
1281
1282         return (status);
1283 } /* int plugin_register_read */
1284
1285 int plugin_register_complex_read (const char *group, const char *name,
1286                 plugin_read_cb callback,
1287                 cdtime_t interval,
1288                 user_data_t const *user_data)
1289 {
1290         read_func_t *rf;
1291         int status;
1292
1293         rf = calloc (1,sizeof (*rf));
1294         if (rf == NULL)
1295         {
1296                 ERROR ("plugin_register_complex_read: calloc failed.");
1297                 return (ENOMEM);
1298         }
1299
1300         rf->rf_callback = (void *) callback;
1301         if (group != NULL)
1302                 sstrncpy (rf->rf_group, group, sizeof (rf->rf_group));
1303         else
1304                 rf->rf_group[0] = '\0';
1305         rf->rf_name = strdup (name);
1306         rf->rf_type = RF_COMPLEX;
1307         rf->rf_interval = (interval != 0) ? interval : plugin_get_interval ();
1308
1309         /* Set user data */
1310         if (user_data == NULL)
1311         {
1312                 rf->rf_udata.data = NULL;
1313                 rf->rf_udata.free_func = NULL;
1314         }
1315         else
1316         {
1317                 rf->rf_udata = *user_data;
1318         }
1319
1320         rf->rf_ctx = plugin_get_ctx ();
1321
1322         status = plugin_insert_read (rf);
1323         if (status != 0) {
1324                 sfree (rf->rf_name);
1325                 sfree (rf);
1326         }
1327
1328         return (status);
1329 } /* int plugin_register_complex_read */
1330
1331 int plugin_register_write (const char *name,
1332                 plugin_write_cb callback, user_data_t const *ud)
1333 {
1334         return (create_register_callback (&list_write, name,
1335                                 (void *) callback, ud));
1336 } /* int plugin_register_write */
1337
1338 static int plugin_flush_timeout_callback (user_data_t *ud)
1339 {
1340         flush_callback_t *cb = ud->data;
1341
1342         return plugin_flush (cb->name, cb->timeout, /* identifier = */ NULL);
1343 } /* static int plugin_flush_callback */
1344
1345 static void plugin_flush_timeout_callback_free (void *data)
1346 {
1347         flush_callback_t *cb = data;
1348
1349         if (cb == NULL) return;
1350
1351         sfree (cb->name);
1352         sfree (cb);
1353 } /* static void plugin_flush_callback_free */
1354
1355 static char *plugin_flush_callback_name (const char *name)
1356 {
1357         const char *flush_prefix = "flush/";
1358         size_t prefix_size;
1359         char *flush_name;
1360         size_t name_size;
1361
1362         prefix_size = strlen(flush_prefix);
1363         name_size = strlen(name);
1364
1365         flush_name = malloc (name_size + prefix_size + 1);
1366         if (flush_name == NULL)
1367         {
1368                 ERROR ("plugin_flush_callback_name: malloc failed.");
1369                 return (NULL);
1370         }
1371
1372         sstrncpy (flush_name, flush_prefix, prefix_size + 1);
1373         sstrncpy (flush_name + prefix_size, name, name_size + 1);
1374
1375         return flush_name;
1376 } /* static char *plugin_flush_callback_name */
1377
1378 int plugin_register_flush (const char *name,
1379                 plugin_flush_cb callback, user_data_t const *ud)
1380 {
1381         int status;
1382         plugin_ctx_t ctx = plugin_get_ctx ();
1383
1384         status = create_register_callback (&list_flush, name,
1385                 (void *) callback, ud);
1386         if (status != 0)
1387                 return status;
1388
1389         if (ctx.flush_interval != 0)
1390         {
1391                 char *flush_name;
1392                 flush_callback_t *cb;
1393
1394                 flush_name = plugin_flush_callback_name (name);
1395                 if (flush_name == NULL)
1396                         return (-1);
1397
1398                 cb = malloc(sizeof (*cb));
1399                 if (cb == NULL)
1400                 {
1401                         ERROR ("plugin_register_flush: malloc failed.");
1402                         sfree (flush_name);
1403                         return (-1);
1404                 }
1405
1406                 cb->name = strdup (name);
1407                 if (cb->name == NULL)
1408                 {
1409                         ERROR ("plugin_register_flush: strdup failed.");
1410                         sfree (cb);
1411                         sfree (flush_name);
1412                         return (-1);
1413                 }
1414                 cb->timeout = ctx.flush_timeout;
1415
1416                 status = plugin_register_complex_read (
1417                         /* group     = */ "flush",
1418                         /* name      = */ flush_name,
1419                         /* callback  = */ plugin_flush_timeout_callback,
1420                         /* interval  = */ ctx.flush_interval,
1421                         /* user data = */ &(user_data_t) {
1422                                 .data = cb,
1423                                 .free_func = plugin_flush_timeout_callback_free,
1424                         });
1425
1426                 sfree (flush_name);
1427                 if (status != 0)
1428                 {
1429                         sfree (cb->name);
1430                         sfree (cb);
1431                         return status;
1432                 }
1433         }
1434
1435         return 0;
1436 } /* int plugin_register_flush */
1437
1438 int plugin_register_missing (const char *name,
1439                 plugin_missing_cb callback, user_data_t const *ud)
1440 {
1441         return (create_register_callback (&list_missing, name,
1442                                 (void *) callback, ud));
1443 } /* int plugin_register_missing */
1444
1445 int plugin_register_shutdown (const char *name,
1446                 int (*callback) (void))
1447 {
1448         return (create_register_callback (&list_shutdown, name,
1449                                 (void *) callback, /* user_data = */ NULL));
1450 } /* int plugin_register_shutdown */
1451
1452 static void plugin_free_data_sets (void)
1453 {
1454         void *key;
1455         void *value;
1456
1457         if (data_sets == NULL)
1458                 return;
1459
1460         while (c_avl_pick (data_sets, &key, &value) == 0)
1461         {
1462                 data_set_t *ds = value;
1463                 /* key is a pointer to ds->type */
1464
1465                 sfree (ds->ds);
1466                 sfree (ds);
1467         }
1468
1469         c_avl_destroy (data_sets);
1470         data_sets = NULL;
1471 } /* void plugin_free_data_sets */
1472
1473 int plugin_register_data_set (const data_set_t *ds)
1474 {
1475         data_set_t *ds_copy;
1476
1477         if ((data_sets != NULL)
1478                         && (c_avl_get (data_sets, ds->type, NULL) == 0))
1479         {
1480                 NOTICE ("Replacing DS `%s' with another version.", ds->type);
1481                 plugin_unregister_data_set (ds->type);
1482         }
1483         else if (data_sets == NULL)
1484         {
1485                 data_sets = c_avl_create ((int (*) (const void *, const void *)) strcmp);
1486                 if (data_sets == NULL)
1487                         return (-1);
1488         }
1489
1490         ds_copy = malloc (sizeof (*ds_copy));
1491         if (ds_copy == NULL)
1492                 return (-1);
1493         memcpy(ds_copy, ds, sizeof (data_set_t));
1494
1495         ds_copy->ds = malloc (sizeof (*ds_copy->ds)
1496                         * ds->ds_num);
1497         if (ds_copy->ds == NULL)
1498         {
1499                 sfree (ds_copy);
1500                 return (-1);
1501         }
1502
1503         for (size_t i = 0; i < ds->ds_num; i++)
1504                 memcpy (ds_copy->ds + i, ds->ds + i, sizeof (data_source_t));
1505
1506         return (c_avl_insert (data_sets, (void *) ds_copy->type, (void *) ds_copy));
1507 } /* int plugin_register_data_set */
1508
1509 int plugin_register_log (const char *name,
1510                 plugin_log_cb callback, user_data_t const *ud)
1511 {
1512         return (create_register_callback (&list_log, name,
1513                                 (void *) callback, ud));
1514 } /* int plugin_register_log */
1515
1516 int plugin_register_notification (const char *name,
1517                 plugin_notification_cb callback, user_data_t const *ud)
1518 {
1519         return (create_register_callback (&list_notification, name,
1520                                 (void *) callback, ud));
1521 } /* int plugin_register_log */
1522
1523 int plugin_unregister_config (const char *name)
1524 {
1525         cf_unregister (name);
1526         return (0);
1527 } /* int plugin_unregister_config */
1528
1529 int plugin_unregister_complex_config (const char *name)
1530 {
1531         cf_unregister_complex (name);
1532         return (0);
1533 } /* int plugin_unregister_complex_config */
1534
1535 int plugin_unregister_init (const char *name)
1536 {
1537         return (plugin_unregister (list_init, name));
1538 }
1539
1540 int plugin_unregister_read (const char *name) /* {{{ */
1541 {
1542         llentry_t *le;
1543         read_func_t *rf;
1544
1545         if (name == NULL)
1546                 return (-ENOENT);
1547
1548         pthread_mutex_lock (&read_lock);
1549
1550         if (read_list == NULL)
1551         {
1552                 pthread_mutex_unlock (&read_lock);
1553                 return (-ENOENT);
1554         }
1555
1556         le = llist_search (read_list, name);
1557         if (le == NULL)
1558         {
1559                 pthread_mutex_unlock (&read_lock);
1560                 WARNING ("plugin_unregister_read: No such read function: %s",
1561                                 name);
1562                 return (-ENOENT);
1563         }
1564
1565         llist_remove (read_list, le);
1566
1567         rf = le->value;
1568         assert (rf != NULL);
1569         rf->rf_type = RF_REMOVE;
1570
1571         pthread_mutex_unlock (&read_lock);
1572
1573         llentry_destroy (le);
1574
1575         DEBUG ("plugin_unregister_read: Marked `%s' for removal.", name);
1576
1577         return (0);
1578 } /* }}} int plugin_unregister_read */
1579
1580 void plugin_log_available_writers (void)
1581 {
1582         log_list_callbacks (&list_write, "Available write targets:");
1583 }
1584
1585 static int compare_read_func_group (llentry_t *e, void *ud) /* {{{ */
1586 {
1587         read_func_t *rf    = e->value;
1588         char        *group = ud;
1589
1590         return strcmp (rf->rf_group, (const char *)group);
1591 } /* }}} int compare_read_func_group */
1592
1593 int plugin_unregister_read_group (const char *group) /* {{{ */
1594 {
1595         llentry_t *le;
1596         read_func_t *rf;
1597
1598         int found = 0;
1599
1600         if (group == NULL)
1601                 return (-ENOENT);
1602
1603         pthread_mutex_lock (&read_lock);
1604
1605         if (read_list == NULL)
1606         {
1607                 pthread_mutex_unlock (&read_lock);
1608                 return (-ENOENT);
1609         }
1610
1611         while (42)
1612         {
1613                 le = llist_search_custom (read_list,
1614                                 compare_read_func_group, (void *)group);
1615
1616                 if (le == NULL)
1617                         break;
1618
1619                 ++found;
1620
1621                 llist_remove (read_list, le);
1622
1623                 rf = le->value;
1624                 assert (rf != NULL);
1625                 rf->rf_type = RF_REMOVE;
1626
1627                 llentry_destroy (le);
1628
1629                 DEBUG ("plugin_unregister_read_group: "
1630                                 "Marked `%s' (group `%s') for removal.",
1631                                 rf->rf_name, group);
1632         }
1633
1634         pthread_mutex_unlock (&read_lock);
1635
1636         if (found == 0)
1637         {
1638                 WARNING ("plugin_unregister_read_group: No such "
1639                                 "group of read function: %s", group);
1640                 return (-ENOENT);
1641         }
1642
1643         return (0);
1644 } /* }}} int plugin_unregister_read_group */
1645
1646 int plugin_unregister_write (const char *name)
1647 {
1648         return (plugin_unregister (list_write, name));
1649 }
1650
1651 int plugin_unregister_flush (const char *name)
1652 {
1653         plugin_ctx_t ctx = plugin_get_ctx ();
1654
1655         if (ctx.flush_interval != 0)
1656         {
1657                 char *flush_name;
1658
1659                 flush_name = plugin_flush_callback_name (name);
1660                 if (flush_name != NULL)
1661                 {
1662                         plugin_unregister_read(flush_name);
1663                         sfree (flush_name);
1664                 }
1665         }
1666
1667         return plugin_unregister (list_flush, name);
1668 }
1669
1670 int plugin_unregister_missing (const char *name)
1671 {
1672         return (plugin_unregister (list_missing, name));
1673 }
1674
1675 int plugin_unregister_shutdown (const char *name)
1676 {
1677         return (plugin_unregister (list_shutdown, name));
1678 }
1679
1680 int plugin_unregister_data_set (const char *name)
1681 {
1682         data_set_t *ds;
1683
1684         if (data_sets == NULL)
1685                 return (-1);
1686
1687         if (c_avl_remove (data_sets, name, NULL, (void *) &ds) != 0)
1688                 return (-1);
1689
1690         sfree (ds->ds);
1691         sfree (ds);
1692
1693         return (0);
1694 } /* int plugin_unregister_data_set */
1695
1696 int plugin_unregister_log (const char *name)
1697 {
1698         return (plugin_unregister (list_log, name));
1699 }
1700
1701 int plugin_unregister_notification (const char *name)
1702 {
1703         return (plugin_unregister (list_notification, name));
1704 }
1705
1706 int plugin_init_all (void)
1707 {
1708         char const *chain_name;
1709         llentry_t *le;
1710         int status;
1711         int ret = 0;
1712
1713         /* Init the value cache */
1714         uc_init ();
1715
1716         if (IS_TRUE (global_option_get ("CollectInternalStats")))
1717                 record_statistics = 1;
1718
1719         chain_name = global_option_get ("PreCacheChain");
1720         pre_cache_chain = fc_chain_get_by_name (chain_name);
1721
1722         chain_name = global_option_get ("PostCacheChain");
1723         post_cache_chain = fc_chain_get_by_name (chain_name);
1724
1725         write_limit_high = global_option_get_long ("WriteQueueLimitHigh",
1726                         /* default = */ 0);
1727         if (write_limit_high < 0)
1728         {
1729                 ERROR ("WriteQueueLimitHigh must be positive or zero.");
1730                 write_limit_high = 0;
1731         }
1732
1733         write_limit_low = global_option_get_long ("WriteQueueLimitLow",
1734                         /* default = */ write_limit_high / 2);
1735         if (write_limit_low < 0)
1736         {
1737                 ERROR ("WriteQueueLimitLow must be positive or zero.");
1738                 write_limit_low = write_limit_high / 2;
1739         }
1740         else if (write_limit_low > write_limit_high)
1741         {
1742                 ERROR ("WriteQueueLimitLow must not be larger than "
1743                                 "WriteQueueLimitHigh.");
1744                 write_limit_low = write_limit_high;
1745         }
1746
1747         write_threads_num = global_option_get_long ("WriteThreads",
1748                         /* default = */ 5);
1749         if (write_threads_num < 1)
1750         {
1751                 ERROR ("WriteThreads must be positive.");
1752                 write_threads_num = 5;
1753         }
1754
1755         if ((list_init == NULL) && (read_heap == NULL))
1756                 return ret;
1757
1758         /* Calling all init callbacks before checking if read callbacks
1759          * are available allows the init callbacks to register the read
1760          * callback. */
1761         le = llist_head (list_init);
1762         while (le != NULL)
1763         {
1764                 callback_func_t *cf;
1765                 plugin_init_cb callback;
1766                 plugin_ctx_t old_ctx;
1767
1768                 cf = le->value;
1769                 old_ctx = plugin_set_ctx (cf->cf_ctx);
1770                 callback = cf->cf_callback;
1771                 status = (*callback) ();
1772                 plugin_set_ctx (old_ctx);
1773
1774                 if (status != 0)
1775                 {
1776                         ERROR ("Initialization of plugin `%s' "
1777                                         "failed with status %i. "
1778                                         "Plugin will be unloaded.",
1779                                         le->key, status);
1780                         /* Plugins that register read callbacks from the init
1781                          * callback should take care of appropriate error
1782                          * handling themselves. */
1783                         /* FIXME: Unload _all_ functions */
1784                         plugin_unregister_read (le->key);
1785                         ret = -1;
1786                 }
1787
1788                 le = le->next;
1789         }
1790
1791         start_write_threads ((size_t) write_threads_num);
1792
1793         max_read_interval = global_option_get_time ("MaxReadInterval",
1794                         DEFAULT_MAX_READ_INTERVAL);
1795
1796         /* Start read-threads */
1797         if (read_heap != NULL)
1798         {
1799                 const char *rt;
1800                 int num;
1801
1802                 rt = global_option_get ("ReadThreads");
1803                 num = atoi (rt);
1804                 if (num != -1)
1805                         start_read_threads ((num > 0) ? num : 5);
1806         }
1807         return ret;
1808 } /* void plugin_init_all */
1809
1810 /* TODO: Rename this function. */
1811 void plugin_read_all (void)
1812 {
1813         if(record_statistics) {
1814                 plugin_update_internal_statistics ();
1815         }
1816         uc_check_timeout ();
1817
1818         return;
1819 } /* void plugin_read_all */
1820
1821 /* Read function called when the `-T' command line argument is given. */
1822 int plugin_read_all_once (void)
1823 {
1824         int status;
1825         int return_status = 0;
1826
1827         if (read_heap == NULL)
1828         {
1829                 NOTICE ("No read-functions are registered.");
1830                 return (0);
1831         }
1832
1833         while (42)
1834         {
1835                 read_func_t *rf;
1836                 plugin_ctx_t old_ctx;
1837
1838                 rf = c_heap_get_root (read_heap);
1839                 if (rf == NULL)
1840                         break;
1841
1842                 old_ctx = plugin_set_ctx (rf->rf_ctx);
1843
1844                 if (rf->rf_type == RF_SIMPLE)
1845                 {
1846                         int (*callback) (void);
1847
1848                         callback = rf->rf_callback;
1849                         status = (*callback) ();
1850                 }
1851                 else
1852                 {
1853                         plugin_read_cb callback;
1854
1855                         callback = rf->rf_callback;
1856                         status = (*callback) (&rf->rf_udata);
1857                 }
1858
1859                 plugin_set_ctx (old_ctx);
1860
1861                 if (status != 0)
1862                 {
1863                         NOTICE ("read-function of plugin `%s' failed.",
1864                                         rf->rf_name);
1865                         return_status = -1;
1866                 }
1867
1868                 sfree (rf->rf_name);
1869                 destroy_callback ((void *) rf);
1870         }
1871
1872         return (return_status);
1873 } /* int plugin_read_all_once */
1874
1875 int plugin_write (const char *plugin, /* {{{ */
1876                 const data_set_t *ds, const value_list_t *vl)
1877 {
1878   llentry_t *le;
1879   int status;
1880
1881   if (vl == NULL)
1882     return (EINVAL);
1883
1884   if (list_write == NULL)
1885     return (ENOENT);
1886
1887   if (ds == NULL)
1888   {
1889     ds = plugin_get_ds (vl->type);
1890     if (ds == NULL)
1891     {
1892       ERROR ("plugin_write: Unable to lookup type `%s'.", vl->type);
1893       return (ENOENT);
1894     }
1895   }
1896
1897   if (plugin == NULL)
1898   {
1899     int success = 0;
1900     int failure = 0;
1901
1902     le = llist_head (list_write);
1903     while (le != NULL)
1904     {
1905       callback_func_t *cf = le->value;
1906       plugin_write_cb callback;
1907
1908       /* do not switch plugin context; rather keep the context (interval)
1909        * information of the calling read plugin */
1910
1911       DEBUG ("plugin: plugin_write: Writing values via %s.", le->key);
1912       callback = cf->cf_callback;
1913       status = (*callback) (ds, vl, &cf->cf_udata);
1914       if (status != 0)
1915         failure++;
1916       else
1917         success++;
1918
1919       le = le->next;
1920     }
1921
1922     if ((success == 0) && (failure != 0))
1923       status = -1;
1924     else
1925       status = 0;
1926   }
1927   else /* plugin != NULL */
1928   {
1929     callback_func_t *cf;
1930     plugin_write_cb callback;
1931
1932     le = llist_head (list_write);
1933     while (le != NULL)
1934     {
1935       if (strcasecmp (plugin, le->key) == 0)
1936         break;
1937
1938       le = le->next;
1939     }
1940
1941     if (le == NULL)
1942       return (ENOENT);
1943
1944     cf = le->value;
1945
1946     /* do not switch plugin context; rather keep the context (interval)
1947      * information of the calling read plugin */
1948
1949     DEBUG ("plugin: plugin_write: Writing values via %s.", le->key);
1950     callback = cf->cf_callback;
1951     status = (*callback) (ds, vl, &cf->cf_udata);
1952   }
1953
1954   return (status);
1955 } /* }}} int plugin_write */
1956
1957 int plugin_flush (const char *plugin, cdtime_t timeout, const char *identifier)
1958 {
1959   llentry_t *le;
1960
1961   if (list_flush == NULL)
1962     return (0);
1963
1964   le = llist_head (list_flush);
1965   while (le != NULL)
1966   {
1967     callback_func_t *cf;
1968     plugin_flush_cb callback;
1969     plugin_ctx_t old_ctx;
1970
1971     if ((plugin != NULL)
1972         && (strcmp (plugin, le->key) != 0))
1973     {
1974       le = le->next;
1975       continue;
1976     }
1977
1978     cf = le->value;
1979     old_ctx = plugin_set_ctx (cf->cf_ctx);
1980     callback = cf->cf_callback;
1981
1982     (*callback) (timeout, identifier, &cf->cf_udata);
1983
1984     plugin_set_ctx (old_ctx);
1985
1986     le = le->next;
1987   }
1988   return (0);
1989 } /* int plugin_flush */
1990
1991 int plugin_shutdown_all (void)
1992 {
1993         llentry_t *le;
1994         int ret = 0;  // Assume success.
1995
1996         destroy_all_callbacks (&list_init);
1997
1998         stop_read_threads ();
1999
2000         pthread_mutex_lock (&read_lock);
2001         llist_destroy (read_list);
2002         read_list = NULL;
2003         pthread_mutex_unlock (&read_lock);
2004
2005         destroy_read_heap ();
2006
2007         /* blocks until all write threads have shut down. */
2008         stop_write_threads ();
2009
2010         /* ask all plugins to write out the state they kept. */
2011         plugin_flush (/* plugin = */ NULL,
2012                         /* timeout = */ 0,
2013                         /* identifier = */ NULL);
2014
2015         le = NULL;
2016         if (list_shutdown != NULL)
2017                 le = llist_head (list_shutdown);
2018
2019         while (le != NULL)
2020         {
2021                 callback_func_t *cf;
2022                 plugin_shutdown_cb callback;
2023                 plugin_ctx_t old_ctx;
2024
2025                 cf = le->value;
2026                 old_ctx = plugin_set_ctx (cf->cf_ctx);
2027                 callback = cf->cf_callback;
2028
2029                 /* Advance the pointer before calling the callback allows
2030                  * shutdown functions to unregister themselves. If done the
2031                  * other way around the memory `le' points to will be freed
2032                  * after callback returns. */
2033                 le = le->next;
2034
2035                 if ((*callback) () != 0)
2036                         ret = -1;
2037
2038                 plugin_set_ctx (old_ctx);
2039         }
2040
2041         /* Write plugins which use the `user_data' pointer usually need the
2042          * same data available to the flush callback. If this is the case, set
2043          * the free_function to NULL when registering the flush callback and to
2044          * the real free function when registering the write callback. This way
2045          * the data isn't freed twice. */
2046         destroy_all_callbacks (&list_flush);
2047         destroy_all_callbacks (&list_missing);
2048         destroy_all_callbacks (&list_write);
2049
2050         destroy_all_callbacks (&list_notification);
2051         destroy_all_callbacks (&list_shutdown);
2052         destroy_all_callbacks (&list_log);
2053
2054         plugin_free_loaded ();
2055         plugin_free_data_sets ();
2056         return (ret);
2057 } /* void plugin_shutdown_all */
2058
2059 int plugin_dispatch_missing (const value_list_t *vl) /* {{{ */
2060 {
2061   llentry_t *le;
2062
2063   if (list_missing == NULL)
2064     return (0);
2065
2066   le = llist_head (list_missing);
2067   while (le != NULL)
2068   {
2069     callback_func_t *cf;
2070     plugin_missing_cb callback;
2071     plugin_ctx_t old_ctx;
2072     int status;
2073
2074     cf = le->value;
2075     old_ctx = plugin_set_ctx (cf->cf_ctx);
2076     callback = cf->cf_callback;
2077
2078     status = (*callback) (vl, &cf->cf_udata);
2079     plugin_set_ctx (old_ctx);
2080     if (status != 0)
2081     {
2082       if (status < 0)
2083       {
2084         ERROR ("plugin_dispatch_missing: Callback function \"%s\" "
2085             "failed with status %i.",
2086             le->key, status);
2087         return (status);
2088       }
2089       else
2090       {
2091         return (0);
2092       }
2093     }
2094
2095     le = le->next;
2096   }
2097   return (0);
2098 } /* int }}} plugin_dispatch_missing */
2099
2100 static int plugin_dispatch_values_internal (value_list_t *vl)
2101 {
2102         int status;
2103         static c_complain_t no_write_complaint = C_COMPLAIN_INIT_STATIC;
2104
2105         data_set_t *ds;
2106
2107         _Bool free_meta_data = 0;
2108
2109         assert (vl != NULL);
2110
2111         /* These fields are initialized by plugin_value_list_clone() if needed: */
2112         assert (vl->host[0] != 0);
2113         assert (vl->time != 0); /* The time is determined at _enqueue_ time. */
2114         assert (vl->interval != 0);
2115
2116         if (vl->type[0] == 0 || vl->values == NULL || vl->values_len < 1)
2117         {
2118                 ERROR ("plugin_dispatch_values: Invalid value list "
2119                                 "from plugin %s.", vl->plugin);
2120                 return (-1);
2121         }
2122
2123         /* Free meta data only if the calling function didn't specify any. In
2124          * this case matches and targets may add some and the calling function
2125          * may not expect (and therefore free) that data. */
2126         if (vl->meta == NULL)
2127                 free_meta_data = 1;
2128
2129         if (list_write == NULL)
2130                 c_complain_once (LOG_WARNING, &no_write_complaint,
2131                                 "plugin_dispatch_values: No write callback has been "
2132                                 "registered. Please load at least one output plugin, "
2133                                 "if you want the collected data to be stored.");
2134
2135         if (data_sets == NULL)
2136         {
2137                 ERROR ("plugin_dispatch_values: No data sets registered. "
2138                                 "Could the types database be read? Check "
2139                                 "your `TypesDB' setting!");
2140                 return (-1);
2141         }
2142
2143         if (c_avl_get (data_sets, vl->type, (void *) &ds) != 0)
2144         {
2145                 char ident[6 * DATA_MAX_NAME_LEN];
2146
2147                 FORMAT_VL (ident, sizeof (ident), vl);
2148                 INFO ("plugin_dispatch_values: Dataset not found: %s "
2149                                 "(from \"%s\"), check your types.db!",
2150                                 vl->type, ident);
2151                 return (-1);
2152         }
2153
2154         DEBUG ("plugin_dispatch_values: time = %.3f; interval = %.3f; "
2155                         "host = %s; "
2156                         "plugin = %s; plugin_instance = %s; "
2157                         "type = %s; type_instance = %s;",
2158                         CDTIME_T_TO_DOUBLE (vl->time),
2159                         CDTIME_T_TO_DOUBLE (vl->interval),
2160                         vl->host,
2161                         vl->plugin, vl->plugin_instance,
2162                         vl->type, vl->type_instance);
2163
2164 #if COLLECT_DEBUG
2165         assert (0 == strcmp (ds->type, vl->type));
2166 #else
2167         if (0 != strcmp (ds->type, vl->type))
2168                 WARNING ("plugin_dispatch_values: (ds->type = %s) != (vl->type = %s)",
2169                                 ds->type, vl->type);
2170 #endif
2171
2172 #if COLLECT_DEBUG
2173         assert (ds->ds_num == vl->values_len);
2174 #else
2175         if (ds->ds_num != vl->values_len)
2176         {
2177                 ERROR ("plugin_dispatch_values: ds->type = %s: "
2178                                 "(ds->ds_num = %zu) != "
2179                                 "(vl->values_len = %zu)",
2180                                 ds->type, ds->ds_num, vl->values_len);
2181                 return (-1);
2182         }
2183 #endif
2184
2185         escape_slashes (vl->host, sizeof (vl->host));
2186         escape_slashes (vl->plugin, sizeof (vl->plugin));
2187         escape_slashes (vl->plugin_instance, sizeof (vl->plugin_instance));
2188         escape_slashes (vl->type, sizeof (vl->type));
2189         escape_slashes (vl->type_instance, sizeof (vl->type_instance));
2190
2191         if (pre_cache_chain != NULL)
2192         {
2193                 status = fc_process_chain (ds, vl, pre_cache_chain);
2194                 if (status < 0)
2195                 {
2196                         WARNING ("plugin_dispatch_values: Running the "
2197                                         "pre-cache chain failed with "
2198                                         "status %i (%#x).",
2199                                         status, status);
2200                 }
2201                 else if (status == FC_TARGET_STOP)
2202                         return (0);
2203         }
2204
2205         /* Update the value cache */
2206         uc_update (ds, vl);
2207
2208         if (post_cache_chain != NULL)
2209         {
2210                 status = fc_process_chain (ds, vl, post_cache_chain);
2211                 if (status < 0)
2212                 {
2213                         WARNING ("plugin_dispatch_values: Running the "
2214                                         "post-cache chain failed with "
2215                                         "status %i (%#x).",
2216                                         status, status);
2217                 }
2218         }
2219         else
2220                 fc_default_action (ds, vl);
2221
2222         if ((free_meta_data != 0) && (vl->meta != NULL))
2223         {
2224                 meta_data_destroy (vl->meta);
2225                 vl->meta = NULL;
2226         }
2227
2228         return (0);
2229 } /* int plugin_dispatch_values_internal */
2230
2231 static double get_drop_probability (void) /* {{{ */
2232 {
2233         long pos;
2234         long size;
2235         long wql;
2236
2237         pthread_mutex_lock (&write_lock);
2238         wql = write_queue_length;
2239         pthread_mutex_unlock (&write_lock);
2240
2241         if (wql < write_limit_low)
2242                 return (0.0);
2243         if (wql >= write_limit_high)
2244                 return (1.0);
2245
2246         pos = 1 + wql - write_limit_low;
2247         size = 1 + write_limit_high - write_limit_low;
2248
2249         return (((double) pos) / ((double) size));
2250 } /* }}} double get_drop_probability */
2251
2252 static _Bool check_drop_value (void) /* {{{ */
2253 {
2254         static cdtime_t last_message_time = 0;
2255         static pthread_mutex_t last_message_lock = PTHREAD_MUTEX_INITIALIZER;
2256
2257         double p;
2258         double q;
2259         int status;
2260
2261         if (write_limit_high == 0)
2262                 return (0);
2263
2264         p = get_drop_probability ();
2265         if (p == 0.0)
2266                 return (0);
2267
2268         status = pthread_mutex_trylock (&last_message_lock);
2269         if (status == 0)
2270         {
2271                 cdtime_t now;
2272
2273                 now = cdtime ();
2274                 if ((now - last_message_time) > TIME_T_TO_CDTIME_T (1))
2275                 {
2276                         last_message_time = now;
2277                         ERROR ("plugin_dispatch_values: Low water mark "
2278                                         "reached. Dropping %.0f%% of metrics.",
2279                                         100.0 * p);
2280                 }
2281                 pthread_mutex_unlock (&last_message_lock);
2282         }
2283
2284         if (p == 1.0)
2285                 return (1);
2286
2287         q = cdrand_d ();
2288         if (q > p)
2289                 return (1);
2290         else
2291                 return (0);
2292 } /* }}} _Bool check_drop_value */
2293
2294 int plugin_dispatch_values (value_list_t const *vl)
2295 {
2296         int status;
2297         static pthread_mutex_t statistics_lock = PTHREAD_MUTEX_INITIALIZER;
2298
2299         if (check_drop_value ()) {
2300                 if(record_statistics) {
2301                         pthread_mutex_lock(&statistics_lock);
2302                         stats_values_dropped++;
2303                         pthread_mutex_unlock(&statistics_lock);
2304                 }
2305                 return (0);
2306         }
2307
2308         status = plugin_write_enqueue (vl);
2309         if (status != 0)
2310         {
2311                 char errbuf[1024];
2312                 ERROR ("plugin_dispatch_values: plugin_write_enqueue failed "
2313                                 "with status %i (%s).", status,
2314                                 sstrerror (status, errbuf, sizeof (errbuf)));
2315                 return (status);
2316         }
2317
2318         return (0);
2319 }
2320
2321 __attribute__((sentinel))
2322 int plugin_dispatch_multivalue (value_list_t const *template, /* {{{ */
2323                 _Bool store_percentage, int store_type, ...)
2324 {
2325         value_list_t *vl;
2326         int failed = 0;
2327         gauge_t sum = 0.0;
2328         va_list ap;
2329
2330         assert (template->values_len == 1);
2331
2332         /* Calculate sum for Gauge to calculate percent if needed */
2333         if (DS_TYPE_GAUGE == store_type)        {
2334                 va_start (ap, store_type);
2335                 while (42)
2336                 {
2337                         char const *name;
2338                         gauge_t value;
2339
2340                         name = va_arg (ap, char const *);
2341                         if (name == NULL)
2342                                 break;
2343
2344                         value = va_arg (ap, gauge_t);
2345                         if (!isnan (value))
2346                                 sum += value;
2347                 }
2348                 va_end (ap);
2349         }
2350
2351
2352         vl = plugin_value_list_clone (template);
2353         /* plugin_value_list_clone makes sure vl->time is set to non-zero. */
2354         if (store_percentage)
2355                 sstrncpy (vl->type, "percent", sizeof (vl->type));
2356
2357         va_start (ap, store_type);
2358         while (42)
2359         {
2360                 char const *name;
2361                 int status;
2362
2363                 /* Set the type instance. */
2364                 name = va_arg (ap, char const *);
2365                 if (name == NULL)
2366                         break;
2367                 sstrncpy (vl->type_instance, name, sizeof (vl->type_instance));
2368
2369                 /* Set the value. */
2370                 switch (store_type)
2371                 {
2372                 case DS_TYPE_GAUGE:
2373                         vl->values[0].gauge = va_arg (ap, gauge_t);
2374                         if (store_percentage)
2375                                 vl->values[0].gauge *= sum ? (100.0 / sum) : NAN;
2376                         break;
2377                 case DS_TYPE_ABSOLUTE:
2378                         vl->values[0].absolute = va_arg (ap, absolute_t);
2379                         break;
2380                 case DS_TYPE_COUNTER:
2381                         vl->values[0].counter  = va_arg (ap, counter_t);
2382                         break;
2383                 case DS_TYPE_DERIVE:
2384                         vl->values[0].derive   = va_arg (ap, derive_t);
2385                         break;
2386                 default:
2387                         ERROR ("plugin_dispatch_multivalue: given store_type is incorrect.");
2388                         failed++;
2389                 }
2390
2391
2392                 status = plugin_write_enqueue (vl);
2393                 if (status != 0)
2394                         failed++;
2395         }
2396         va_end (ap);
2397
2398         plugin_value_list_free (vl);
2399         return (failed);
2400 } /* }}} int plugin_dispatch_multivalue */
2401
2402 int plugin_dispatch_notification (const notification_t *notif)
2403 {
2404         llentry_t *le;
2405         /* Possible TODO: Add flap detection here */
2406
2407         DEBUG ("plugin_dispatch_notification: severity = %i; message = %s; "
2408                         "time = %.3f; host = %s;",
2409                         notif->severity, notif->message,
2410                         CDTIME_T_TO_DOUBLE (notif->time), notif->host);
2411
2412         /* Nobody cares for notifications */
2413         if (list_notification == NULL)
2414                 return (-1);
2415
2416         le = llist_head (list_notification);
2417         while (le != NULL)
2418         {
2419                 callback_func_t *cf;
2420                 plugin_notification_cb callback;
2421                 int status;
2422
2423                 /* do not switch plugin context; rather keep the context
2424                  * (interval) information of the calling plugin */
2425
2426                 cf = le->value;
2427                 callback = cf->cf_callback;
2428                 status = (*callback) (notif, &cf->cf_udata);
2429                 if (status != 0)
2430                 {
2431                         WARNING ("plugin_dispatch_notification: Notification "
2432                                         "callback %s returned %i.",
2433                                         le->key, status);
2434                 }
2435
2436                 le = le->next;
2437         }
2438
2439         return (0);
2440 } /* int plugin_dispatch_notification */
2441
2442 void plugin_log (int level, const char *format, ...)
2443 {
2444         char msg[1024];
2445         va_list ap;
2446         llentry_t *le;
2447
2448 #if !COLLECT_DEBUG
2449         if (level >= LOG_DEBUG)
2450                 return;
2451 #endif
2452
2453         va_start (ap, format);
2454         vsnprintf (msg, sizeof (msg), format, ap);
2455         msg[sizeof (msg) - 1] = '\0';
2456         va_end (ap);
2457
2458         if (list_log == NULL)
2459         {
2460                 fprintf (stderr, "%s\n", msg);
2461                 return;
2462         }
2463
2464         le = llist_head (list_log);
2465         while (le != NULL)
2466         {
2467                 callback_func_t *cf;
2468                 plugin_log_cb callback;
2469
2470                 cf = le->value;
2471                 callback = cf->cf_callback;
2472
2473                 /* do not switch plugin context; rather keep the context
2474                  * (interval) information of the calling plugin */
2475
2476                 (*callback) (level, msg, &cf->cf_udata);
2477
2478                 le = le->next;
2479         }
2480 } /* void plugin_log */
2481
2482 int parse_log_severity (const char *severity)
2483 {
2484         int log_level = -1;
2485
2486         if ((0 == strcasecmp (severity, "emerg"))
2487                         || (0 == strcasecmp (severity, "alert"))
2488                         || (0 == strcasecmp (severity, "crit"))
2489                         || (0 == strcasecmp (severity, "err")))
2490                 log_level = LOG_ERR;
2491         else if (0 == strcasecmp (severity, "warning"))
2492                 log_level = LOG_WARNING;
2493         else if (0 == strcasecmp (severity, "notice"))
2494                 log_level = LOG_NOTICE;
2495         else if (0 == strcasecmp (severity, "info"))
2496                 log_level = LOG_INFO;
2497 #if COLLECT_DEBUG
2498         else if (0 == strcasecmp (severity, "debug"))
2499                 log_level = LOG_DEBUG;
2500 #endif /* COLLECT_DEBUG */
2501
2502         return (log_level);
2503 } /* int parse_log_severity */
2504
2505 int parse_notif_severity (const char *severity)
2506 {
2507         int notif_severity = -1;
2508
2509         if (strcasecmp (severity, "FAILURE") == 0)
2510                 notif_severity = NOTIF_FAILURE;
2511         else if (strcmp (severity, "OKAY") == 0)
2512                 notif_severity = NOTIF_OKAY;
2513         else if ((strcmp (severity, "WARNING") == 0)
2514                         || (strcmp (severity, "WARN") == 0))
2515                 notif_severity = NOTIF_WARNING;
2516
2517         return (notif_severity);
2518 } /* int parse_notif_severity */
2519
2520 const data_set_t *plugin_get_ds (const char *name)
2521 {
2522         data_set_t *ds;
2523
2524         if (data_sets == NULL)
2525         {
2526                 ERROR ("plugin_get_ds: No data sets are defined yet.");
2527                 return (NULL);
2528         }
2529
2530         if (c_avl_get (data_sets, name, (void *) &ds) != 0)
2531         {
2532                 DEBUG ("No such dataset registered: %s", name);
2533                 return (NULL);
2534         }
2535
2536         return (ds);
2537 } /* data_set_t *plugin_get_ds */
2538
2539 static int plugin_notification_meta_add (notification_t *n,
2540     const char *name,
2541     enum notification_meta_type_e type,
2542     const void *value)
2543 {
2544   notification_meta_t *meta;
2545   notification_meta_t *tail;
2546
2547   if ((n == NULL) || (name == NULL) || (value == NULL))
2548   {
2549     ERROR ("plugin_notification_meta_add: A pointer is NULL!");
2550     return (-1);
2551   }
2552
2553   meta = calloc (1, sizeof (*meta));
2554   if (meta == NULL)
2555   {
2556     ERROR ("plugin_notification_meta_add: calloc failed.");
2557     return (-1);
2558   }
2559
2560   sstrncpy (meta->name, name, sizeof (meta->name));
2561   meta->type = type;
2562
2563   switch (type)
2564   {
2565     case NM_TYPE_STRING:
2566     {
2567       meta->nm_value.nm_string = strdup ((const char *) value);
2568       if (meta->nm_value.nm_string == NULL)
2569       {
2570         ERROR ("plugin_notification_meta_add: strdup failed.");
2571         sfree (meta);
2572         return (-1);
2573       }
2574       break;
2575     }
2576     case NM_TYPE_SIGNED_INT:
2577     {
2578       meta->nm_value.nm_signed_int = *((int64_t *) value);
2579       break;
2580     }
2581     case NM_TYPE_UNSIGNED_INT:
2582     {
2583       meta->nm_value.nm_unsigned_int = *((uint64_t *) value);
2584       break;
2585     }
2586     case NM_TYPE_DOUBLE:
2587     {
2588       meta->nm_value.nm_double = *((double *) value);
2589       break;
2590     }
2591     case NM_TYPE_BOOLEAN:
2592     {
2593       meta->nm_value.nm_boolean = *((_Bool *) value);
2594       break;
2595     }
2596     default:
2597     {
2598       ERROR ("plugin_notification_meta_add: Unknown type: %i", type);
2599       sfree (meta);
2600       return (-1);
2601     }
2602   } /* switch (type) */
2603
2604   meta->next = NULL;
2605   tail = n->meta;
2606   while ((tail != NULL) && (tail->next != NULL))
2607     tail = tail->next;
2608
2609   if (tail == NULL)
2610     n->meta = meta;
2611   else
2612     tail->next = meta;
2613
2614   return (0);
2615 } /* int plugin_notification_meta_add */
2616
2617 int plugin_notification_meta_add_string (notification_t *n,
2618     const char *name,
2619     const char *value)
2620 {
2621   return (plugin_notification_meta_add (n, name, NM_TYPE_STRING, value));
2622 }
2623
2624 int plugin_notification_meta_add_signed_int (notification_t *n,
2625     const char *name,
2626     int64_t value)
2627 {
2628   return (plugin_notification_meta_add (n, name, NM_TYPE_SIGNED_INT, &value));
2629 }
2630
2631 int plugin_notification_meta_add_unsigned_int (notification_t *n,
2632     const char *name,
2633     uint64_t value)
2634 {
2635   return (plugin_notification_meta_add (n, name, NM_TYPE_UNSIGNED_INT, &value));
2636 }
2637
2638 int plugin_notification_meta_add_double (notification_t *n,
2639     const char *name,
2640     double value)
2641 {
2642   return (plugin_notification_meta_add (n, name, NM_TYPE_DOUBLE, &value));
2643 }
2644
2645 int plugin_notification_meta_add_boolean (notification_t *n,
2646     const char *name,
2647     _Bool value)
2648 {
2649   return (plugin_notification_meta_add (n, name, NM_TYPE_BOOLEAN, &value));
2650 }
2651
2652 int plugin_notification_meta_copy (notification_t *dst,
2653     const notification_t *src)
2654 {
2655   assert (dst != NULL);
2656   assert (src != NULL);
2657   assert (dst != src);
2658   assert ((src->meta == NULL) || (src->meta != dst->meta));
2659
2660   for (notification_meta_t *meta = src->meta; meta != NULL; meta = meta->next)
2661   {
2662     if (meta->type == NM_TYPE_STRING)
2663       plugin_notification_meta_add_string (dst, meta->name,
2664           meta->nm_value.nm_string);
2665     else if (meta->type == NM_TYPE_SIGNED_INT)
2666       plugin_notification_meta_add_signed_int (dst, meta->name,
2667           meta->nm_value.nm_signed_int);
2668     else if (meta->type == NM_TYPE_UNSIGNED_INT)
2669       plugin_notification_meta_add_unsigned_int (dst, meta->name,
2670           meta->nm_value.nm_unsigned_int);
2671     else if (meta->type == NM_TYPE_DOUBLE)
2672       plugin_notification_meta_add_double (dst, meta->name,
2673           meta->nm_value.nm_double);
2674     else if (meta->type == NM_TYPE_BOOLEAN)
2675       plugin_notification_meta_add_boolean (dst, meta->name,
2676           meta->nm_value.nm_boolean);
2677   }
2678
2679   return (0);
2680 } /* int plugin_notification_meta_copy */
2681
2682 int plugin_notification_meta_free (notification_meta_t *n)
2683 {
2684   notification_meta_t *this;
2685   notification_meta_t *next;
2686
2687   if (n == NULL)
2688   {
2689     ERROR ("plugin_notification_meta_free: n == NULL!");
2690     return (-1);
2691   }
2692
2693   this = n;
2694   while (this != NULL)
2695   {
2696     next = this->next;
2697
2698     if (this->type == NM_TYPE_STRING)
2699     {
2700       /* Assign to a temporary variable to work around nm_string's const
2701        * modifier. */
2702       void *tmp = (void *) this->nm_value.nm_string;
2703
2704       sfree (tmp);
2705       this->nm_value.nm_string = NULL;
2706     }
2707     sfree (this);
2708
2709     this = next;
2710   }
2711
2712   return (0);
2713 } /* int plugin_notification_meta_free */
2714
2715 static void plugin_ctx_destructor (void *ctx)
2716 {
2717         sfree (ctx);
2718 } /* void plugin_ctx_destructor */
2719
2720 static plugin_ctx_t ctx_init = { /* interval = */ 0 };
2721
2722 static plugin_ctx_t *plugin_ctx_create (void)
2723 {
2724         plugin_ctx_t *ctx;
2725
2726         ctx = malloc (sizeof (*ctx));
2727         if (ctx == NULL) {
2728                 char errbuf[1024];
2729                 ERROR ("Failed to allocate plugin context: %s",
2730                                 sstrerror (errno, errbuf, sizeof (errbuf)));
2731                 return NULL;
2732         }
2733
2734         *ctx = ctx_init;
2735         assert (plugin_ctx_key_initialized);
2736         pthread_setspecific (plugin_ctx_key, ctx);
2737         DEBUG("Created new plugin context.");
2738         return (ctx);
2739 } /* int plugin_ctx_create */
2740
2741 void plugin_init_ctx (void)
2742 {
2743         pthread_key_create (&plugin_ctx_key, plugin_ctx_destructor);
2744         plugin_ctx_key_initialized = 1;
2745 } /* void plugin_init_ctx */
2746
2747 plugin_ctx_t plugin_get_ctx (void)
2748 {
2749         plugin_ctx_t *ctx;
2750
2751         assert (plugin_ctx_key_initialized);
2752         ctx = pthread_getspecific (plugin_ctx_key);
2753
2754         if (ctx == NULL) {
2755                 ctx = plugin_ctx_create ();
2756                 /* this must no happen -- exit() instead? */
2757                 if (ctx == NULL)
2758                         return ctx_init;
2759         }
2760
2761         return (*ctx);
2762 } /* plugin_ctx_t plugin_get_ctx */
2763
2764 plugin_ctx_t plugin_set_ctx (plugin_ctx_t ctx)
2765 {
2766         plugin_ctx_t *c;
2767         plugin_ctx_t old;
2768
2769         assert (plugin_ctx_key_initialized);
2770         c = pthread_getspecific (plugin_ctx_key);
2771
2772         if (c == NULL) {
2773                 c = plugin_ctx_create ();
2774                 /* this must no happen -- exit() instead? */
2775                 if (c == NULL)
2776                         return ctx_init;
2777         }
2778
2779         old = *c;
2780         *c = ctx;
2781
2782         return (old);
2783 } /* void plugin_set_ctx */
2784
2785 cdtime_t plugin_get_interval (void)
2786 {
2787         cdtime_t interval;
2788
2789         interval = plugin_get_ctx().interval;
2790         if (interval > 0)
2791                 return interval;
2792
2793         return cf_get_default_interval ();
2794 } /* cdtime_t plugin_get_interval */
2795
2796 typedef struct {
2797         plugin_ctx_t ctx;
2798         void *(*start_routine) (void *);
2799         void *arg;
2800 } plugin_thread_t;
2801
2802 static void *plugin_thread_start (void *arg)
2803 {
2804         plugin_thread_t *plugin_thread = arg;
2805
2806         void *(*start_routine) (void *) = plugin_thread->start_routine;
2807         void *plugin_arg = plugin_thread->arg;
2808
2809         plugin_set_ctx (plugin_thread->ctx);
2810
2811         sfree (plugin_thread);
2812
2813         return start_routine (plugin_arg);
2814 } /* void *plugin_thread_start */
2815
2816 int plugin_thread_create (pthread_t *thread, const pthread_attr_t *attr,
2817                 void *(*start_routine) (void *), void *arg, char *name)
2818 {
2819         plugin_thread_t *plugin_thread;
2820         int ret;
2821
2822         plugin_thread = malloc (sizeof (*plugin_thread));
2823         if (plugin_thread == NULL)
2824                 return -1;
2825
2826         plugin_thread->ctx           = plugin_get_ctx ();
2827         plugin_thread->start_routine = start_routine;
2828         plugin_thread->arg           = arg;
2829
2830         ret = pthread_create (thread, attr,
2831                         plugin_thread_start, plugin_thread);
2832
2833         if (ret == 0 && name != NULL) {
2834 #if defined(HAVE_PTHREAD_SETNAME_NP) || defined(HAVE_PTHREAD_SET_NAME_NP)
2835                 char thread_name[16];
2836                 sstrncpy (thread_name, name, sizeof(thread_name));
2837 # if defined(HAVE_PTHREAD_SETNAME_NP)
2838                 pthread_setname_np (*thread, thread_name);
2839 # elif defined(HAVE_PTHREAD_SET_NAME_NP)
2840                 pthread_set_name_np (*thread, thread_name);
2841 # endif
2842 #endif
2843         }
2844
2845         return ret;
2846 } /* int plugin_thread_create */
2847
2848 /* vim: set sw=8 ts=8 noet fdm=marker : */