email plugin: Use a thread pool.
[collectd.git] / src / email.c
1 /**
2  * collectd - src/email.c
3  * Copyright (C) 2006  Sebastian Harl
4  *
5  * This program is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License as published by the
7  * Free Software Foundation; either version 2 of the License, or (at your
8  * option) any later version.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Author:
20  *   Sebastian Harl <sh at tokkee.org>
21  **/
22
23 /*
24  * This plugin communicates with a spam filter, a virus scanner or similar
25  * software using a UNIX socket and a very simple protocol:
26  *
27  * e-mail type (e.g. ham, spam, virus, ...) and size
28  * e:<type>:<bytes>
29  *
30  * spam score
31  * s:<value>
32  *
33  * successful spam checks
34  * c:<type1>[,<type2>,...]
35  */
36
37 #include "collectd.h"
38 #include "common.h"
39 #include "plugin.h"
40
41 #include "configfile.h"
42
43 #if HAVE_LIBPTHREAD
44 # include <pthread.h>
45 # define EMAIL_HAVE_READ 1
46 #else
47 # define EMAIL_HAVE_READ 0
48 #endif
49
50 #if HAVE_SYS_SELECT_H
51 #       include <sys/select.h>
52 #endif /* HAVE_SYS_SELECT_H */
53
54 #if HAVE_SYS_SOCKET_H
55 #       include <sys/socket.h>
56 #endif /* HAVE_SYS_SOCKET_H */
57
58 /* *sigh* glibc does not define UNIX_PATH_MAX in sys/un.h ... */
59 #if HAVE_LINUX_UN_H
60 #       include <linux/un.h>
61 #elif HAVE_SYS_UN_H
62 #       include <sys/un.h>
63 #endif /* HAVE_LINUX_UN_H | HAVE_SYS_UN_H */
64
65 /* some systems (e.g. Darwin) seem to not define UNIX_PATH_MAX at all */
66 #ifndef UNIX_PATH_MAX
67 # define UNIX_PATH_MAX sizeof (((struct sockaddr_un *)0)->sun_path)
68 #endif /* UNIX_PATH_MAX */
69
70 #if HAVE_GRP_H
71 #       include <grp.h>
72 #endif /* HAVE_GRP_H */
73
74 #define MODULE_NAME "email"
75
76 /* 256 bytes ought to be enough for anybody ;-) */
77 #define BUFSIZE 256
78
79 #ifndef COLLECTD_SOCKET_PREFIX
80 # define COLLECTD_SOCKET_PREFIX "/tmp/.collectd-"
81 #endif /* COLLECTD_SOCKET_PREFIX */
82
83 #define SOCK_PATH COLLECTD_SOCKET_PREFIX"email"
84 #define MAX_CONNS 5
85 #define MAX_CONNS_LIMIT 16384
86
87 /*
88  * Private data structures
89  */
90 #if EMAIL_HAVE_READ
91 /* linked list of email and check types */
92 typedef struct type {
93         char        *name;
94         int         value;
95         struct type *next;
96 } type_t;
97
98 typedef struct {
99         type_t *head;
100         type_t *tail;
101 } type_list_t;
102
103 /* collector thread control information */
104 typedef struct collector {
105         pthread_t thread;
106
107         /* socket descriptor of the current/last connection */
108         int socket;
109 } collector_t;
110
111 /* linked list of pending connections */
112 typedef struct conn {
113         /* socket to read data from */
114         int socket;
115
116         /* buffer to read data to */
117         char *buffer;
118         int  idx; /* current position in buffer */
119
120         struct conn *next;
121 } conn_t;
122
123 typedef struct {
124         conn_t *head;
125         conn_t *tail;
126 } conn_list_t;
127 #endif /* EMAIL_HAVE_READ */
128
129 /*
130  * Private variables
131  */
132 #if EMAIL_HAVE_READ
133 /* valid configuration file keys */
134 static char *config_keys[] =
135 {
136         "SocketGroup",
137         "SocketPerms",
138         "MaxConns",
139         NULL
140 };
141 static int config_keys_num = 3;
142
143 /* socket configuration */
144 static char *sock_group = COLLECTD_GRP_NAME;
145 static int  sock_perms  = S_IRWXU | S_IRWXG;
146 static int  max_conns   = MAX_CONNS;
147
148 /* state of the plugin */
149 static int disabled = 0;
150
151 /* thread managing "client" connections */
152 static pthread_t connector;
153 static int connector_socket;
154
155 /* tell the collector threads that a new connection is available */
156 static pthread_cond_t conn_available = PTHREAD_COND_INITIALIZER;
157
158 /* connections that are waiting to be processed */
159 static pthread_mutex_t conns_mutex = PTHREAD_MUTEX_INITIALIZER;
160 static conn_list_t conns;
161
162 /* tell the connector thread that a collector is available */
163 static pthread_cond_t collector_available = PTHREAD_COND_INITIALIZER;
164
165 /* collector threads */
166 static collector_t **collectors;
167
168 static pthread_mutex_t available_mutex = PTHREAD_MUTEX_INITIALIZER;
169 static int available_collectors;
170
171 static pthread_mutex_t count_mutex = PTHREAD_MUTEX_INITIALIZER;
172 static type_list_t count;
173
174 static pthread_mutex_t size_mutex = PTHREAD_MUTEX_INITIALIZER;
175 static type_list_t size;
176
177 static pthread_mutex_t score_mutex = PTHREAD_MUTEX_INITIALIZER;
178 static double score;
179 static int score_count;
180
181 static pthread_mutex_t check_mutex = PTHREAD_MUTEX_INITIALIZER;
182 static type_list_t check;
183 #endif /* EMAIL_HAVE_READ */
184
185 #define COUNT_FILE "email/email-%s.rrd"
186 static char *count_ds_def[] =
187 {
188         "DS:count:GAUGE:"COLLECTD_HEARTBEAT":0:U",
189         NULL
190 };
191 static int count_ds_num = 1;
192
193 #define SIZE_FILE  "email/email_size-%s.rrd"
194 static char *size_ds_def[] =
195 {
196         "DS:size:GAUGE:"COLLECTD_HEARTBEAT":0:U",
197         NULL
198 };
199 static int size_ds_num = 1;
200
201 #define SCORE_FILE "email/spam_score.rrd"
202 static char *score_ds_def[] =
203 {
204         "DS:score:GAUGE:"COLLECTD_HEARTBEAT":U:U",
205         NULL
206 };
207 static int score_ds_num = 1;
208
209 #define CHECK_FILE "email/spam_check-%s.rrd"
210 static char *check_ds_def[] =
211 {
212         "DS:hits:GAUGE:"COLLECTD_HEARTBEAT":0:U",
213         NULL
214 };
215 static int check_ds_num = 1;
216
217 #if EMAIL_HAVE_READ
218 static int email_config (char *key, char *value)
219 {
220         if (0 == strcasecmp (key, "SocketGroup")) {
221                 sock_group = sstrdup (value);
222         }
223         else if (0 == strcasecmp (key, "SocketPerms")) {
224                 /* the user is responsible for providing reasonable values */
225                 sock_perms = (int)strtol (value, NULL, 8);
226         }
227         else if (0 == strcasecmp (key, "MaxConns")) {
228                 long int tmp = strtol (value, NULL, 0);
229
230                 if (tmp < 1) {
231                         fprintf (stderr, "email plugin: `MaxConns' was set to invalid "
232                                         "value %li, will use default %i.\n",
233                                         tmp, MAX_CONNS);
234                         max_conns = MAX_CONNS;
235                 }
236                 else if (tmp > MAX_CONNS_LIMIT) {
237                         fprintf (stderr, "email plugin: `MaxConns' was set to invalid "
238                                         "value %li, will use hardcoded limit %i.\n",
239                                         tmp, MAX_CONNS_LIMIT);
240                         max_conns = MAX_CONNS_LIMIT;
241                 }
242                 else {
243                         max_conns = (int)tmp;
244                 }
245         }
246         else {
247                 return -1;
248         }
249         return 0;
250 } /* static int email_config (char *, char *) */
251
252 /* Increment the value of the given name in the given list by incr. */
253 static void type_list_incr (type_list_t *list, char *name, int incr)
254 {
255         if (NULL == list->head) {
256                 list->head = (type_t *)smalloc (sizeof (type_t));
257
258                 list->head->name  = sstrdup (name);
259                 list->head->value = incr;
260                 list->head->next  = NULL;
261
262                 list->tail = list->head;
263         }
264         else {
265                 type_t *ptr;
266
267                 for (ptr = list->head; NULL != ptr; ptr = ptr->next) {
268                         if (0 == strcmp (name, ptr->name))
269                                 break;
270                 }
271
272                 if (NULL == ptr) {
273                         list->tail->next = (type_t *)smalloc (sizeof (type_t));
274                         list->tail = list->tail->next;
275
276                         list->tail->name  = sstrdup (name);
277                         list->tail->value = incr;
278                         list->tail->next  = NULL;
279                 }
280                 else {
281                         ptr->value += incr;
282                 }
283         }
284         return;
285 } /* static void type_list_incr (type_list_t *, char *) */
286
287 /* Read a single character from the socket. If an error occurs or end-of-file
288  * is reached return '\0'. */
289 char read_char (conn_t *src)
290 {
291         char ret = '\0';
292
293         fd_set fdset;
294
295         FD_ZERO (&fdset);
296         FD_SET (src->socket, &fdset);
297
298         if (-1 == select (src->socket + 1, &fdset, NULL, NULL, NULL)) {
299                 syslog (LOG_ERR, "select() failed: %s", strerror (errno));
300                 return '\0';
301         }
302
303         assert (FD_ISSET (src->socket, &fdset));
304
305         do {
306                 ssize_t len = 0;
307
308                 errno = 0;
309                 if (0 > (len = read (src->socket, (void *)&ret, 1))) {
310                         if (EINTR != errno) {
311                                 syslog (LOG_ERR, "read() failed: %s", strerror (errno));
312                                 return '\0';
313                         }
314                 }
315
316                 if (0 == len)
317                         return '\0';
318         } while (EINTR == errno);
319         return ret;
320 } /* char read_char (conn_t *) */
321
322 /* Read a single line (terminated by '\n') from the the socket.
323  *
324  * The return value is zero terminated and does not contain any newline
325  * characters. In case that no complete line is available (non-blocking mode
326  * should be enabled) an empty string is returned.
327  *
328  * If an error occurs or end-of-file is reached return NULL.
329  *
330  * IMPORTANT NOTE: If there is no newline character found in BUFSIZE
331  * characters of the input stream, the line will will be ignored! By
332  * definition we should not get any longer input lines, thus this is
333  * acceptable in this case ;-) */
334 char *read_line (conn_t *src)
335 {
336         int  i = 0;
337         char *ret;
338
339         assert (BUFSIZE > src->idx);
340
341         for (i = 0; i < src->idx; ++i) {
342                 if ('\n' == src->buffer[i])
343                         break;
344         }
345
346         if (i == src->idx) {
347                 fd_set fdset;
348
349                 ssize_t len = 0;
350
351                 FD_ZERO (&fdset);
352                 FD_SET (src->socket, &fdset);
353
354                 if (-1 == select (src->socket + 1, &fdset, NULL, NULL, NULL)) {
355                         syslog (LOG_ERR, "select() failed: %s", strerror (errno));
356                         return NULL;
357                 }
358
359                 assert (FD_ISSET (src->socket, &fdset));
360
361                 do {
362                         errno = 0;
363                         if (0 > (len = read (src->socket,
364                                                         (void *)(src->buffer + src->idx),
365                                                         BUFSIZE - src->idx))) {
366                                 if (EINTR != errno) {
367                                         syslog (LOG_ERR, "read() failed: %s", strerror (errno));
368                                         return NULL;
369                                 }
370                         }
371
372                         if (0 == len)
373                                 return NULL;
374                 } while (EINTR == errno);
375
376                 src->idx += len;
377
378                 for (i = src->idx - len; i < src->idx; ++i) {
379                         if ('\n' == src->buffer[i])
380                                 break;
381                 }
382
383                 if (i == src->idx) {
384                         ret = (char *)smalloc (1);
385
386                         ret[0] = '\0';
387
388                         if (BUFSIZE == src->idx) { /* no space left in buffer */
389                                 while ('\n' != read_char (src))
390                                         /* ignore complete line */;
391
392                                 src->idx = 0;
393                         }
394                         return ret;
395                 }
396         }
397
398         ret = (char *)smalloc (i + 1);
399         memcpy (ret, src->buffer, i + 1);
400         ret[i] = '\0';
401
402         src->idx -= (i + 1);
403
404         if (0 == src->idx)
405                 src->buffer[0] = '\0';
406         else
407                 memmove (src->buffer, src->buffer + i + 1, src->idx);
408         return ret;
409 } /* char *read_line (conn_t *) */
410
411 static void *collect (void *arg)
412 {
413         collector_t *this = (collector_t *)arg;
414
415         char *buffer = (char *)smalloc (BUFSIZE);
416
417         while (1) {
418                 int loop = 1;
419
420                 conn_t *connection;
421
422                 pthread_mutex_lock (&conns_mutex);
423
424                 while (NULL == conns.head) {
425                         pthread_cond_wait (&conn_available, &conns_mutex);
426                 }
427
428                 connection = conns.head;
429                 conns.head = conns.head->next;
430
431                 if (NULL == conns.head) {
432                         conns.tail = NULL;
433                 }
434
435                 this->socket = connection->socket;
436
437                 pthread_mutex_unlock (&conns_mutex);
438
439                 connection->buffer = buffer;
440                 connection->idx    = 0;
441
442                 { /* put the socket in non-blocking mode */
443                         int flags = 0;
444
445                         errno = 0;
446                         if (-1 == fcntl (connection->socket, F_GETFL, &flags)) {
447                                 syslog (LOG_ERR, "fcntl() failed: %s", strerror (errno));
448                                 loop = 0;
449                         }
450
451                         errno = 0;
452                         if (-1 == fcntl (connection->socket, F_SETFL, flags | O_NONBLOCK)) {
453                                 syslog (LOG_ERR, "fcntl() failed: %s", strerror (errno));
454                                 loop = 0;
455                         }
456                 }
457
458                 while (loop) {
459                         char *line = read_line (connection);
460
461                         if (NULL == line) {
462                                 loop = 0;
463                                 break;
464                         }
465
466                         if ('\0' == line[0]) {
467                                 free (line);
468                                 continue;
469                         }
470
471                         if (':' != line[1]) {
472                                 syslog (LOG_ERR, "email: syntax error in line '%s'", line);
473                                 free (line);
474                                 continue;
475                         }
476
477                         if ('e' == line[0]) { /* e:<type>:<bytes> */
478                                 char *ptr  = NULL;
479                                 char *type = strtok_r (line + 2, ":", &ptr);
480                                 char *tmp  = strtok_r (NULL, ":", &ptr);
481                                 int  bytes = 0;
482
483                                 if (NULL == tmp) {
484                                         syslog (LOG_ERR, "email: syntax error in line '%s'", line);
485                                         free (line);
486                                         continue;
487                                 }
488
489                                 bytes = atoi (tmp);
490
491                                 pthread_mutex_lock (&count_mutex);
492                                 type_list_incr (&count, type, 1);
493                                 pthread_mutex_unlock (&count_mutex);
494
495                                 pthread_mutex_lock (&size_mutex);
496                                 type_list_incr (&size, type, bytes);
497                                 pthread_mutex_unlock (&size_mutex);
498                         }
499                         else if ('s' == line[0]) { /* s:<value> */
500                                 pthread_mutex_lock (&score_mutex);
501                                 score = (score * (double)score_count + atof (line + 2))
502                                                 / (double)(score_count + 1);
503                                 ++score_count;
504                                 pthread_mutex_unlock (&score_mutex);
505                         }
506                         else if ('c' == line[0]) { /* c:<type1>[,<type2>,...] */
507                                 char *ptr  = NULL;
508                                 char *type = strtok_r (line + 2, ",", &ptr);
509
510                                 do {
511                                         pthread_mutex_lock (&check_mutex);
512                                         type_list_incr (&check, type, 1);
513                                         pthread_mutex_unlock (&check_mutex);
514                                 } while (NULL != (type = strtok_r (NULL, ",", &ptr)));
515                         }
516                         else {
517                                 syslog (LOG_ERR, "email: unknown type '%c'", line[0]);
518                         }
519
520                         free (line);
521                 } /* while (loop) */
522
523                 close (connection->socket);
524
525                 free (connection);
526
527                 pthread_mutex_lock (&available_mutex);
528                 ++available_collectors;
529                 pthread_mutex_unlock (&available_mutex);
530
531                 pthread_cond_signal (&collector_available);
532         } /* while (1) */
533
534         free (buffer);
535         pthread_exit ((void *)0);
536 } /* void *collect (void *) */
537
538 static void *open_connection (void *arg)
539 {
540         struct sockaddr_un addr;
541
542         /* create UNIX socket */
543         errno = 0;
544         if (-1 == (connector_socket = socket (PF_UNIX, SOCK_STREAM, 0))) {
545                 disabled = 1;
546                 syslog (LOG_ERR, "socket() failed: %s", strerror (errno));
547                 pthread_exit ((void *)1);
548         }
549
550         addr.sun_family = AF_UNIX;
551
552         strncpy (addr.sun_path, SOCK_PATH, (size_t)(UNIX_PATH_MAX - 1));
553         addr.sun_path[UNIX_PATH_MAX - 1] = '\0';
554         unlink (addr.sun_path);
555
556         errno = 0;
557         if (-1 == bind (connector_socket, (struct sockaddr *)&addr,
558                                 offsetof (struct sockaddr_un, sun_path)
559                                         + strlen(addr.sun_path))) {
560                 disabled = 1;
561                 syslog (LOG_ERR, "bind() failed: %s", strerror (errno));
562                 pthread_exit ((void *)1);
563         }
564
565         errno = 0;
566         if (-1 == listen (connector_socket, 5)) {
567                 disabled = 1;
568                 syslog (LOG_ERR, "listen() failed: %s", strerror (errno));
569                 pthread_exit ((void *)1);
570         }
571
572         if ((uid_t)0 == geteuid ()) {
573                 struct group *grp;
574
575                 errno = 0;
576                 if (NULL != (grp = getgrnam (sock_group))) {
577                         errno = 0;
578                         if (0 != chown (SOCK_PATH, (uid_t)-1, grp->gr_gid)) {
579                                 syslog (LOG_WARNING, "chown() failed: %s", strerror (errno));
580                         }
581                 }
582                 else {
583                         syslog (LOG_WARNING, "getgrnam() failed: %s", strerror (errno));
584                 }
585         }
586         else {
587                 syslog (LOG_WARNING, "not running as root");
588         }
589
590         errno = 0;
591         if (0 != chmod (SOCK_PATH, sock_perms)) {
592                 syslog (LOG_WARNING, "chmod() failed: %s", strerror (errno));
593         }
594
595         { /* initialize collector threads */
596                 int i   = 0;
597                 int err = 0;
598
599                 pthread_attr_t ptattr;
600
601                 conns.head = NULL;
602                 conns.tail = NULL;
603
604                 pthread_attr_init (&ptattr);
605                 pthread_attr_setdetachstate (&ptattr, PTHREAD_CREATE_DETACHED);
606
607                 available_collectors = max_conns;
608
609                 collectors =
610                         (collector_t **)smalloc (max_conns * sizeof (collector_t *));
611
612                 for (i = 0; i < max_conns; ++i) {
613                         collectors[i] = (collector_t *)smalloc (sizeof (collector_t));
614                         collectors[i]->socket = 0;
615
616                         if (0 != (err = pthread_create (&collectors[i]->thread, &ptattr,
617                                                         collect, collectors[i]))) {
618                                 syslog (LOG_ERR, "pthread_create() failed: %s",
619                                                 strerror (err));
620                         }
621                 }
622
623                 pthread_attr_destroy (&ptattr);
624         }
625
626         while (1) {
627                 int remote = 0;
628
629                 conn_t *connection;
630
631                 pthread_mutex_lock (&available_mutex);
632
633                 while (0 == available_collectors) {
634                         pthread_cond_wait (&collector_available, &available_mutex);
635                 }
636
637                 --available_collectors;
638
639                 pthread_mutex_unlock (&available_mutex);
640
641                 do {
642                         errno = 0;
643                         if (-1 == (remote = accept (connector_socket, NULL, NULL))) {
644                                 if (EINTR != errno) {
645                                         disabled = 1;
646                                         syslog (LOG_ERR, "accept() failed: %s", strerror (errno));
647                                         pthread_exit ((void *)1);
648                                 }
649                         }
650                 } while (EINTR == errno);
651
652                 connection = (conn_t *)smalloc (sizeof (conn_t));
653
654                 connection->socket = remote;
655                 connection->next   = NULL;
656
657                 pthread_mutex_lock (&conns_mutex);
658
659                 if (NULL == conns.head) {
660                         conns.head = connection;
661                         conns.tail = connection;
662                 }
663                 else {
664                         conns.tail->next = connection;
665                         conns.tail = conns.tail->next;
666                 }
667
668                 pthread_mutex_unlock (&conns_mutex);
669
670                 pthread_cond_signal (&conn_available);
671         }
672         pthread_exit ((void *)0);
673 } /* void *open_connection (void *) */
674 #endif /* EMAIL_HAVE_READ */
675
676 static void email_init (void)
677 {
678 #if EMAIL_HAVE_READ
679         int err = 0;
680
681         if (0 != (err = pthread_create (&connector, NULL,
682                                 open_connection, NULL))) {
683                 disabled = 1;
684                 syslog (LOG_ERR, "pthread_create() failed: %s", strerror (err));
685                 return;
686         }
687 #endif /* EMAIL_HAVE_READ */
688         return;
689 } /* static void email_init (void) */
690
691 #if EMAIL_HAVE_READ
692 static void email_shutdown (void)
693 {
694         int i = 0;
695
696         if (disabled)
697                 return;
698
699         close (connector_socket);
700         pthread_kill (connector, SIGTERM);
701
702         /* don't allow any more connections to be processed */
703         pthread_mutex_lock (&conns_mutex);
704
705         for (i = 0; i < max_conns; ++i) {
706                 close (collectors[i]->socket);
707                 pthread_kill (collectors[i]->thread, SIGTERM);
708         }
709
710         pthread_mutex_unlock (&conns_mutex);
711
712         unlink (SOCK_PATH);
713         return;
714 } /* static void email_shutdown (void) */
715 #endif /* EMAIL_HAVE_READ */
716
717 static void count_write (char *host, char *inst, char *val)
718 {
719         char file[BUFSIZE] = "";
720         int  len           = 0;
721
722         len = snprintf (file, BUFSIZE, COUNT_FILE, inst);
723         if ((len < 0) || (len >= BUFSIZE))
724                 return;
725
726         rrd_update_file (host, file, val, count_ds_def, count_ds_num);
727         return;
728 } /* static void email_write (char *host, char *inst, char *val) */
729
730 static void size_write (char *host, char *inst, char *val)
731 {
732         char file[BUFSIZE] = "";
733         int  len           = 0;
734
735         len = snprintf (file, BUFSIZE, SIZE_FILE, inst);
736         if ((len < 0) || (len >= BUFSIZE))
737                 return;
738
739         rrd_update_file (host, file, val, size_ds_def, size_ds_num);
740         return;
741 } /* static void size_write (char *host, char *inst, char *val) */
742
743 static void score_write (char *host, char *inst, char *val)
744 {
745         rrd_update_file (host, SCORE_FILE, val, score_ds_def, score_ds_num);
746         return;
747 } /* static void score_write (char *host, char *inst, char *val) */
748
749 static void check_write (char *host, char *inst, char *val)
750 {
751         char file[BUFSIZE] = "";
752         int  len           = 0;
753
754         len = snprintf (file, BUFSIZE, CHECK_FILE, inst);
755         if ((len < 0) || (len >= BUFSIZE))
756                 return;
757
758         rrd_update_file (host, file, val, check_ds_def, check_ds_num);
759         return;
760 } /* static void check_write (char *host, char *inst, char *val) */
761
762 #if EMAIL_HAVE_READ
763 static void type_submit (char *plugin, char *inst, int value)
764 {
765         char buf[BUFSIZE] = "";
766         int  len          = 0;
767
768         if (0 == value)
769                 return;
770
771         len = snprintf (buf, BUFSIZE, "%u:%i", (unsigned int)curtime, value);
772         if ((len < 0) || (len >= BUFSIZE))
773                 return;
774
775         plugin_submit (plugin, inst, buf);
776         return;
777 } /* static void type_submit (char *, char *, int) */
778
779 static void score_submit (double value)
780 {
781         char buf[BUFSIZE] = "";
782         int  len          = 0;
783
784         if (0.0 == value)
785                 return;
786
787         len = snprintf (buf, BUFSIZE, "%u:%.2f", (unsigned int)curtime, value);
788         if ((len < 0) || (len >= BUFSIZE))
789                 return;
790
791         plugin_submit ("email_spam_score", NULL, buf);
792         return;
793 }
794
795 static void email_read (void)
796 {
797         type_t *ptr;
798
799         if (disabled)
800                 return;
801
802         pthread_mutex_lock (&count_mutex);
803
804         for (ptr = count.head; NULL != ptr; ptr = ptr->next) {
805                 type_submit ("email_count", ptr->name, ptr->value);
806                 ptr->value = 0;
807         }
808
809         pthread_mutex_unlock (&count_mutex);
810
811         pthread_mutex_lock (&size_mutex);
812
813         for (ptr = size.head; NULL != ptr; ptr = ptr->next) {
814                 type_submit ("email_size", ptr->name, ptr->value);
815                 ptr->value = 0;
816         }
817
818         pthread_mutex_unlock (&size_mutex);
819
820         pthread_mutex_lock (&score_mutex);
821
822         score_submit (score);
823         score = 0.0;
824         score_count = 0;
825
826         pthread_mutex_unlock (&score_mutex);
827
828         pthread_mutex_lock (&check_mutex);
829
830         for (ptr = check.head; NULL != ptr; ptr = ptr->next) {
831                 type_submit ("email_spam_check", ptr->name, ptr->value);
832                 ptr->value = 0;
833         }
834
835         pthread_mutex_unlock (&check_mutex);
836         return;
837 } /* static void read (void) */
838 #else /* if !EMAIL_HAVE_READ */
839 # define email_read NULL
840 #endif
841
842 void module_register (void)
843 {
844         plugin_register (MODULE_NAME, email_init, email_read, NULL);
845         plugin_register ("email_count", NULL, NULL, count_write);
846         plugin_register ("email_size", NULL, NULL, size_write);
847         plugin_register ("email_spam_score", NULL, NULL, score_write);
848         plugin_register ("email_spam_check", NULL, NULL, check_write);
849 #if EMAIL_HAVE_READ
850         plugin_register_shutdown (MODULE_NAME, email_shutdown);
851         cf_register (MODULE_NAME, email_config, config_keys, config_keys_num);
852 #endif /* EMAIL_HAVE_READ */
853         return;
854 } /* void module_register (void) */
855
856 /* vim: set sw=4 ts=4 tw=78 noexpandtab : */
857