From fc257d86996117d80b35909c16fe628a209faf12 Mon Sep 17 00:00:00 2001 From: Sebastian Harl Date: Tue, 28 Nov 2006 13:39:51 +0100 Subject: [PATCH] New plugin "email" to collectd ham, spam, ... statistics This plugin collects email count and size for each type (e.g. ham, spam, virus, ...) of emails, spam score values and the count of successful spam checks (e.g. BAYES_99, SUBJECT_DRUG_GAP_C, ...). These information are provided by external programs which communicate with the plugin thru a UNIX socket and a simple line-based protocol: /* e-mail type (e.g. ham, spam, virus, ...) and size */ e:: /* spam score */ s: /* successful spam checks */ c:[,,...] At most MAX_CONNS (currently set to 5) clients can connect to the plugin simultaneously. Each connection is handled by a separate thread. Any input line is limited to 256 characters (including the newline character) which ought to be enough for anybody[tm] by definition. "c"-lines have to be split up if they grow longer. Signed-off-by: Sebastian Harl --- configure.in | 11 + src/Makefile.am | 8 + src/email.c | 741 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 760 insertions(+) create mode 100644 src/email.c diff --git a/configure.in b/configure.in index e76d8111..ca10e217 100644 --- a/configure.in +++ b/configure.in @@ -269,6 +269,15 @@ AC_CHECK_HEADERS(sys/vfstab.h) # For the swap plugin, FreeBSD AC_CHECK_HEADERS(kvm.h) +# For the email plugin +AC_CHECK_HEADERS(linux/un.h, [], [], +[ +#if HAVE_SYS_SOCKET_H +# include +#endif +]) +AC_CHECK_HEADERS(sys/un.h) + # For debugging interface (variable number of arguments) AC_CHECK_HEADERS(stdarg.h) @@ -953,6 +962,7 @@ AC_COLLECTD([cpu], [disable], [module], [cpu usage statistics]) AC_COLLECTD([cpufreq], [disable], [module], [system cpu frequency statistics]) AC_COLLECTD([disk], [disable], [module], [disk/partition statistics]) AC_COLLECTD([df], [disable], [module], [df statistics]) +AC_COLLECTD([email], [disable], [module], [email statistics]) AC_COLLECTD([quota], [enable], [module], [quota statistics (experimental)]) AC_COLLECTD([hddtemp], [disable], [module], [hdd temperature statistics]) AC_COLLECTD([load], [disable], [module], [system load statistics]) @@ -1003,6 +1013,7 @@ Configuration: cpufreq . . . . . . $enable_cpufreq df . . . . . . . . $enable_df disk . . . . . . . $enable_disk + email . . . . . . . $enable_email hddtemp . . . . . . $enable_hddtemp load . . . . . . . $enable_load memory . . . . . . $enable_memory diff --git a/src/Makefile.am b/src/Makefile.am index be062863..6f0ca0a1 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -156,6 +156,14 @@ collectd_LDADD += "-dlopen" disk.la collectd_DEPENDENCIES += disk.la endif +if BUILD_MODULE_EMAIL +pkglib_LTLIBRARIES += email.la +email_la_SOURCES = email.c +email_la_LDFLAGS = -module -avoid-version -lpthread +collectd_LDADD += "-dlopen" email.la +collectd_DEPENDENCIES += email.la +endif + #if BUILD_MODULE_QUOTA #pkglib_LTLIBRARIES += quota.la #quota_la_SOURCES = quota_plugin.c quota_plugin.h diff --git a/src/email.c b/src/email.c new file mode 100644 index 00000000..b1eaf404 --- /dev/null +++ b/src/email.c @@ -0,0 +1,741 @@ +/** + * collectd - src/email.c + * Copyright (C) 2006 Sebastian Harl + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; either version 2 of the License, or (at your + * option) any later version. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + * + * Author: + * Sebastian Harl + **/ + +/* + * This plugin communicates with a spam filter, a virus scanner or similar + * software using a UNIX socket and a very simple protocol: + * + * e-mail type (e.g. ham, spam, virus, ...) and size + * e:: + * + * spam score + * s: + * + * successful spam checks + * c:[,,...] + */ + +#include "collectd.h" +#include "common.h" +#include "plugin.h" + +#include + +#if HAVE_SYS_SELECT_H +# include +#endif /* HAVE_SYS_SELECT_H */ + +#if HAVE_SYS_SOCKET_H +# include +#endif /* HAVE_SYS_SOCKET_H */ + +/* *sigh* glibc does not define UNIX_PATH_MAX in sys/un.h ... */ +#if HAVE_LINUX_UN_H +# include +#elif HAVE_SYS_UN_H +# include +#endif /* HAVE_LINUX_UN_H | HAVE_SYS_UN_H */ + +#define MODULE_NAME "email" + +/* 256 bytes ought to be enough for anybody ;-) */ +#define BUFSIZE 256 + +#define SOCK_PATH "/tmp/.collectd-email" +#define MAX_CONNS 5 + +/* linked list of email and check types */ +typedef struct type { + char *name; + int value; + struct type *next; +} type_t; + +typedef struct { + type_t *head; + type_t *tail; +} type_list_t; + +/* linked list of collector thread control information */ +typedef struct collector { + pthread_t thread; + + /* socket to read data from */ + int socket; + + /* buffer to read data to */ + char buffer[BUFSIZE]; + int idx; /* current position in buffer */ + + struct collector *next; +} collector_t; + +typedef struct { + collector_t *head; + collector_t *tail; +} collector_list_t; + +/* state of the plugin */ +static int disabled = 0; + +/* thread managing "client" connections */ +static pthread_t connector; + +/* tell the connector thread that a collector is available */ +static pthread_cond_t collector_available = PTHREAD_COND_INITIALIZER; + +/* collector threads that are in use */ +static pthread_mutex_t active_mutex = PTHREAD_MUTEX_INITIALIZER; +static collector_list_t active; + +/* collector threads that are available for use */ +static pthread_mutex_t available_mutex = PTHREAD_MUTEX_INITIALIZER; +static collector_list_t available; + +#define COUNT_FILE "email/email-%s.rrd" +static char *count_ds_def[] = +{ + "DS:count:GAUGE:"COLLECTD_HEARTBEAT":0:U", + NULL +}; +static int count_ds_num = 1; + +static pthread_mutex_t count_mutex = PTHREAD_MUTEX_INITIALIZER; +static type_list_t count; + +#define SIZE_FILE "email/email_size-%s.rrd" +static char *size_ds_def[] = +{ + "DS:size:GAUGE:"COLLECTD_HEARTBEAT":0:U", + NULL +}; +static int size_ds_num = 1; + +static pthread_mutex_t size_mutex = PTHREAD_MUTEX_INITIALIZER; +static type_list_t size; + +#define SCORE_FILE "email/spam_score.rrd" +static char *score_ds_def[] = +{ + "DS:score:GAUGE:"COLLECTD_HEARTBEAT":U:U", + NULL +}; +static int score_ds_num = 1; + +static pthread_mutex_t score_mutex = PTHREAD_MUTEX_INITIALIZER; +static double score; +static int score_count; + +#define CHECK_FILE "email/spam_check-%s.rrd" +static char *check_ds_def[] = +{ + "DS:hits:GAUGE:"COLLECTD_HEARTBEAT":0:U", + NULL +}; +static int check_ds_num = 1; + +static pthread_mutex_t check_mutex = PTHREAD_MUTEX_INITIALIZER; +static type_list_t check; + +/* Increment the value of the given name in the given list by incr. */ +static void type_list_incr (type_list_t *list, char *name, int incr) +{ + if (NULL == list->head) { + list->head = (type_t *)smalloc (sizeof (type_t)); + + list->head->name = sstrdup (name); + list->head->value = incr; + list->head->next = NULL; + + list->tail = list->head; + } + else { + type_t *ptr; + + for (ptr = list->head; NULL != ptr; ptr = ptr->next) { + if (0 == strcmp (name, ptr->name)) + break; + } + + if (NULL == ptr) { + list->tail->next = (type_t *)smalloc (sizeof (type_t)); + list->tail = list->tail->next; + + list->tail->name = sstrdup (name); + list->tail->value = incr; + list->tail->next = NULL; + } + else { + ptr->value += incr; + } + } + return; +} /* static void type_list_incr (type_list_t *, char *) */ + +/* Read a single character from the socket. If an error occurs or end-of-file + * is reached return '\0'. */ +char read_char (collector_t *src) +{ + char ret = '\0'; + + fd_set fdset; + + FD_ZERO (&fdset); + FD_SET (src->socket, &fdset); + + if (-1 == select (src->socket + 1, &fdset, NULL, NULL, NULL)) { + syslog (LOG_ERR, "select() failed: %s", strerror (errno)); + return '\0'; + } + + assert (FD_ISSET (src->socket, &fdset)); + + do { + ssize_t len = 0; + + errno = 0; + if (0 > (len = read (src->socket, (void *)&ret, 1))) { + if (EINTR != errno) { + syslog (LOG_ERR, "read() failed: %s", strerror (errno)); + return '\0'; + } + } + + if (0 == len) + return '\0'; + } while (EINTR == errno); + return ret; +} /* char read_char (collector_t *) */ + +/* Read a single line (terminated by '\n') from the the socket. + * + * The return value is zero terminated and does not contain any newline + * characters. In case that no complete line is available (non-blocking mode + * should be enabled) an empty string is returned. + * + * If an error occurs or end-of-file is reached return NULL. + * + * IMPORTANT NOTE: If there is no newline character found in BUFSIZE + * characters of the input stream, the line will will be ignored! By + * definition we should not get any longer input lines, thus this is + * acceptable in this case ;-) */ +char *read_line (collector_t *src) +{ + int i = 0; + char *ret; + + assert (BUFSIZE > src->idx); + + for (i = 0; i < src->idx; ++i) { + if ('\n' == src->buffer[i]) + break; + } + + if ('\n' != src->buffer[i]) { + fd_set fdset; + + ssize_t len = 0; + + FD_ZERO (&fdset); + FD_SET (src->socket, &fdset); + + if (-1 == select (src->socket + 1, &fdset, NULL, NULL, NULL)) { + syslog (LOG_ERR, "select() failed: %s", strerror (errno)); + return NULL; + } + + assert (FD_ISSET (src->socket, &fdset)); + + do { + errno = 0; + if (0 > (len = read (src->socket, + (void *)(&(src->buffer[0]) + src->idx), + BUFSIZE - src->idx))) { + if (EINTR != errno) { + syslog (LOG_ERR, "read() failed: %s", strerror (errno)); + return NULL; + } + } + + if (0 == len) + return NULL; + } while (EINTR == errno); + + src->idx += len; + + for (i = src->idx - len; i < src->idx; ++i) { + if ('\n' == src->buffer[i]) + break; + } + + if ('\n' != src->buffer[i]) { + ret = (char *)smalloc (1); + + ret[0] = '\0'; + + if (BUFSIZE == src->idx) { /* no space left in buffer */ + while ('\n' != read_char (src)) + /* ignore complete line */; + + src->idx = 0; + } + return ret; + } + } + + ret = (char *)smalloc (i + 1); + memcpy (ret, &(src->buffer[0]), i + 1); + ret[i] = '\0'; + + src->idx -= (i + 1); + + if (0 == src->idx) + src->buffer[0] = '\0'; + else + memmove (&(src->buffer[0]), &(src->buffer[i + 1]), src->idx); + return ret; +} /* char *read_line (collector_t *) */ + +static void *collect (void *arg) +{ + collector_t *this = (collector_t *)arg; + + int loop = 1; + + { /* put the socket in non-blocking mode */ + int flags = 0; + + errno = 0; + if (-1 == fcntl (this->socket, F_GETFL, &flags)) { + syslog (LOG_ERR, "fcntl() failed: %s", strerror (errno)); + loop = 0; + } + + errno = 0; + if (-1 == fcntl (this->socket, F_SETFL, flags | O_NONBLOCK)) { + syslog (LOG_ERR, "fcntl() failed: %s", strerror (errno)); + loop = 0; + } + } + + while (loop) { + char *line = read_line (this); + + if (NULL == line) { + loop = 0; + break; + } + + if ('\0' == line[0]) { + free (line); + continue; + } + + if (':' != line[1]) { + syslog (LOG_ERR, "email: syntax error in line '%s'", line); + free (line); + continue; + } + + if ('e' == line[0]) { /* e:: */ + char *type = strtok (line + 2, ":"); + char *tmp = strtok (NULL, ":"); + int bytes = 0; + + if (NULL == tmp) { + syslog (LOG_ERR, "email: syntax error in line '%s'", line); + free (line); + continue; + } + + bytes = atoi (tmp); + + pthread_mutex_lock (&count_mutex); + type_list_incr (&count, type, 1); + pthread_mutex_unlock (&count_mutex); + + pthread_mutex_lock (&size_mutex); + type_list_incr (&size, type, bytes); + pthread_mutex_unlock (&size_mutex); + } + else if ('s' == line[0]) { /* s: */ + pthread_mutex_lock (&score_mutex); + score = (score * (double)score_count + atof (line + 2)) + / (double)(score_count + 1); + ++score_count; + pthread_mutex_unlock (&score_mutex); + } + else if ('c' == line[0]) { /* c:[,,...] */ + char *type = strtok (line + 2, ","); + + do { + pthread_mutex_lock (&check_mutex); + type_list_incr (&check, type, 1); + pthread_mutex_unlock (&check_mutex); + } while (NULL != (type = strtok (NULL, ","))); + } + else { + syslog (LOG_ERR, "email: unknown type '%c'", line[0]); + } + + free (line); + } + + /* put this thread back into the available list */ + pthread_mutex_lock (&active_mutex); + { + collector_t *last; + collector_t *ptr; + + last = NULL; + + for (ptr = active.head; NULL != ptr; last = ptr, ptr = ptr->next) { + if (0 != pthread_equal (ptr->thread, this->thread)) + break; + } + + /* the current thread _has_ to be in the active list */ + assert (NULL != ptr); + + if (NULL == last) { + active.head = ptr->next; + } + else { + last->next = ptr->next; + + if (NULL == last->next) { + active.tail = last; + } + } + } + pthread_mutex_unlock (&active_mutex); + + this->next = NULL; + + pthread_mutex_lock (&available_mutex); + + if (NULL == available.head) { + available.head = this; + available.tail = this; + } + else { + available.tail->next = this; + available.tail = this; + } + + pthread_mutex_unlock (&available_mutex); + + pthread_cond_signal (&collector_available); + pthread_exit ((void *)0); +} /* void *collect (void *) */ + +static void *open_connection (void *arg) +{ + int local = 0; + + struct sockaddr_un addr; + + /* create UNIX socket */ + errno = 0; + if (-1 == (local = socket (PF_UNIX, SOCK_STREAM, 0))) { + disabled = 1; + syslog (LOG_ERR, "socket() failed: %s", strerror (errno)); + pthread_exit ((void *)1); + } + + addr.sun_family = AF_UNIX; + + strncpy (addr.sun_path, SOCK_PATH, (size_t)(UNIX_PATH_MAX - 1)); + addr.sun_path[UNIX_PATH_MAX - 1] = '\0'; + unlink (addr.sun_path); + + errno = 0; + if (-1 == bind (local, (struct sockaddr *)&addr, + offsetof (struct sockaddr_un, sun_path) + + strlen(addr.sun_path))) { + disabled = 1; + syslog (LOG_ERR, "bind() failed: %s", strerror (errno)); + pthread_exit ((void *)1); + } + + errno = 0; + if (-1 == listen (local, 5)) { + disabled = 1; + syslog (LOG_ERR, "listen() failed: %s", strerror (errno)); + pthread_exit ((void *)1); + } + + { /* initialize queue of available threads */ + int i = 0; + + collector_t *last; + + active.head = NULL; + active.tail = NULL; + + available.head = (collector_t *)smalloc (sizeof (collector_t)); + available.tail = available.head; + available.tail->next = NULL; + + last = available.head; + + for (i = 1; i < MAX_CONNS; ++i) { + last->next = (collector_t *)smalloc (sizeof (collector_t)); + last = last->next; + available.tail = last; + available.tail->next = NULL; + } + } + + while (1) { + int remote = 0; + int err = 0; + + collector_t *collector; + + pthread_attr_t ptattr; + + pthread_mutex_lock (&available_mutex); + while (NULL == available.head) { + pthread_cond_wait (&collector_available, &available_mutex); + } + pthread_mutex_unlock (&available_mutex); + + do { + errno = 0; + if (-1 == (remote = accept (local, NULL, NULL))) { + if (EINTR != errno) { + disabled = 1; + syslog (LOG_ERR, "accept() failed: %s", strerror (errno)); + pthread_exit ((void *)1); + } + } + } while (EINTR == errno); + + /* assign connection to next available thread */ + pthread_mutex_lock (&available_mutex); + + collector = available.head; + collector->socket = remote; + + if (available.head == available.tail) { + available.head = NULL; + available.tail = NULL; + } + else { + available.head = available.head->next; + } + + pthread_mutex_unlock (&available_mutex); + + collector->idx = 0; + collector->next = NULL; + + pthread_attr_init (&ptattr); + pthread_attr_setdetachstate (&ptattr, PTHREAD_CREATE_DETACHED); + + if (0 == (err = pthread_create (&collector->thread, &ptattr, collect, + (void *)collector))) { + pthread_mutex_lock (&active_mutex); + + if (NULL == active.head) { + active.head = collector; + active.tail = collector; + } + else { + active.tail->next = collector; + active.tail = collector; + } + + pthread_mutex_unlock (&active_mutex); + } + else { + pthread_mutex_lock (&available_mutex); + + if (NULL == available.head) { + available.head = collector; + available.tail = collector; + } + else { + available.tail->next = collector; + available.tail = collector; + } + + pthread_mutex_unlock (&available_mutex); + + close (remote); + syslog (LOG_ERR, "pthread_create() failed: %s", strerror (err)); + } + + pthread_attr_destroy (&ptattr); + } + pthread_exit ((void *)0); +} /* void *open_connection (void *) */ + +static void email_init (void) +{ + int err = 0; + + if (0 != (err = pthread_create (&connector, NULL, + open_connection, NULL))) { + disabled = 1; + syslog (LOG_ERR, "pthread_create() failed: %s", strerror (err)); + return; + } + return; +} /* static void email_init (void) */ + +static void count_write (char *host, char *inst, char *val) +{ + char file[BUFSIZE] = ""; + int len = 0; + + len = snprintf (file, BUFSIZE, COUNT_FILE, inst); + if ((len < 0) || (len >= BUFSIZE)) + return; + + rrd_update_file (host, file, val, count_ds_def, count_ds_num); + return; +} /* static void email_write (char *host, char *inst, char *val) */ + +static void size_write (char *host, char *inst, char *val) +{ + char file[BUFSIZE] = ""; + int len = 0; + + len = snprintf (file, BUFSIZE, SIZE_FILE, inst); + if ((len < 0) || (len >= BUFSIZE)) + return; + + rrd_update_file (host, file, val, size_ds_def, size_ds_num); + return; +} /* static void size_write (char *host, char *inst, char *val) */ + +static void score_write (char *host, char *inst, char *val) +{ + rrd_update_file (host, SCORE_FILE, val, score_ds_def, score_ds_num); + return; +} /* static void score_write (char *host, char *inst, char *val) */ + +static void check_write (char *host, char *inst, char *val) +{ + char file[BUFSIZE] = ""; + int len = 0; + + len = snprintf (file, BUFSIZE, CHECK_FILE, inst); + if ((len < 0) || (len >= BUFSIZE)) + return; + + rrd_update_file (host, file, val, check_ds_def, check_ds_num); + return; +} /* static void check_write (char *host, char *inst, char *val) */ + +static void type_submit (char *plugin, char *inst, int value) +{ + char buf[BUFSIZE] = ""; + int len = 0; + + if (0 == value) + return; + + len = snprintf (buf, BUFSIZE, "%u:%i", (unsigned int)curtime, value); + if ((len < 0) || (len >= BUFSIZE)) + return; + + plugin_submit (plugin, inst, buf); +fprintf(stderr, "plugin_submit (\"%s\", \"%s\", \"%s\")\n", plugin, inst, buf); + return; +} /* static void type_submit (char *, char *, int) */ + +static void score_submit (double value) +{ + char buf[BUFSIZE] = ""; + int len = 0; + + if (0.0 == value) + return; + + len = snprintf (buf, BUFSIZE, "%u:%.2f", (unsigned int)curtime, value); + if ((len < 0) || (len >= BUFSIZE)) + return; + + plugin_submit ("email_spam_score", NULL, buf); +fprintf(stderr, "plugin_submit (\"%s\", \"%s\", \"%s\")\n", "email_spam_score", "\0", buf); + return; +} + +static void email_read (void) +{ + type_t *ptr; + + if (disabled) + return; + + pthread_mutex_lock (&count_mutex); + + for (ptr = count.head; NULL != ptr; ptr = ptr->next) { + type_submit ("email_count", ptr->name, ptr->value); + ptr->value = 0; + } + + pthread_mutex_unlock (&count_mutex); + + pthread_mutex_lock (&size_mutex); + + for (ptr = size.head; NULL != ptr; ptr = ptr->next) { + type_submit ("email_size", ptr->name, ptr->value); + ptr->value = 0; + } + + pthread_mutex_unlock (&size_mutex); + + pthread_mutex_lock (&score_mutex); + + score_submit (score); + score = 0.0; + score_count = 0; + + pthread_mutex_unlock (&score_mutex); + + pthread_mutex_lock (&check_mutex); + + for (ptr = check.head; NULL != ptr; ptr = ptr->next) { + type_submit ("email_spam_check", ptr->name, ptr->value); + ptr->value = 0; + } + + pthread_mutex_unlock (&check_mutex); + return; +} /* static void read (void) */ + +void module_register (void) +{ + plugin_register (MODULE_NAME, email_init, email_read, NULL); + plugin_register ("email_count", NULL, NULL, count_write); + plugin_register ("email_size", NULL, NULL, size_write); + plugin_register ("email_spam_score", NULL, NULL, score_write); + plugin_register ("email_spam_check", NULL, NULL, check_write); + return; +} /* void module_register (void) */ + +/* vim: set sw=4 ts=4 tw=78 noexpandtab : */ + -- 2.11.0