Merge branch 'master' of octo@verplant.org:/var/lib/git/collectd
authorFlorian Forster <octo@leeloo.lan.home.verplant.org>
Wed, 27 Feb 2008 21:50:22 +0000 (22:50 +0100)
committerFlorian Forster <octo@leeloo.lan.home.verplant.org>
Wed, 27 Feb 2008 21:50:22 +0000 (22:50 +0100)
23 files changed:
src/Makefile.am
src/collectd-unixsock.pod
src/collectd.c
src/exec.c
src/hddtemp.c
src/network.c
src/perl.c
src/ping.c
src/plugin.c
src/plugin.h
src/rrdtool.c
src/tcpconns.c
src/unixsock.c
src/utils_cache.c
src/utils_cache.h
src/utils_cmd_flush.c [new file with mode: 0644]
src/utils_cmd_flush.h [new file with mode: 0644]
src/utils_cmd_getval.c [new file with mode: 0644]
src/utils_cmd_getval.h [new file with mode: 0644]
src/utils_cmd_putnotif.h
src/utils_cmd_putval.h
src/utils_match.c
src/utils_match.h

index c3d0b1d..194e118 100644 (file)
@@ -219,7 +219,9 @@ endif
 
 if BUILD_PLUGIN_EXEC
 pkglib_LTLIBRARIES += exec.la
-exec_la_SOURCES = exec.c utils_cmd_putval.c utils_cmd_putval.h
+exec_la_SOURCES = exec.c \
+                 utils_cmd_putnotif.c utils_cmd_putnotif.h \
+                 utils_cmd_putval.c utils_cmd_putval.h
 exec_la_LDFLAGS = -module -avoid-version
 if BUILD_WITH_LIBPTHREAD
 exec_la_LDFLAGS += -lpthread
@@ -592,7 +594,11 @@ endif
 
 if BUILD_PLUGIN_UNIXSOCK
 pkglib_LTLIBRARIES += unixsock.la
-unixsock_la_SOURCES = unixsock.c utils_cmd_putval.h utils_cmd_putval.c utils_cmd_putnotif.h utils_cmd_putnotif.c
+unixsock_la_SOURCES = unixsock.c \
+                     utils_cmd_flush.h utils_cmd_flush.c \
+                     utils_cmd_getval.h utils_cmd_getval.c \
+                     utils_cmd_putval.h utils_cmd_putval.c \
+                     utils_cmd_putnotif.h utils_cmd_putnotif.c
 unixsock_la_LDFLAGS = -module -avoid-version -lpthread
 collectd_LDADD += "-dlopen" unixsock.la
 collectd_DEPENDENCIES += unixsock.la
index d17852a..a08a75d 100644 (file)
@@ -174,6 +174,15 @@ Example:
   -> | PUTNOTIF type=temperature severity=warning time=1201094702 message=The roof is on fire!
   <- | 0 Success
 
+=item B<FLUSH> [I<Timeout>]
+
+Flushes all cached data older than I<Timeout> seconds. If no timeout has been
+specified, it defaults to -1 which causes all data to be flushed.
+
+Example:
+  -> | FLUSH
+  <- | 0 Done
+
 =back
 
 =head2 Identifiers
index 984ff75..d2ca568 100644 (file)
@@ -27,6 +27,8 @@
 #include <sys/socket.h>
 #include <netdb.h>
 
+#include <pthread.h>
+
 #include "plugin.h"
 #include "configfile.h"
 
@@ -41,16 +43,37 @@ kstat_ctl_t *kc;
 
 static int loop = 0;
 
-static void sigIntHandler (int signal)
+static void *do_flush (void *arg)
+{
+       INFO ("Flushing all data.");
+       plugin_flush_all (-1);
+       INFO ("Finished flushing all data.");
+       pthread_exit (NULL);
+       return NULL;
+}
+
+static void sig_int_handler (int signal)
 {
        loop++;
 }
 
-static void sigTermHandler (int signal)
+static void sig_term_handler (int signal)
 {
        loop++;
 }
 
+static void sig_usr1_handler (int signal)
+{
+       pthread_t      thread;
+       pthread_attr_t attr;
+
+       /* flushing the data might take a while,
+        * so it should be done asynchronously */
+       pthread_attr_init (&attr);
+       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
+       pthread_create (&thread, &attr, do_flush, NULL);
+}
+
 static int init_hostname (void)
 {
        const char *str;
@@ -227,11 +250,13 @@ static void exit_usage (void)
                        "  General:\n"
                        "    -C <file>       Configuration file.\n"
                        "                    Default: "CONFIGFILE"\n"
+                       "    -t              Test config and exit.\n"
                        "    -P <file>       PID-file.\n"
                        "                    Default: "PIDFILE"\n"
 #if COLLECT_DAEMON
                        "    -f              Don't fork to the background.\n"
 #endif
+                       "    -h              Display help (this message)\n"
                        "\nBuiltin defaults:\n"
                        "  Config-File       "CONFIGFILE"\n"
                        "  PID-File          "PIDFILE"\n"
@@ -365,8 +390,9 @@ static int pidfile_remove (void)
 
 int main (int argc, char **argv)
 {
-       struct sigaction sigIntAction;
-       struct sigaction sigTermAction;
+       struct sigaction sig_int_action;
+       struct sigaction sig_term_action;
+       struct sigaction sig_usr1_action;
        char *configfile = CONFIGFILE;
        int test_config  = 0;
        const char *basedir;
@@ -511,13 +537,32 @@ int main (int argc, char **argv)
        /*
         * install signal handlers
         */
-       memset (&sigIntAction, '\0', sizeof (sigIntAction));
-       sigIntAction.sa_handler = sigIntHandler;
-       sigaction (SIGINT, &sigIntAction, NULL);
+       memset (&sig_int_action, '\0', sizeof (sig_int_action));
+       sig_int_action.sa_handler = sig_int_handler;
+       if (0 != sigaction (SIGINT, &sig_int_action, NULL)) {
+               char errbuf[1024];
+               ERROR ("Error: Failed to install a signal handler for signal INT: %s",
+                               sstrerror (errno, errbuf, sizeof (errbuf)));
+               return (1);
+       }
+
+       memset (&sig_term_action, '\0', sizeof (sig_term_action));
+       sig_term_action.sa_handler = sig_term_handler;
+       if (0 != sigaction (SIGTERM, &sig_term_action, NULL)) {
+               char errbuf[1024];
+               ERROR ("Error: Failed to install a signal handler for signal TERM: %s",
+                               sstrerror (errno, errbuf, sizeof (errbuf)));
+               return (1);
+       }
 
-       memset (&sigTermAction, '\0', sizeof (sigTermAction));
-       sigTermAction.sa_handler = sigTermHandler;
-       sigaction (SIGTERM, &sigTermAction, NULL);
+       memset (&sig_usr1_action, '\0', sizeof (sig_usr1_action));
+       sig_usr1_action.sa_handler = sig_usr1_handler;
+       if (0 != sigaction (SIGUSR1, &sig_usr1_action, NULL)) {
+               char errbuf[1024];
+               ERROR ("Error: Failed to install a signal handler for signal USR1: %s",
+                               sstrerror (errno, errbuf, sizeof (errbuf)));
+               return (1);
+       }
 
        /*
         * run the actual loops
index b25e769..20e65df 100644 (file)
@@ -334,7 +334,7 @@ static void exec_child (program_list_t *pl) /* {{{ */
     glist[0] = gid;
     glist_len = 1;
 
-    if (gid != egid)
+    if ((gid != egid) && (egid != -1))
     {
       glist[1] = egid;
       glist_len = 2;
index 8c4bc4a..651de29 100644 (file)
@@ -314,6 +314,7 @@ static int hddtemp_init (void)
                                case SCSI_DISK5_MAJOR:
                                case SCSI_DISK6_MAJOR:
                                case SCSI_DISK7_MAJOR:
+#ifdef SCSI_DISK8_MAJOR
                                case SCSI_DISK8_MAJOR:
                                case SCSI_DISK9_MAJOR:
                                case SCSI_DISK10_MAJOR:
@@ -322,6 +323,7 @@ static int hddtemp_init (void)
                                case SCSI_DISK13_MAJOR:
                                case SCSI_DISK14_MAJOR:
                                case SCSI_DISK15_MAJOR:
+#endif /* SCSI_DISK8_MAJOR */
                                        /* SCSI disks minors are multiples of 16.
                                         * Keep only those. */
                                        if (minor % 16)
index 34cf018..80e5a53 100644 (file)
@@ -306,37 +306,83 @@ static int cache_check (const char *type, const value_list_t *vl)
 static int write_part_values (char **ret_buffer, int *ret_buffer_len,
                const data_set_t *ds, const value_list_t *vl)
 {
-       part_values_t pv;
+       char *packet_ptr;
+       int packet_len;
+       int num_values;
+
+       part_header_t pkg_ph;
+       uint16_t      pkg_num_values;
+       uint8_t      *pkg_values_types;
+       value_t      *pkg_values;
+
+       int offset;
        int i;
 
-       i = 6 + (9 * vl->values_len);
-       if (*ret_buffer_len < i)
+       num_values = vl->values_len;
+       packet_len = sizeof (part_header_t) + sizeof (uint16_t)
+               + (num_values * sizeof (uint8_t))
+               + (num_values * sizeof (value_t));
+
+       if (*ret_buffer_len < packet_len)
                return (-1);
-       *ret_buffer_len -= i;
 
-       pv.head = (part_header_t *) *ret_buffer;
-       pv.num_values = (uint16_t *) (pv.head + 1);
-       pv.values_types = (uint8_t *) (pv.num_values + 1);
-       pv.values = (value_t *) (pv.values_types + vl->values_len);
-       *ret_buffer = (void *) (pv.values + vl->values_len);
+       pkg_values_types = (uint8_t *) malloc (num_values * sizeof (uint8_t));
+       if (pkg_values_types == NULL)
+       {
+               ERROR ("network plugin: write_part_values: malloc failed.");
+               return (-1);
+       }
 
-       pv.head->type = htons (TYPE_VALUES);
-       pv.head->length = htons (6 + (9 * vl->values_len));
-       *pv.num_values = htons ((uint16_t) vl->values_len);
-       
-       for (i = 0; i < vl->values_len; i++)
+       pkg_values = (value_t *) malloc (num_values * sizeof (value_t));
+       if (pkg_values == NULL)
+       {
+               free (pkg_values_types);
+               ERROR ("network plugin: write_part_values: malloc failed.");
+               return (-1);
+       }
+
+       pkg_ph.type = htons (TYPE_VALUES);
+       pkg_ph.length = htons (packet_len);
+
+       pkg_num_values = htons ((uint16_t) vl->values_len);
+
+       for (i = 0; i < num_values; i++)
        {
                if (ds->ds[i].type == DS_TYPE_COUNTER)
                {
-                       pv.values_types[i] = DS_TYPE_COUNTER;
-                       pv.values[i].counter = htonll (vl->values[i].counter);
+                       pkg_values_types[i] = DS_TYPE_COUNTER;
+                       pkg_values[i].counter = htonll (vl->values[i].counter);
                }
                else
                {
-                       pv.values_types[i] = DS_TYPE_GAUGE;
-                       pv.values[i].gauge = vl->values[i].gauge;
+                       pkg_values_types[i] = DS_TYPE_GAUGE;
+                       pkg_values[i].gauge = vl->values[i].gauge;
                }
-       } /* for (values) */
+       }
+
+       /*
+        * Use `memcpy' to write everything to the buffer, because the pointer
+        * may be unaligned and some architectures, such as SPARC, can't handle
+        * that.
+        */
+       packet_ptr = *ret_buffer;
+       offset = 0;
+       memcpy (packet_ptr + offset, &pkg_ph, sizeof (pkg_ph));
+       offset += sizeof (pkg_ph);
+       memcpy (packet_ptr + offset, &pkg_num_values, sizeof (pkg_num_values));
+       offset += sizeof (pkg_num_values);
+       memcpy (packet_ptr + offset, pkg_values_types, num_values * sizeof (uint8_t));
+       offset += num_values * sizeof (uint8_t);
+       memcpy (packet_ptr + offset, pkg_values, num_values * sizeof (value_t));
+       offset += num_values * sizeof (value_t);
+
+       assert (offset == packet_len);
+
+       *ret_buffer = packet_ptr + packet_len;
+       *ret_buffer_len -= packet_len;
+
+       free (pkg_values_types);
+       free (pkg_values);
 
        return (0);
 } /* int write_part_values */
@@ -344,20 +390,34 @@ static int write_part_values (char **ret_buffer, int *ret_buffer_len,
 static int write_part_number (char **ret_buffer, int *ret_buffer_len,
                int type, uint64_t value)
 {
-       part_number_t pn;
+       char *packet_ptr;
+       int packet_len;
+
+       part_header_t pkg_head;
+       uint64_t pkg_value;
+       
+       int offset;
 
-       if (*ret_buffer_len < 12)
+       packet_len = sizeof (pkg_head) + sizeof (pkg_value);
+
+       if (*ret_buffer_len < packet_len)
                return (-1);
 
-       pn.head = (part_header_t *) *ret_buffer;
-       pn.value = (uint64_t *) (pn.head + 1);
+       pkg_head.type = htons (type);
+       pkg_head.length = htons (packet_len);
+       pkg_value = htonll (value);
 
-       pn.head->type = htons (type);
-       pn.head->length = htons (12);
-       *pn.value = htonll (value);
+       packet_ptr = *ret_buffer;
+       offset = 0;
+       memcpy (packet_ptr + offset, &pkg_head, sizeof (pkg_head));
+       offset += sizeof (pkg_head);
+       memcpy (packet_ptr + offset, &pkg_value, sizeof (pkg_value));
+       offset += sizeof (pkg_value);
 
-       *ret_buffer = (char *) (pn.value + 1);
-       *ret_buffer_len -= 12;
+       assert (offset == packet_len);
+
+       *ret_buffer = packet_ptr + packet_len;
+       *ret_buffer_len -= packet_len;
 
        return (0);
 } /* int write_part_number */
@@ -365,23 +425,33 @@ static int write_part_number (char **ret_buffer, int *ret_buffer_len,
 static int write_part_string (char **ret_buffer, int *ret_buffer_len,
                int type, const char *str, int str_len)
 {
-       part_string_t ps;
-       int len;
+       char *packet_ptr;
+       int packet_len;
+
+       part_header_t pkg_head;
 
-       len = 4 + str_len + 1;
-       if (*ret_buffer_len < len)
+       int offset;
+
+       packet_len = sizeof (pkg_head) + str_len + 1;
+       if (*ret_buffer_len < packet_len)
                return (-1);
-       *ret_buffer_len -= len;
 
-       ps.head = (part_header_t *) *ret_buffer;
-       ps.value = (char *) (ps.head + 1);
+       pkg_head.type = htons (type);
+       pkg_head.length = htons (packet_len);
+
+       packet_ptr = *ret_buffer;
+       offset = 0;
+       memcpy (packet_ptr + offset, &pkg_head, sizeof (pkg_head));
+       offset += sizeof (pkg_head);
+       memcpy (packet_ptr + offset, str, str_len);
+       offset += str_len;
+       memset (packet_ptr + offset, '\0', 1);
+       offset += 1;
 
-       ps.head->type = htons ((uint16_t) type);
-       ps.head->length = htons ((uint16_t) str_len + 5);
-       if (str_len > 0)
-               memcpy (ps.value, str, str_len);
-       ps.value[str_len] = '\0';
-       *ret_buffer = (void *) (ps.value + (str_len + 1));
+       assert (offset == packet_len);
+
+       *ret_buffer = packet_ptr + packet_len;
+       *ret_buffer_len -= packet_len;
 
        return (0);
 } /* int write_part_string */
@@ -1602,9 +1672,25 @@ static int network_init (void)
        return (0);
 } /* int network_init */
 
+static int network_flush (int timeout)
+{
+       pthread_mutex_lock (&send_buffer_lock);
+
+       if (((time (NULL) - cache_flush_last) >= timeout)
+                       && (send_buffer_fill > 0))
+       {
+               flush_buffer ();
+       }
+
+       pthread_mutex_unlock (&send_buffer_lock);
+
+       return (0);
+} /* int network_flush */
+
 void module_register (void)
 {
        plugin_register_config ("network", network_config,
                        config_keys, config_keys_num);
        plugin_register_init   ("network", network_init);
+       plugin_register_flush   ("network", network_flush);
 } /* void module_register */
index 7558a50..6d3326f 100644 (file)
@@ -1514,6 +1514,11 @@ static int perl_config_includedir (pTHX_ oconfig_item_t *ci)
                return 1;
        }
 
+       if (NULL == aTHX) {
+               log_warn ("EnableDebugger has no effects if used after LoadPlugin.");
+               return 1;
+       }
+
        value = ci->values[0].value.string;
 
        if (NULL == aTHX) {
index 05f660b..2f7c064 100644 (file)
@@ -162,7 +162,7 @@ static int ping_config (const char *key, const char *value)
        else if (strcasecmp (key, "ttl") == 0)
        {
                int ttl = atoi (value);
-               if (ping_setopt (pingobj, PING_DEF_TIMEOUT, (void *) &ttl))
+               if (ping_setopt (pingobj, PING_OPT_TTL, (void *) &ttl))
                {
                        WARNING ("ping: liboping did not accept the TTL value %i", ttl);
                        return (1);
index 1dd6daf..eaf1a41 100644 (file)
@@ -1,6 +1,6 @@
 /**
  * collectd - src/plugin.c
- * Copyright (C) 2005,2006  Florian octo Forster
+ * Copyright (C) 2005-2008  Florian octo Forster
  *
  * This program is free software; you can redistribute it and/or modify it
  * under the terms of the GNU General Public License as published by the
@@ -17,6 +17,7 @@
  *
  * Authors:
  *   Florian octo Forster <octo at verplant.org>
+ *   Sebastian Harl <sh at tokkee.org>
  **/
 
 #include "collectd.h"
@@ -53,6 +54,7 @@ typedef struct read_func_s read_func_t;
 static llist_t *list_init;
 static llist_t *list_read;
 static llist_t *list_write;
+static llist_t *list_flush;
 static llist_t *list_shutdown;
 static llist_t *list_log;
 static llist_t *list_notification;
@@ -433,6 +435,11 @@ int plugin_register_write (const char *name,
        return (register_callback (&list_write, name, (void *) callback));
 } /* int plugin_register_write */
 
+int plugin_register_flush (const char *name, int (*callback) (const int))
+{
+       return (register_callback (&list_flush, name, (void *) callback));
+} /* int plugin_register_flush */
+
 int plugin_register_shutdown (char *name,
                int (*callback) (void))
 {
@@ -527,6 +534,11 @@ int plugin_unregister_write (const char *name)
        return (plugin_unregister (list_write, name));
 }
 
+int plugin_unregister_flush (const char *name)
+{
+       return (plugin_unregister (list_flush, name));
+}
+
 int plugin_unregister_shutdown (const char *name)
 {
        return (plugin_unregister (list_shutdown, name));
@@ -639,6 +651,43 @@ void plugin_read_all (void)
        pthread_mutex_unlock (&read_lock);
 } /* void plugin_read_all */
 
+int plugin_flush_one (int timeout, const char *name)
+{
+       int (*callback) (int);
+       llentry_t *le;
+       int status;
+
+       if (list_flush == NULL)
+               return (-1);
+
+       le = llist_search (list_flush, name);
+       if (le == NULL)
+               return (-1);
+       callback = (int (*) (int)) le->value;
+
+       status = (*callback) (timeout);
+
+       return (status);
+} /* int plugin_flush_ont */
+
+void plugin_flush_all (int timeout)
+{
+       int (*callback) (int);
+       llentry_t *le;
+
+       if (list_flush == NULL)
+               return;
+
+       le = llist_head (list_flush);
+       while (le != NULL)
+       {
+               callback = (int (*) (int)) le->value;
+               le = le->next;
+
+               (*callback) (timeout);
+       }
+} /* void plugin_flush_all */
+
 void plugin_shutdown_all (void)
 {
        int (*callback) (void);
index 25c745c..7b59930 100644 (file)
@@ -2,7 +2,7 @@
 #define PLUGIN_H
 /**
  * collectd - src/plugin.h
- * Copyright (C) 2005-2007  Florian octo Forster
+ * Copyright (C) 2005-2008  Florian octo Forster
  *
  * This program is free software; you can redistribute it and/or modify it
  * under the terms of the GNU General Public License as published by the
@@ -19,6 +19,7 @@
  *
  * Authors:
  *   Florian octo Forster <octo at verplant.org>
+ *   Sebastian Harl <sh at tokkee.org>
  **/
 
 #include "collectd.h"
@@ -149,8 +150,11 @@ int plugin_load (const char *name);
 
 void plugin_init_all (void);
 void plugin_read_all (void);
+void plugin_flush_all (int timeout);
 void plugin_shutdown_all (void);
 
+int plugin_flush_one (int timeout, const char *name);
+
 /*
  * The `plugin_register_*' functions are used to make `config', `init',
  * `read', `write' and `shutdown' functions known to the plugin
@@ -167,6 +171,8 @@ int plugin_register_read (const char *name,
                int (*callback) (void));
 int plugin_register_write (const char *name,
                int (*callback) (const data_set_t *ds, const value_list_t *vl));
+int plugin_register_flush (const char *name,
+               int (*callback) (const int));
 int plugin_register_shutdown (char *name,
                int (*callback) (void));
 int plugin_register_data_set (const data_set_t *ds);
@@ -180,6 +186,7 @@ int plugin_unregister_complex_config (const char *name);
 int plugin_unregister_init (const char *name);
 int plugin_unregister_read (const char *name);
 int plugin_unregister_write (const char *name);
+int plugin_unregister_flush (const char *name);
 int plugin_unregister_shutdown (const char *name);
 int plugin_unregister_data_set (const char *name);
 int plugin_unregister_log (const char *name);
index 024b04e..ab24524 100644 (file)
@@ -947,6 +947,20 @@ static int rrd_write (const data_set_t *ds, const value_list_t *vl)
        return (status);
 } /* int rrd_write */
 
+static int rrd_flush (const int timeout)
+{
+       pthread_mutex_lock (&cache_lock);
+
+       if (cache == NULL) {
+               pthread_mutex_unlock (&cache_lock);
+               return (0);
+       }
+
+       rrd_cache_flush (timeout);
+       pthread_mutex_unlock (&cache_lock);
+       return (0);
+} /* int rrd_flush */
+
 static int rrd_config (const char *key, const char *value)
 {
        if (strcasecmp ("CacheTimeout", key) == 0)
@@ -1099,12 +1113,7 @@ static int rrd_init (void)
        if (stepsize < 0)
                stepsize = 0;
        if (heartbeat <= 0)
-       {
-               if (stepsize > 0)
-                       heartbeat = 2 * stepsize;
-               else
-                       heartbeat = 0;
-       }
+               heartbeat = 2 * stepsize;
 
        if ((heartbeat > 0) && (heartbeat < interval_g))
                WARNING ("rrdtool plugin: Your `heartbeat' is "
@@ -1157,5 +1166,6 @@ void module_register (void)
                        config_keys, config_keys_num);
        plugin_register_init ("rrdtool", rrd_init);
        plugin_register_write ("rrdtool", rrd_write);
+       plugin_register_flush ("rrdtool", rrd_flush);
        plugin_register_shutdown ("rrdtool", rrd_shutdown);
 }
index 00cad0e..fdf7ec1 100644 (file)
 #elif HAVE_SYSCTLBYNAME
 # include <sys/socketvar.h>
 # include <sys/sysctl.h>
+
+/* Some includes needed for compiling on FreeBSD */
+#include <sys/time.h>
+#if HAVE_SYS_TYPES_H
+# include <sys/types.h>
+#endif
+#if HAVE_SYS_SOCKET_H
+# include <sys/socket.h>
+#endif
+#if HAVE_NET_IF_H
+# include <net/if.h>
+#endif
+
 # include <net/route.h>
 # include <netinet/in.h>
 # include <netinet/in_systm.h>
index 63c3ae9..a7618c0 100644 (file)
@@ -24,6 +24,8 @@
 #include "plugin.h"
 #include "configfile.h"
 
+#include "utils_cmd_flush.h"
+#include "utils_cmd_getval.h"
 #include "utils_cmd_putval.h"
 #include "utils_cmd_putnotif.h"
 
@@ -420,80 +422,6 @@ static int us_open_socket (void)
        return (0);
 } /* int us_open_socket */
 
-static int us_handle_getval (FILE *fh, char **fields, int fields_num)
-{
-       char *hostname;
-       char *plugin;
-       char *plugin_instance;
-       char *type;
-       char *type_instance;
-       char  name[4*DATA_MAX_NAME_LEN];
-       value_cache_t *vc;
-       int   status;
-       int   i;
-
-       if (fields_num != 2)
-       {
-               DEBUG ("unixsock plugin: Wrong number of fields: %i", fields_num);
-               fprintf (fh, "-1 Wrong number of fields: Got %i, expected 2.\n",
-                               fields_num);
-               fflush (fh);
-               return (-1);
-       }
-       DEBUG ("unixsock plugin: Got query for `%s'", fields[1]);
-
-       status = parse_identifier (fields[1], &hostname,
-                       &plugin, &plugin_instance,
-                       &type, &type_instance);
-       if (status != 0)
-       {
-               DEBUG ("unixsock plugin: Cannot parse `%s'", fields[1]);
-               fprintf (fh, "-1 Cannot parse identifier.\n");
-               fflush (fh);
-               return (-1);
-       }
-
-       status = format_name (name, sizeof (name),
-                       hostname, plugin, plugin_instance, type, type_instance);
-       if (status != 0)
-       {
-               fprintf (fh, "-1 format_name failed.\n");
-               return (-1);
-       }
-
-       pthread_mutex_lock (&cache_lock);
-
-       DEBUG ("vc = cache_search (%s)", name);
-       vc = cache_search (name);
-
-       if (vc == NULL)
-       {
-               DEBUG ("Did not find cache entry.");
-               fprintf (fh, "-1 No such value");
-       }
-       else
-       {
-               DEBUG ("Found cache entry.");
-               fprintf (fh, "%i", vc->values_num);
-               for (i = 0; i < vc->values_num; i++)
-               {
-                       fprintf (fh, " %s=", vc->ds->ds[i].name);
-                       if (isnan (vc->gauge[i]))
-                               fprintf (fh, "NaN");
-                       else
-                               fprintf (fh, "%12e", vc->gauge[i]);
-               }
-       }
-
-       /* Free the mutex as soon as possible and definitely before flushing */
-       pthread_mutex_unlock (&cache_lock);
-
-       fprintf (fh, "\n");
-       fflush (fh);
-
-       return (0);
-} /* int us_handle_getval */
-
 static int us_handle_listval (FILE *fh, char **fields, int fields_num)
 {
        char buffer[1024];
@@ -593,7 +521,7 @@ static void *us_handle_client (void *arg)
 
                if (strcasecmp (fields[0], "getval") == 0)
                {
-                       us_handle_getval (fh, fields, fields_num);
+                       handle_getval (fh, fields, fields_num);
                }
                else if (strcasecmp (fields[0], "putval") == 0)
                {
@@ -607,6 +535,10 @@ static void *us_handle_client (void *arg)
                {
                        handle_putnotif (fh, fields, fields_num);
                }
+               else if (strcasecmp (fields[0], "flush") == 0)
+               {
+                       handle_flush (fh, fields, fields_num);
+               }
                else
                {
                        fprintf (fh, "-1 Unknown command: %s\n", fields[0]);
index 0d6961e..ad8fb19 100644 (file)
@@ -56,6 +56,43 @@ static int cache_compare (const cache_entry_t *a, const cache_entry_t *b)
   return (strcmp (a->name, b->name));
 } /* int cache_compare */
 
+static cache_entry_t *cache_alloc (int values_num)
+{
+  cache_entry_t *ce;
+
+  ce = (cache_entry_t *) malloc (sizeof (cache_entry_t));
+  if (ce == NULL)
+  {
+    ERROR ("utils_cache: cache_alloc: malloc failed.");
+    return (NULL);
+  }
+  memset (ce, '\0', sizeof (cache_entry_t));
+  ce->values_num = values_num;
+
+  ce->values_gauge = (gauge_t *) calloc (values_num, sizeof (gauge_t));
+  ce->values_counter = (counter_t *) calloc (values_num, sizeof (counter_t));
+  if ((ce->values_gauge == NULL) || (ce->values_counter == NULL))
+  {
+    sfree (ce->values_gauge);
+    sfree (ce->values_counter);
+    sfree (ce);
+    ERROR ("utils_cache: cache_alloc: calloc failed.");
+    return (NULL);
+  }
+
+  return (ce);
+} /* cache_entry_t *cache_alloc */
+
+static void cache_free (cache_entry_t *ce)
+{
+  if (ce == NULL)
+    return;
+
+  sfree (ce->values_gauge);
+  sfree (ce->values_counter);
+  sfree (ce);
+} /* void cache_free */
+
 static int uc_send_notification (const char *name)
 {
   cache_entry_t *ce = NULL;
@@ -206,7 +243,7 @@ int uc_check_timeout (void)
        ERROR ("uc_check_timeout: c_avl_remove (%s) failed.", keys[i]);
       }
       sfree (keys[i]);
-      sfree (ce);
+      cache_free (ce);
     }
     else /* (status > 0); ``service'' is interesting */
     {
@@ -312,8 +349,6 @@ int uc_update (const data_set_t *ds, const value_list_t *vl)
   else /* key is not found */
   {
     int i;
-    size_t ce_size = sizeof (cache_entry_t)
-      + ds->ds_num * (sizeof (counter_t) + sizeof (gauge_t));
     char *key;
     
     key = strdup (name);
@@ -324,22 +359,15 @@ int uc_update (const data_set_t *ds, const value_list_t *vl)
       return (-1);
     }
 
-    ce = (cache_entry_t *) malloc (ce_size);
+    ce = cache_alloc (ds->ds_num);
     if (ce == NULL)
     {
       pthread_mutex_unlock (&cache_lock);
-      ERROR ("uc_insert: malloc (%u) failed.", (unsigned int) ce_size);
+      ERROR ("uc_insert: cache_alloc (%i) failed.", ds->ds_num);
       return (-1);
     }
 
-    memset (ce, '\0', ce_size);
-
-    strncpy (ce->name, name, sizeof (ce->name));
-    ce->name[sizeof (ce->name) - 1] = '\0';
-
-    ce->values_num = ds->ds_num;
-    ce->values_gauge = (gauge_t *) (ce + 1);
-    ce->values_counter = (counter_t *) (ce->values_gauge + ce->values_num);
+    sstrncpy (ce->name, name, sizeof (ce->name));
 
     for (i = 0; i < ds->ds_num; i++)
     {
@@ -404,38 +432,76 @@ int uc_update (const data_set_t *ds, const value_list_t *vl)
   return (0);
 } /* int uc_insert */
 
-gauge_t *uc_get_rate (const data_set_t *ds, const value_list_t *vl)
+int uc_get_rate_by_name (const char *name, gauge_t **ret_values, size_t *ret_values_num)
 {
-  char name[6 * DATA_MAX_NAME_LEN];
   gauge_t *ret = NULL;
+  size_t ret_num = 0;
   cache_entry_t *ce = NULL;
-
-  if (FORMAT_VL (name, sizeof (name), vl, ds) != 0)
-  {
-    ERROR ("uc_insert: FORMAT_VL failed.");
-    return (NULL);
-  }
+  int status = 0;
 
   pthread_mutex_lock (&cache_lock);
 
   if (c_avl_get (cache_tree, name, (void *) &ce) == 0)
   {
     assert (ce != NULL);
-    assert (ce->values_num == ds->ds_num);
 
-    ret = (gauge_t *) malloc (ce->values_num * sizeof (gauge_t));
+    ret_num = ce->values_num;
+    ret = (gauge_t *) malloc (ret_num * sizeof (gauge_t));
     if (ret == NULL)
     {
-      ERROR ("uc_get_rate: malloc failed.");
+      ERROR ("utils_cache: uc_get_rate_by_name: malloc failed.");
+      status = -1;
     }
     else
     {
-      memcpy (ret, ce->values_gauge, ce->values_num * sizeof (gauge_t));
+      memcpy (ret, ce->values_gauge, ret_num * sizeof (gauge_t));
     }
   }
+  else
+  {
+    DEBUG ("utils_cache: uc_get_rate_by_name: No such value: %s", name);
+    status = -1;
+  }
 
   pthread_mutex_unlock (&cache_lock);
 
+  if (status == 0)
+  {
+    *ret_values = ret;
+    *ret_values_num = ret_num;
+  }
+
+  return (status);
+} /* gauge_t *uc_get_rate_by_name */
+
+gauge_t *uc_get_rate (const data_set_t *ds, const value_list_t *vl)
+{
+  char name[6 * DATA_MAX_NAME_LEN];
+  gauge_t *ret = NULL;
+  size_t ret_num = 0;
+  int status;
+
+  if (FORMAT_VL (name, sizeof (name), vl, ds) != 0)
+  {
+    ERROR ("uc_insert: FORMAT_VL failed.");
+    return (NULL);
+  }
+
+  status = uc_get_rate_by_name (name, &ret, &ret_num);
+  if (status != 0)
+    return (NULL);
+
+  /* This is important - the caller has no other way of knowing how many
+   * values are returned. */
+  if (ret_num != ds->ds_num)
+  {
+    ERROR ("utils_cache: uc_get_rate: ds[%s] has %i values, "
+       "but uc_get_rate_by_name returned %i.",
+       ds->type, ds->ds_num, ret_num);
+    sfree (ret);
+    return (NULL);
+  }
+
   return (ret);
 } /* gauge_t *uc_get_rate */
 
index d6a56ab..ed6830b 100644 (file)
@@ -31,6 +31,7 @@
 int uc_init (void);
 int uc_check_timeout (void);
 int uc_update (const data_set_t *ds, const value_list_t *vl);
+int uc_get_rate_by_name (const char *name, gauge_t **ret_values, size_t *ret_values_num);
 gauge_t *uc_get_rate (const data_set_t *ds, const value_list_t *vl);
 
 int uc_get_state (const data_set_t *ds, const value_list_t *vl);
diff --git a/src/utils_cmd_flush.c b/src/utils_cmd_flush.c
new file mode 100644 (file)
index 0000000..e7737a0
--- /dev/null
@@ -0,0 +1,52 @@
+/**
+ * collectd - src/utils_cmd_flush.c
+ * Copyright (C) 2008  Sebastian Harl
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; only version 2 of the License is applicable.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
+ *
+ * Author:
+ *   Sebastian "tokkee" Harl <sh at tokkee.org>
+ **/
+
+#include "collectd.h"
+#include "plugin.h"
+
+int handle_flush (FILE *fh, char **fields, int fields_num)
+{
+       int timeout = -1;
+
+       if ((fields_num != 1) && (fields_num != 2))
+       {
+               DEBUG ("unixsock plugin: us_handle_flush: "
+                               "Wrong number of fields: %i", fields_num);
+               fprintf (fh, "-1 Wrong number of fields: Got %i, expected 1 or 2.\n",
+                               fields_num);
+               fflush (fh);
+               return (-1);
+       }
+
+       if (fields_num == 2)
+               timeout = atoi (fields[1]);
+
+       INFO ("unixsock plugin: flushing all data");
+       plugin_flush_all (timeout);
+       INFO ("unixsock plugin: finished flushing all data");
+
+       fprintf (fh, "0 Done\n");
+       fflush (fh);
+       return (0);
+} /* int handle_flush */
+
+/* vim: set sw=4 ts=4 tw=78 noexpandtab : */
+
diff --git a/src/utils_cmd_flush.h b/src/utils_cmd_flush.h
new file mode 100644 (file)
index 0000000..334f086
--- /dev/null
@@ -0,0 +1,30 @@
+/**
+ * collectd - src/utils_cmd_flush.h
+ * Copyright (C) 2008  Sebastian Harl
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; only version 2 of the License is applicable.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
+ *
+ * Author:
+ *   Sebastian "tokkee" Harl <sh at tokkee.org>
+ **/
+
+#ifndef UTILS_CMD_FLUSH_H
+#define UTILS_CMD_FLUSH_H 1
+
+int handle_flush (FILE *fh, char **fields, int fields_num);
+
+#endif /* UTILS_CMD_FLUSH_H */
+
+/* vim: set sw=4 ts=4 tw=78 noexpandtab : */
+
diff --git a/src/utils_cmd_getval.c b/src/utils_cmd_getval.c
new file mode 100644 (file)
index 0000000..a4edf4f
--- /dev/null
@@ -0,0 +1,119 @@
+/**
+ * collectd - src/utils_cms_getval.c
+ * Copyright (C) 2008  Florian octo Forster
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; only version 2 of the License is applicable.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
+ *
+ * Author:
+ *   Florian octo Forster <octo at verplant.org>
+ **/
+
+#include "collectd.h"
+#include "common.h"
+#include "plugin.h"
+
+#include "utils_cache.h"
+
+int handle_getval (FILE *fh, char **fields, int fields_num)
+{
+  char *hostname;
+  char *plugin;
+  char *plugin_instance;
+  char *type;
+  char *type_instance;
+  gauge_t *values;
+  size_t values_num;
+
+  const data_set_t *ds;
+
+  int   status;
+  int   i;
+
+  if (fields_num != 2)
+  {
+    DEBUG ("unixsock plugin: Wrong number of fields: %i", fields_num);
+    fprintf (fh, "-1 Wrong number of fields: Got %i, expected 2.\n",
+       fields_num);
+    fflush (fh);
+    return (-1);
+  }
+  DEBUG ("unixsock plugin: Got query for `%s'", fields[1]);
+
+  if (strlen (fields[1]) < strlen ("h/p/t"))
+  {
+    fprintf (fh, "-1 Invalied identifier, %s", fields[1]);
+    fflush (fh);
+    return (-1);
+  }
+
+  status = parse_identifier (fields[1], &hostname,
+      &plugin, &plugin_instance,
+      &type, &type_instance);
+  if (status != 0)
+  {
+    DEBUG ("unixsock plugin: Cannot parse `%s'", fields[1]);
+    fprintf (fh, "-1 Cannot parse identifier.\n");
+    fflush (fh);
+    return (-1);
+  }
+
+  ds = plugin_get_ds (type);
+  if (ds == NULL)
+  {
+    DEBUG ("unixsock plugin: plugin_get_ds (%s) == NULL;", type);
+    fprintf (fh, "-1 Type `%s' is unknown.\n", type);
+    fflush (fh);
+    return (-1);
+  }
+
+  values = NULL;
+  values_num = 0;
+  status = uc_get_rate_by_name (fields[1], &values, &values_num);
+  if (status != 0)
+  {
+    fprintf (fh, "-1 No such value");
+    fflush (fh);
+    return (-1);
+  }
+
+  if (ds->ds_num != values_num)
+  {
+    ERROR ("ds[%s]->ds_num = %i, "
+       "but uc_get_rate_by_name returned %i values.",
+       ds->type, ds->ds_num, values_num);
+    fprintf (fh, "-1 Error reading value from cache.\n");
+    fflush (fh);
+    sfree (values);
+    return (-1);
+  }
+
+  fprintf (fh, "%u", (unsigned int) values_num);
+  for (i = 0; i < values_num; i++)
+  {
+    fprintf (fh, " %s=", ds->ds[i].name);
+    if (isnan (values[i]))
+      fprintf (fh, "NaN");
+    else
+      fprintf (fh, "%12e", values[i]);
+  }
+
+  fprintf (fh, "\n");
+  fflush (fh);
+
+  sfree (values);
+
+  return (0);
+} /* int handle_getval */
+
+/* vim: set sw=2 sts=2 ts=8 : */
diff --git a/src/utils_cmd_getval.h b/src/utils_cmd_getval.h
new file mode 100644 (file)
index 0000000..d7bd115
--- /dev/null
@@ -0,0 +1,29 @@
+/**
+ * collectd - src/utils_cms_getval.h
+ * Copyright (C) 2008  Florian octo Forster
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; only version 2 of the License is applicable.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
+ *
+ * Author:
+ *   Florian octo Forster <octo at verplant.org>
+ **/
+
+#ifndef UTILS_CMD_GETVAL_H
+#define UTILS_CMD_GETVAL_H 1
+
+int handle_getval (FILE *fh, char **fields, int fields_num);
+
+#endif /* UTILS_CMD_GETVAL_H */
+
+/* vim: set sw=2 sts=2 ts=8 : */
index a953172..08b3bb3 100644 (file)
@@ -22,8 +22,6 @@
 #ifndef UTILS_CMD_PUTNOTIF_H
 #define UTILS_CMD_PUTNOTIF_H 1
 
-#include "plugin.h"
-
 int handle_putnotif (FILE *fh, char **fields, int fields_num);
 
 /* vim: set shiftwidth=2 softtabstop=2 tabstop=8 : */
index 609efcb..2ae4532 100644 (file)
@@ -22,8 +22,6 @@
 #ifndef UTILS_CMD_PUTVAL_H
 #define UTILS_CMD_PUTVAL_H 1
 
-#include "plugin.h"
-
 int handle_putval (FILE *fh, char **fields, int fields_num);
 
 #endif /* UTILS_CMD_PUTVAL_H */
index dccb40a..9e75e4e 100644 (file)
@@ -35,14 +35,41 @@ struct cu_match_s
   regex_t regex;
   int flags;
 
-  int (*callback) (const char *str, void *user_data);
+  int (*callback) (const char *str, char * const *matches, size_t matches_num,
+      void *user_data);
   void *user_data;
 };
 
 /*
  * Private functions
  */
-static int default_callback (const char *str, void *user_data)
+static char *match_substr (const char *str, int begin, int end)
+{
+  char *ret;
+  size_t ret_len;
+
+  if ((begin < 0) || (end < 0) || (begin >= end))
+    return (NULL);
+  if (end > (strlen (str) + 1))
+  {
+    ERROR ("utils_match: match_substr: `end' points after end of string.");
+    return (NULL);
+  }
+
+  ret_len = end - begin;
+  ret = (char *) malloc (sizeof (char) * (ret_len + 1));
+  if (ret == NULL)
+  {
+    ERROR ("utils_match: match_substr: malloc failed.");
+    return (NULL);
+  }
+
+  sstrncpy (ret, str + begin, ret_len + 1);
+  return (ret);
+} /* char *match_substr */
+
+static int default_callback (const char *str,
+    char * const *matches, size_t matches_num, void *user_data)
 {
   cu_match_value_t *data = (cu_match_value_t *) user_data;
 
@@ -51,8 +78,11 @@ static int default_callback (const char *str, void *user_data)
     gauge_t value;
     char *endptr = NULL;
 
-    value = strtod (str, &endptr);
-    if (str == endptr)
+    if (matches_num < 2)
+      return (-1);
+
+    value = strtod (matches[1], &endptr);
+    if (matches[1] == endptr)
       return (-1);
 
     if ((data->values_num == 0)
@@ -96,8 +126,11 @@ static int default_callback (const char *str, void *user_data)
       return (0);
     }
 
-    value = strtoll (str, &endptr, 0);
-    if (str == endptr)
+    if (matches_num < 2)
+      return (-1);
+
+    value = strtoll (matches[1], &endptr, 0);
+    if (matches[1] == endptr)
       return (-1);
 
     if (data->ds_type & UTILS_MATCH_CF_COUNTER_SET)
@@ -125,7 +158,8 @@ static int default_callback (const char *str, void *user_data)
  * Public functions
  */
 cu_match_t *match_create_callback (const char *regex,
-               int (*callback) (const char *str, void *user_data),
+               int (*callback) (const char *str,
+                 char * const *matches, size_t matches_num, void *user_data),
                void *user_data)
 {
   cu_match_t *obj;
@@ -191,47 +225,55 @@ void match_destroy (cu_match_t *obj)
 int match_apply (cu_match_t *obj, const char *str)
 {
   int status;
-  regmatch_t re_match[2];
-  char *sub_match;
-  size_t sub_match_len;
+  regmatch_t re_match[32];
+  char *matches[32];
+  size_t matches_num;
+  int i;
 
   if ((obj == NULL) || (str == NULL))
     return (-1);
 
-  re_match[0].rm_so = -1;
-  re_match[0].rm_eo = -1;
-  re_match[1].rm_so = -1;
-  re_match[1].rm_eo = -1;
-  status = regexec (&obj->regex, str, /* nmatch = */ 2, re_match,
+  status = regexec (&obj->regex, str,
+      STATIC_ARRAY_SIZE (re_match), re_match,
       /* eflags = */ 0);
 
   /* Regex did not match */
   if (status != 0)
     return (0);
 
-  /* re_match[0] is the location of the entire match.
-   * re_match[1] is the location of the sub-match. */
-  if (re_match[1].rm_so < 0)
+  memset (matches, '\0', sizeof (matches));
+  for (matches_num = 0; matches_num < STATIC_ARRAY_SIZE (matches); matches_num++)
   {
-    status = obj->callback (str, obj->user_data);
-    return (status);
+    if ((re_match[matches_num].rm_so < 0)
+       || (re_match[matches_num].rm_eo < 0))
+      break;
+
+    matches[matches_num] = match_substr (str,
+       re_match[matches_num].rm_so, re_match[matches_num].rm_eo);
+    if (matches[matches_num] == NULL)
+    {
+      status = -1;
+      break;
+    }
   }
 
-  assert (re_match[1].rm_so < re_match[1].rm_eo);
-  sub_match_len = (size_t) (re_match[1].rm_eo - re_match[1].rm_so);
-  sub_match = (char *) malloc (sizeof (char) * (sub_match_len + 1));
-  if (sub_match == NULL)
+  if (status != 0)
   {
-    ERROR ("malloc failed.");
-    return (-1);
+    ERROR ("utils_match: match_apply: match_substr failed.");
+  }
+  else
+  {
+    status = obj->callback (str, matches, matches_num, obj->user_data);
+    if (status != 0)
+    {
+      ERROR ("utils_match: match_apply: callback failed.");
+    }
   }
-  sstrncpy (sub_match, str + re_match[1].rm_so, sub_match_len + 1);
-
-  DEBUG ("utils_match: match_apply: Dispatching substring \"%s\" to "
-      "callback.", sub_match);
-  status = obj->callback (sub_match, obj->user_data);
 
-  sfree (sub_match);
+  for (i = 0; i < matches_num; i++)
+  {
+    sfree (matches[i]);
+  }
 
   return (status);
 } /* int match_apply */
index da9f1bc..a39c869 100644 (file)
@@ -74,7 +74,8 @@ typedef struct cu_match_value_s cu_match_value_t;
  *  callback.
  */
 cu_match_t *match_create_callback (const char *regex,
-               int (*callback) (const char *str, void *user_data),
+               int (*callback) (const char *str,
+                 char * const *matches, size_t matches_num, void *user_data),
                void *user_data);
 
 /*