2 * collectd - src/rrdd.c
3 * Copyright (C) 2008 Florian octo Forster
5 * This program is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License as published by the
7 * Free Software Foundation; only version 2 of the License is applicable.
9 * This program is distributed in the hope that it will be useful, but
10 * WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * General Public License for more details.
14 * You should have received a copy of the GNU General Public License along
15 * with this program; if not, write to the Free Software Foundation, Inc.,
16 * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19 * Florian octo Forster <octo at verplant.org>
27 # define RRDD_LOG(severity, ...) do { fprintf (stderr, __VA_ARGS__); fprintf (stderr, "\n"); } while (0)
29 # define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
35 struct listen_socket_s
38 char path[PATH_MAX + 1];
40 typedef struct listen_socket_s listen_socket_t;
43 typedef struct cache_item_s cache_item_t;
49 time_t last_flush_time;
50 #define CI_FLAGS_IN_TREE 0x01
51 #define CI_FLAGS_IN_QUEUE 0x02
60 static listen_socket_t *listen_fds = NULL;
61 static size_t listen_fds_num = 0;
63 static int do_shutdown = 0;
65 static pthread_t queue_thread;
67 static pthread_t *connetion_threads = NULL;
68 static pthread_mutex_t connetion_threads_lock = PTHREAD_MUTEX_INITIALIZER;
69 static int connetion_threads_num = 0;
72 static avl_tree_t *cache_tree = NULL;
73 static cache_item_t *cache_queue_head = NULL;
74 static cache_item_t *cache_queue_tail = NULL;
75 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
76 static pthread_cond_t cache_cond = PTHREAD_COND_INITIALIZER;
81 static void sig_int_handler (int signal) /* {{{ */
84 } /* }}} void sig_int_handler */
86 static void sig_term_handler (int signal) /* {{{ */
89 } /* }}} void sig_term_handler */
91 static int cache_tree_compare (const void *v0, const void *v1) /* {{{ */
93 cache_item_t *c0 = (cache_item_t *) v0;
94 cache_item_t *c1 = (cache_item_t *) v1;
96 assert (c0->file != NULL);
97 assert (c1->file != NULL);
99 return (strcmp (c0->file, c1->file));
100 } /* }}} int cache_tree_compare */
102 static void cache_tree_free (void *v) /* {{{ */
104 cache_item_t *c = (cache_item_t *) v;
106 assert (c->values_num == 0);
107 assert ((c->flags & CI_FLAGS_IN_TREE) != 0);
108 assert ((c->flags & CI_FLAGS_IN_QUEUE) == 0);
113 } /* }}} void cache_tree_free */
115 static void *queue_thread_main (void *args) /* {{{ */
117 pthread_mutex_lock (&cache_lock);
118 while ((do_shutdown == 0) || (cache_queue_head != NULL))
128 if (cache_queue_head == NULL)
129 pthread_cond_wait (&cache_cond, &cache_lock);
131 if (cache_queue_head == NULL)
134 ci = cache_queue_head;
136 /* copy the relevant parts */
137 file = strdup (ci->file);
140 RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
145 values_num = ci->values_num;
150 ci->last_flush_time = time (NULL);
151 ci->flags &= ~(CI_FLAGS_IN_QUEUE);
153 cache_queue_head = ci->next;
154 if (cache_queue_head == NULL)
155 cache_queue_tail = NULL;
158 pthread_mutex_unlock (&cache_lock);
160 RRDD_LOG (LOG_DEBUG, "queue_thread_main: rrd_update (%s, %i, %p)",
161 file, values_num, (void *) values);
163 status = rrd_update_r (file, NULL, values_num, values);
166 RRDD_LOG (LOG_ERR, "queue_thread_main: "
167 "rrd_update_r failed with status %i.",
172 for (i = 0; i < values_num; i++)
175 pthread_mutex_lock (&cache_lock);
176 } /* while (do_shutdown == 0) */
177 pthread_mutex_unlock (&cache_lock);
179 RRDD_LOG (LOG_DEBUG, "queue_thread_main: Exiting.");
182 } /* }}} void *queue_thread_main */
184 static int handle_request_update (int fd, /* {{{ */
185 char *buffer, int buffer_size)
196 cache_item_t ci_temp;
203 RRDD_LOG (LOG_DEBUG, "handle_request_update (%i, %p, %i)",
204 fd, (void *) buffer, buffer_size);
209 buffer_ptr += strlen (file) + 1;
213 pthread_mutex_lock (&cache_lock);
215 node = avl_search (cache_tree, (void *) &ci_temp);
218 ci = (cache_item_t *) malloc (sizeof (cache_item_t));
221 pthread_mutex_unlock (&cache_lock);
222 RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
225 memset (ci, 0, sizeof (cache_item_t));
227 ci->file = strdup (file);
228 if (ci->file == NULL)
230 pthread_mutex_unlock (&cache_lock);
231 RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
238 ci->last_flush_time = now;
239 ci->flags = CI_FLAGS_IN_TREE;
241 if (avl_insert (cache_tree, (void *) ci) == NULL)
243 pthread_mutex_unlock (&cache_lock);
244 RRDD_LOG (LOG_ERR, "handle_request_update: avl_insert failed.");
250 RRDD_LOG (LOG_DEBUG, "handle_request_update: Created new AVL node %s.",
253 else /* if (ci != NULL) */
255 ci = (cache_item_t *) node->item;
259 while (*buffer_ptr != 0)
264 buffer_ptr += strlen (value) + 1;
266 temp = (char **) realloc (ci->values,
267 sizeof (char *) * (ci->values_num + 1));
270 RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
275 ci->values[ci->values_num] = strdup (value);
276 if (ci->values[ci->values_num] == NULL)
278 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
286 /* FIXME: Timeout should not be hard-coded. */
287 if (((now - ci->last_flush_time) > 300)
288 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0))
290 RRDD_LOG (LOG_DEBUG, "handle_request_update: Adding %s to the update queue.",
293 assert (ci->next == NULL);
295 if (cache_queue_tail == NULL)
296 cache_queue_head = ci;
298 cache_queue_tail->next = ci;
299 cache_queue_tail = ci;
301 pthread_cond_signal (&cache_cond);
304 pthread_mutex_unlock (&cache_lock);
306 snprintf (answer, sizeof (answer), "0 Enqueued %i value(s)\n", values_num);
307 answer[sizeof (answer) - 1] = 0;
309 status = write (fd, answer, sizeof (answer));
313 RRDD_LOG (LOG_INFO, "handle_request_update: write(2) returned an error.");
318 } /* }}} int handle_request_update */
320 static int handle_request (int fd) /* {{{ */
325 RRDD_LOG (LOG_DEBUG, "handle_request (%i)", fd);
327 buffer_size = read (fd, buffer, sizeof (buffer));
330 RRDD_LOG (LOG_ERR, "handle_request: read(2) failed.");
333 assert (((size_t) buffer_size) <= sizeof (buffer));
335 if ((buffer[buffer_size - 2] != 0)
336 || (buffer[buffer_size - 1] != 0))
338 RRDD_LOG (LOG_INFO, "handle_request: malformed request.");
342 /* fields in the buffer a separated by null bytes. */
343 if (strcmp (buffer, "update") == 0)
345 int offset = strlen ("update") + 1;
346 return (handle_request_update (fd, buffer + offset,
347 buffer_size - offset));
351 RRDD_LOG (LOG_INFO, "handle_request: unknown command: %s.", buffer);
354 } /* }}} int handle_request */
356 static void *connection_thread_main (void *args) /* {{{ */
362 fd = *((int *) args);
364 RRDD_LOG (LOG_DEBUG, "connection_thread_main: Adding myself to "
365 "connetion_threads[]..");
366 pthread_mutex_lock (&connetion_threads_lock);
370 temp = (pthread_t *) realloc (connetion_threads,
371 sizeof (pthread_t) * (connetion_threads_num + 1));
374 RRDD_LOG (LOG_ERR, "connection_thread_main: realloc failed.");
378 connetion_threads = temp;
379 connetion_threads[connetion_threads_num] = pthread_self ();
380 connetion_threads_num++;
383 pthread_mutex_unlock (&connetion_threads_lock);
384 RRDD_LOG (LOG_DEBUG, "connection_thread_main: done");
386 while (do_shutdown == 0)
388 struct pollfd pollfd;
392 pollfd.events = POLLIN | POLLPRI;
395 status = poll (&pollfd, 1, /* timeout = */ 500);
396 if (status == 0) /* timeout */
398 else if (status < 0) /* error */
403 RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
407 if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
409 RRDD_LOG (LOG_DEBUG, "connection_thread_main: "
410 "poll(2) returned POLLHUP.");
414 else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
416 RRDD_LOG (LOG_WARNING, "connection_thread_main: "
417 "poll(2) returned something unexpected: %#04hx",
423 status = handle_request (fd);
431 self = pthread_self ();
432 /* Remove this thread from the connection threads list */
433 pthread_mutex_lock (&connetion_threads_lock);
434 /* Find out own index in the array */
435 for (i = 0; i < connetion_threads_num; i++)
436 if (pthread_equal (connetion_threads[i], self) != 0)
438 assert (i < connetion_threads_num);
440 /* Move the trailing threads forward. */
441 if (i < (connetion_threads_num - 1))
443 memmove (connetion_threads + i,
444 connetion_threads + i + 1,
445 sizeof (pthread_t) * (connetion_threads_num - i - 1));
448 connetion_threads_num--;
449 pthread_mutex_unlock (&connetion_threads_lock);
453 } /* }}} void *connection_thread_main */
455 static int open_listen_socket (const char *path) /* {{{ */
458 struct sockaddr_un sa;
459 listen_socket_t *temp;
462 temp = (listen_socket_t *) realloc (listen_fds,
463 sizeof (listen_fds[0]) * (listen_fds_num + 1));
466 RRDD_LOG (LOG_ERR, "open_listen_socket: realloc failed.");
470 memset (listen_fds + listen_fds_num, 0, sizeof (listen_fds[0]));
472 fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
475 RRDD_LOG (LOG_ERR, "open_listen_socket: socket(2) failed.");
479 memset (&sa, 0, sizeof (sa));
480 sa.sun_family = AF_UNIX;
481 strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
483 status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
486 RRDD_LOG (LOG_ERR, "open_listen_socket: bind(2) failed.");
492 status = listen (fd, /* backlog = */ 10);
495 RRDD_LOG (LOG_ERR, "open_listen_socket: listen(2) failed.");
501 listen_fds[listen_fds_num].fd = fd;
502 strncpy (listen_fds[listen_fds_num].path, path,
503 sizeof (listen_fds[listen_fds_num].path) - 1);
507 } /* }}} int open_listen_socket */
509 static int close_listen_sockets (void) /* {{{ */
513 for (i = 0; i < listen_fds_num; i++)
515 close (listen_fds[i].fd);
516 unlink (listen_fds[i].path);
524 } /* }}} int close_listen_sockets */
526 static void *listen_thread_main (void *args) /* {{{ */
532 status = open_listen_socket (RRDD_SOCK_PATH);
535 RRDD_LOG (LOG_ERR, "listen_thread_main: open_listen_socket failed.");
539 while (do_shutdown == 0)
542 struct sockaddr_un client_sa;
543 socklen_t client_sa_size;
546 client_sd = (int *) malloc (sizeof (int));
547 if (client_sd == NULL)
549 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
554 client_sa_size = sizeof (client_sa);
555 /* FIXME: Don't implement listen_fds as a list or use poll(2) here! */
556 *client_sd = accept (listen_fds[0].fd,
557 (struct sockaddr *) &client_sa, &client_sa_size);
560 RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
564 RRDD_LOG (LOG_DEBUG, "listen_thread_main: accept(2) returned fd #%i.",
567 status = pthread_create (&tid, /* attr = */ NULL, connection_thread_main,
568 /* args = */ (void *) client_sd);
571 RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
577 RRDD_LOG (LOG_DEBUG, "listen_thread_main: pthread_create succeeded: "
579 *((unsigned long *) &tid));
580 } /* while (do_shutdown == 0) */
582 close_listen_sockets ();
584 pthread_mutex_lock (&connetion_threads_lock);
585 while (connetion_threads_num > 0)
589 wait_for = connetion_threads[0];
591 pthread_mutex_unlock (&connetion_threads_lock);
592 pthread_join (wait_for, /* retval = */ NULL);
593 pthread_mutex_lock (&connetion_threads_lock);
595 pthread_mutex_unlock (&connetion_threads_lock);
597 RRDD_LOG (LOG_DEBUG, "listen_thread_main: Exiting.");
600 } /* }}} void *listen_thread_main */
602 static int daemonize (void) /* {{{ */
611 fprintf (stderr, "daemonize: fork(2) failed.\n");
619 /* Change into the /tmp directory. */
622 /* Become session leader */
625 /* Open the first three file descriptors to /dev/null */
630 open ("/dev/null", O_RDWR);
633 #endif /* RRDD_DEBUG */
638 memset (&sa, 0, sizeof (sa));
639 sa.sa_handler = sig_int_handler;
640 sigaction (SIGINT, &sa, NULL);
642 memset (&sa, 0, sizeof (sa));
643 sa.sa_handler = sig_term_handler;
644 sigaction (SIGINT, &sa, NULL);
646 memset (&sa, 0, sizeof (sa));
647 sa.sa_handler = SIG_IGN;
648 sigaction (SIGPIPE, &sa, NULL);
651 openlog ("rrdd", LOG_PID, LOG_DAEMON);
653 cache_tree = avl_alloc_tree (cache_tree_compare, cache_tree_free);
654 if (cache_tree == NULL)
656 RRDD_LOG (LOG_ERR, "daemonize: avl_alloc_tree failed.");
660 memset (&queue_thread, 0, sizeof (queue_thread));
661 status = pthread_create (&queue_thread, /* attr = */ NULL,
662 queue_thread_main, /* args = */ NULL);
665 RRDD_LOG (LOG_ERR, "daemonize: pthread_create failed.");
670 } /* }}} int daemonize */
672 static int cleanup (void) /* {{{ */
674 RRDD_LOG (LOG_DEBUG, "cleanup ()");
678 RRDD_LOG (LOG_DEBUG, "cleanup: Joining queue_thread..");
679 pthread_cond_signal (&cache_cond);
680 pthread_join (queue_thread, /* return = */ NULL);
681 RRDD_LOG (LOG_DEBUG, "cleanup: done");
686 } /* }}} int cleanup */
688 int main (int argc, char **argv)
692 printf ("%s by Florian Forster, Version %s\n",
693 PACKAGE_NAME, PACKAGE_VERSION);
695 status = daemonize ();
698 struct sigaction sigchld;
700 memset (&sigchld, 0, sizeof (sigchld));
701 sigchld.sa_handler = SIG_IGN;
702 sigaction (SIGCHLD, &sigchld, NULL);
706 else if (status != 0)
708 fprintf (stderr, "daemonize failed, exiting.\n");
712 listen_thread_main (NULL);
720 * vim: set sw=2 sts=2 ts=8 et fdm=marker :