2 * collectd - src/email.c
3 * Copyright (C) 2006-2008 Sebastian Harl
5 * Permission is hereby granted, free of charge, to any person obtaining a
6 * copy of this software and associated documentation files (the "Software"),
7 * to deal in the Software without restriction, including without limitation
8 * the rights to use, copy, modify, merge, publish, distribute, sublicense,
9 * and/or sell copies of the Software, and to permit persons to whom the
10 * Software is furnished to do so, subject to the following conditions:
12 * The above copyright notice and this permission notice shall be included in
13 * all copies or substantial portions of the Software.
15 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
21 * DEALINGS IN THE SOFTWARE.
24 * Sebastian Harl <sh at tokkee.org>
28 * This plugin communicates with a spam filter, a virus scanner or similar
29 * software using a UNIX socket and a very simple protocol:
31 * e-mail type (e.g. ham, spam, virus, ...) and size
37 * successful spam checks
38 * c:<type1>[,<type2>,...]
44 #include "utils/common/common.h"
48 #include <sys/select.h>
51 /* some systems (e.g. Darwin) seem to not define UNIX_PATH_MAX at all */
53 #define UNIX_PATH_MAX sizeof(((struct sockaddr_un *)0)->sun_path)
54 #endif /* UNIX_PATH_MAX */
58 #endif /* HAVE_GRP_H */
60 #define SOCK_PATH LOCALSTATEDIR "/run/" PACKAGE_NAME "-email"
62 #define MAX_CONNS_LIMIT 16384
64 #define log_debug(...) DEBUG("email: "__VA_ARGS__)
65 #define log_err(...) ERROR("email: "__VA_ARGS__)
66 #define log_warn(...) WARNING("email: "__VA_ARGS__)
69 * Private data structures
71 /* linked list of email and check types */
83 /* collector thread control information */
84 typedef struct collector {
87 /* socket descriptor of the current/last connection */
91 /* linked list of pending connections */
93 /* socket to read data from */
96 /* linked list of connections */
108 /* valid configuration file keys */
109 static const char *config_keys[] = {"SocketFile", "SocketGroup", "SocketPerms",
111 static int config_keys_num = STATIC_ARRAY_SIZE(config_keys);
113 /* socket configuration */
114 static char *sock_file;
115 static char *sock_group;
116 static int sock_perms = S_IRWXU | S_IRWXG;
117 static int max_conns = MAX_CONNS;
119 /* state of the plugin */
122 /* thread managing "client" connections */
123 static pthread_t connector = (pthread_t)0;
124 static int connector_socket = -1;
126 /* tell the collector threads that a new connection is available */
127 static pthread_cond_t conn_available = PTHREAD_COND_INITIALIZER;
129 /* connections that are waiting to be processed */
130 static pthread_mutex_t conns_mutex = PTHREAD_MUTEX_INITIALIZER;
131 static conn_list_t conns;
133 /* tell the connector thread that a collector is available */
134 static pthread_cond_t collector_available = PTHREAD_COND_INITIALIZER;
136 /* collector threads */
137 static collector_t **collectors;
139 static pthread_mutex_t available_mutex = PTHREAD_MUTEX_INITIALIZER;
140 static int available_collectors;
142 static pthread_mutex_t count_mutex = PTHREAD_MUTEX_INITIALIZER;
143 static type_list_t list_count;
144 static type_list_t list_count_copy;
146 static pthread_mutex_t size_mutex = PTHREAD_MUTEX_INITIALIZER;
147 static type_list_t list_size;
148 static type_list_t list_size_copy;
150 static pthread_mutex_t score_mutex = PTHREAD_MUTEX_INITIALIZER;
152 static int score_count;
154 static pthread_mutex_t check_mutex = PTHREAD_MUTEX_INITIALIZER;
155 static type_list_t list_check;
156 static type_list_t list_check_copy;
161 static int email_config(const char *key, const char *value) {
162 if (strcasecmp(key, "SocketFile") == 0) {
163 if (sock_file != NULL)
165 sock_file = sstrdup(value);
166 } else if (strcasecmp(key, "SocketGroup") == 0) {
167 if (sock_group != NULL)
169 sock_group = sstrdup(value);
170 } else if (strcasecmp(key, "SocketPerms") == 0) {
171 /* the user is responsible for providing reasonable values */
172 sock_perms = (int)strtol(value, NULL, 8);
173 } else if (strcasecmp(key, "MaxConns") == 0) {
174 long int tmp = strtol(value, NULL, 0);
177 fprintf(stderr, "email plugin: `MaxConns' was set to invalid "
178 "value %li, will use default %i.\n",
180 ERROR("email plugin: `MaxConns' was set to invalid "
181 "value %li, will use default %i.\n",
183 max_conns = MAX_CONNS;
184 } else if (tmp > MAX_CONNS_LIMIT) {
185 fprintf(stderr, "email plugin: `MaxConns' was set to invalid "
186 "value %li, will use hardcoded limit %i.\n",
187 tmp, MAX_CONNS_LIMIT);
188 ERROR("email plugin: `MaxConns' was set to invalid "
189 "value %li, will use hardcoded limit %i.\n",
190 tmp, MAX_CONNS_LIMIT);
191 max_conns = MAX_CONNS_LIMIT;
193 max_conns = (int)tmp;
} /* static int email_config (const char *, const char *) */
201 /* Increment the value of the given name in the given list by incr. */
202 static void type_list_incr(type_list_t *list, char *name, int incr) {
203 if (list->head == NULL) {
204 list->head = smalloc(sizeof(*list->head));
206 list->head->name = sstrdup(name);
207 list->head->value = incr;
208 list->head->next = NULL;
210 list->tail = list->head;
214 for (ptr = list->head; NULL != ptr; ptr = ptr->next) {
215 if (strcmp(name, ptr->name) == 0)
220 list->tail->next = smalloc(sizeof(*list->tail->next));
221 list->tail = list->tail->next;
223 list->tail->name = sstrdup(name);
224 list->tail->value = incr;
225 list->tail->next = NULL;
} /* static void type_list_incr (type_list_t *, char *, int) */
233 static void *collect(void *arg) {
234 collector_t *this = (collector_t *)arg;
239 pthread_mutex_lock(&conns_mutex);
241 while (conns.head == NULL) {
242 pthread_cond_wait(&conn_available, &conns_mutex);
245 connection = conns.head;
246 conns.head = conns.head->next;
248 if (conns.head == NULL) {
252 pthread_mutex_unlock(&conns_mutex);
254 /* make the socket available to the global
255 * thread and connection management */
256 this->socket = connection->socket;
258 log_debug("collect: handling connection on fd #%i", fileno(this->socket));
261 /* 256 bytes ought to be enough for anybody ;-) */
262 char line[256 + 1]; /* line + '\0' */
265 if (fgets(line, sizeof(line), this->socket) == NULL) {
267 log_err("collect: reading from socket (fd #%i) "
269 fileno(this->socket), STRERRNO);
274 size_t len = strlen(line);
275 if ((line[len - 1] != '\n') && (line[len - 1] != '\r')) {
276 log_warn("collect: line too long (> %" PRIsz " characters): "
278 sizeof(line) - 1, line);
280 while (fgets(line, sizeof(line), this->socket) != NULL)
281 if ((line[len - 1] == '\n') || (line[len - 1] == '\r'))
285 if (len < 3) { /* [a-z] ':' '\n' */
289 line[len - 1] = '\0';
291 log_debug("collect: line = '%s'", line);
293 if (line[1] != ':') {
294 log_err("collect: syntax error in line '%s'", line);
298 if (line[0] == 'e') { /* e:<type>:<bytes> */
299 char *type = line + 2;
300 char *bytes_str = strchr(type, ':');
301 if (bytes_str == NULL) {
302 log_err("collect: syntax error in line '%s'", line);
309 pthread_mutex_lock(&count_mutex);
310 type_list_incr(&list_count, type, /* increment = */ 1);
311 pthread_mutex_unlock(&count_mutex);
313 int bytes = atoi(bytes_str);
315 pthread_mutex_lock(&size_mutex);
316 type_list_incr(&list_size, type, /* increment = */ bytes);
317 pthread_mutex_unlock(&size_mutex);
319 } else if (line[0] == 's') { /* s:<value> */
320 pthread_mutex_lock(&score_mutex);
321 score = (score * (double)score_count + atof(line + 2)) /
322 (double)(score_count + 1);
324 pthread_mutex_unlock(&score_mutex);
325 } else if (line[0] == 'c') { /* c:<type1>[,<type2>,...] */
326 char *dummy = line + 2;
330 pthread_mutex_lock(&check_mutex);
331 while ((type = strtok_r(dummy, ",", &endptr)) != NULL) {
333 type_list_incr(&list_check, type, /* increment = */ 1);
335 pthread_mutex_unlock(&check_mutex);
337 log_err("collect: unknown type '%c'", line[0]);
341 log_debug("Shutting down connection on fd #%i", fileno(this->socket));
343 fclose(connection->socket);
348 pthread_mutex_lock(&available_mutex);
349 ++available_collectors;
350 pthread_mutex_unlock(&available_mutex);
352 pthread_cond_signal(&collector_available);
355 pthread_exit((void *)0);
357 } /* static void *collect (void *) */
359 static void *open_connection(void __attribute__((unused)) * arg) {
360 const char *path = (NULL == sock_file) ? SOCK_PATH : sock_file;
361 const char *group = (NULL == sock_group) ? COLLECTD_GRP_NAME : sock_group;
363 /* create UNIX socket */
365 if ((connector_socket = socket(PF_UNIX, SOCK_STREAM, 0)) == -1) {
367 log_err("socket() failed: %s", STRERRNO);
368 pthread_exit((void *)1);
371 struct sockaddr_un addr = {
372 .sun_family = AF_UNIX,
374 sstrncpy(addr.sun_path, path, (size_t)(UNIX_PATH_MAX - 1));
377 if (bind(connector_socket, (struct sockaddr *)&addr,
378 offsetof(struct sockaddr_un, sun_path) + strlen(addr.sun_path)) ==
381 close(connector_socket);
382 connector_socket = -1;
383 log_err("bind() failed: %s", STRERRNO);
384 pthread_exit((void *)1);
388 if (listen(connector_socket, 5) == -1) {
390 close(connector_socket);
391 connector_socket = -1;
392 log_err("listen() failed: %s", STRERRNO);
393 pthread_exit((void *)1);
401 long int grbuf_size = sysconf(_SC_GETGR_R_SIZE_MAX);
403 grbuf_size = sysconf(_SC_PAGESIZE);
406 char grbuf[grbuf_size];
409 status = getgrnam_r(group, &sg, grbuf, sizeof(grbuf), &grp);
411 log_warn("getgrnam_r (%s) failed: %s", group, STRERROR(status));
412 } else if (grp == NULL) {
413 log_warn("No such group: `%s'", group);
415 status = chown(path, (uid_t)-1, grp->gr_gid);
417 log_warn("chown (%s, -1, %i) failed: %s", path, (int)grp->gr_gid,
424 if (chmod(path, sock_perms) != 0) {
425 log_warn("chmod() failed: %s", STRERRNO);
428 { /* initialize collector threads */
429 pthread_attr_t ptattr;
434 pthread_attr_init(&ptattr);
435 pthread_attr_setdetachstate(&ptattr, PTHREAD_CREATE_DETACHED);
437 available_collectors = max_conns;
439 collectors = smalloc(max_conns * sizeof(*collectors));
441 for (int i = 0; i < max_conns; ++i) {
442 collectors[i] = smalloc(sizeof(*collectors[i]));
443 collectors[i]->socket = NULL;
445 if (plugin_thread_create(&collectors[i]->thread, &ptattr, collect,
446 collectors[i], "email collector") != 0) {
447 log_err("plugin_thread_create() failed: %s", STRERRNO);
448 collectors[i]->thread = (pthread_t)0;
452 pthread_attr_destroy(&ptattr);
460 pthread_mutex_lock(&available_mutex);
462 while (available_collectors == 0) {
463 pthread_cond_wait(&collector_available, &available_mutex);
466 --available_collectors;
468 pthread_mutex_unlock(&available_mutex);
473 remote = accept(connector_socket, NULL, NULL);
479 close(connector_socket);
480 connector_socket = -1;
481 log_err("accept() failed: %s", STRERRNO);
482 pthread_exit((void *)1);
/* accept() succeeded. */
489 connection = calloc(1, sizeof(*connection));
490 if (connection == NULL) {
495 connection->socket = fdopen(remote, "r");
496 connection->next = NULL;
498 if (connection->socket == NULL) {
504 pthread_mutex_lock(&conns_mutex);
506 if (conns.head == NULL) {
507 conns.head = connection;
508 conns.tail = connection;
510 conns.tail->next = connection;
511 conns.tail = conns.tail->next;
514 pthread_mutex_unlock(&conns_mutex);
516 pthread_cond_signal(&conn_available);
519 pthread_exit((void *)0);
521 } /* static void *open_connection (void *) */
523 static int email_init(void) {
524 if (plugin_thread_create(&connector, NULL, open_connection, NULL,
525 "email listener") != 0) {
527 log_err("plugin_thread_create() failed: %s", STRERRNO);
532 } /* int email_init */
534 static void type_list_free(type_list_t *t) {
538 while (this != NULL) {
539 type_t *next = this->next;
551 static int email_shutdown(void) {
552 if (connector != ((pthread_t)0)) {
553 pthread_kill(connector, SIGTERM);
554 connector = (pthread_t)0;
557 if (connector_socket >= 0) {
558 close(connector_socket);
559 connector_socket = -1;
562 /* don't allow any more connections to be processed */
563 pthread_mutex_lock(&conns_mutex);
565 available_collectors = 0;
567 if (collectors != NULL) {
568 for (int i = 0; i < max_conns; ++i) {
569 if (collectors[i] == NULL)
572 if (collectors[i]->thread != ((pthread_t)0)) {
573 pthread_kill(collectors[i]->thread, SIGTERM);
574 collectors[i]->thread = (pthread_t)0;
577 if (collectors[i]->socket != NULL) {
578 fclose(collectors[i]->socket);
579 collectors[i]->socket = NULL;
582 sfree(collectors[i]);
585 } /* if (collectors != NULL) */
587 pthread_mutex_unlock(&conns_mutex);
589 type_list_free(&list_count);
590 type_list_free(&list_count_copy);
591 type_list_free(&list_size);
592 type_list_free(&list_size_copy);
593 type_list_free(&list_check);
594 type_list_free(&list_check_copy);
596 unlink((sock_file == NULL) ? SOCK_PATH : sock_file);
} /* static int email_shutdown (void) */
603 static void email_submit(const char *type, const char *type_instance,
605 value_list_t vl = VALUE_LIST_INIT;
607 vl.values = &(value_t){.gauge = value};
609 sstrncpy(vl.plugin, "email", sizeof(vl.plugin));
610 sstrncpy(vl.type, type, sizeof(vl.type));
611 sstrncpy(vl.type_instance, type_instance, sizeof(vl.type_instance));
613 plugin_dispatch_values(&vl);
614 } /* void email_submit */
616 /* Copy list l1 to list l2. l2 may partly exist already, but it is assumed
617 * that neither the order nor the name of any element of either list is
618 * changed and no elements are deleted. The values of l1 are reset to zero
619 * after they have been copied to l2. */
620 static void copy_type_list(type_list_t *l1, type_list_t *l2) {
623 for (type_t *ptr1 = l1->head, *ptr2 = l2->head; ptr1 != NULL;
624 ptr1 = ptr1->next, last = ptr2, ptr2 = ptr2->next) {
626 ptr2 = smalloc(sizeof(*ptr2));
639 if (ptr2->name == NULL) {
640 ptr2->name = sstrdup(ptr1->name);
643 ptr2->value = ptr1->value;
649 static int email_read(void) {
657 pthread_mutex_lock(&count_mutex);
659 copy_type_list(&list_count, &list_count_copy);
661 pthread_mutex_unlock(&count_mutex);
663 for (type_t *ptr = list_count_copy.head; ptr != NULL; ptr = ptr->next) {
664 email_submit("email_count", ptr->name, ptr->value);
668 pthread_mutex_lock(&size_mutex);
670 copy_type_list(&list_size, &list_size_copy);
672 pthread_mutex_unlock(&size_mutex);
674 for (type_t *ptr = list_size_copy.head; ptr != NULL; ptr = ptr->next) {
675 email_submit("email_size", ptr->name, ptr->value);
679 pthread_mutex_lock(&score_mutex);
682 score_count_old = score_count;
686 pthread_mutex_unlock(&score_mutex);
688 if (score_count_old > 0)
689 email_submit("spam_score", "", score_old);
692 pthread_mutex_lock(&check_mutex);
694 copy_type_list(&list_check, &list_check_copy);
696 pthread_mutex_unlock(&check_mutex);
698 for (type_t *ptr = list_check_copy.head; ptr != NULL; ptr = ptr->next)
699 email_submit("spam_check", ptr->name, ptr->value);
702 } /* int email_read */
704 void module_register(void) {
705 plugin_register_config("email", email_config, config_keys, config_keys_num);
706 plugin_register_init("email", email_init);
707 plugin_register_read("email", email_read);
708 plugin_register_shutdown("email", email_shutdown);
709 } /* void module_register */