From: Florian Forster Date: Sun, 22 Jun 2008 17:46:08 +0000 (+0200) Subject: src/rrd_daemon.c: Add caching daemon. X-Git-Url: https://git.verplant.org/?a=commitdiff_plain;h=04b1f7c49178e9c86b2d77d4e4e040dfc6483d7d;p=rrdtool.git src/rrd_daemon.c: Add caching daemon. Add the daemon from the RRDd project. --- diff --git a/doc/Makefile.am b/doc/Makefile.am index 16fd617..dd898ce 100644 --- a/doc/Makefile.am +++ b/doc/Makefile.am @@ -10,7 +10,7 @@ CLEANFILES = *.1 *.html *.txt *-dircache RRD?.pod *.pdf *~ core *itemcache *.rej POD = bin_dec_hex.pod rrddump.pod rrdgraph_examples.pod rrdrestore.pod rrdupdate.pod \ cdeftutorial.pod rrdfetch.pod rrdgraph_graph.pod rrdthreads.pod rrdxport.pod \ - rpntutorial.pod rrdfirst.pod rrdgraph_rpn.pod rrdtool.pod \ + rpntutorial.pod rrdfirst.pod rrdgraph_rpn.pod rrdtool.pod rrdd.pod \ rrd-beginners.pod rrdinfo.pod rrdtune.pod rrdbuild.pod \ rrdcgi.pod rrdgraph.pod rrdlast.pod rrdlastupdate.pod \ rrdcreate.pod rrdgraph_data.pod rrdresize.pod rrdtutorial.pod diff --git a/doc/rrdd.pod b/doc/rrdd.pod new file mode 100644 index 0000000..e0c322a --- /dev/null +++ b/doc/rrdd.pod @@ -0,0 +1,62 @@ +=pod + +=head1 NAME + +RRDd - Data caching daemon for rrdtool + +=head1 SYNOPSIS + +B [B<-l> I
] [B<-w> I] [B<-f> I] + +=head1 DESCRIPTION + +RRDd is a daemon that receives updates to existing RRD files, accumulates them +and, if enough have been received or a defined time has passed, writes the +updates to the RRD file. A I command may be used to force writing of +values to disk, so that graphing facilities and similar can work with +up-to-date data. + +=head1 OPTIONS + +=over 4 + +=item B<-l> I
+ +Tells the daemon to bind to I
and accept incoming connections on that +socket. If I
begins with C, everthing following that prefix is +interpreted as the path to a UNIX domain socket. Otherwise the address or node +name are resolved using L. + +=item B<-w> I + +Data is written to disk every I seconds. + +=item B<-f> I + +Every I seconds the entire cache is searched for old values which are +written to disk. This only concerns files to which updates have stopped, so +setting this to a high value, such as 3600 seconds, is acceptable in most +cases. + +=head1 BUGS + +=over 4 + +=item + +Uses way too much CPU time after running for a while. + +=item + +Base directory is currently hard coded. The daemon will chdir to C. + +=back + +=head1 SEE ALSO + +L, L + +=head1 AUHOR + +RRDd and this manual page have been written by Florian Forster +EoctoEatEverplant.orgE. diff --git a/src/Makefile.am b/src/Makefile.am index 5311592..d7f82ec 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -82,7 +82,7 @@ librrd_th_la_LIBADD = $(ALL_LIBS) include_HEADERS = rrd.h rrd_format.h -bin_PROGRAMS = rrdtool rrdupdate +bin_PROGRAMS = rrdtool rrdupdate rrdd if BUILD_RRDCGI bin_PROGRAMS += rrdcgi @@ -98,6 +98,11 @@ rrdtool_SOURCES = rrd_tool.c rrdtool_DEPENDENCIES = librrd.la rrdtool_LDADD = librrd.la +rrdd_SOURCES = rrd_daemon.c +rrdd_DEPENDENCIES = librrd.la +rrdd_CPPFLAGS = -DVERSION='"$(VERSION)"' +rrdd_LDADD = librrd.la + # strftime is here because we do not usually need it. unices have propper # iso date support EXTRA_DIST= strftime.c strftime.h \ diff --git a/src/rrd_daemon.c b/src/rrd_daemon.c new file mode 100644 index 0000000..3af33dc --- /dev/null +++ b/src/rrd_daemon.c @@ -0,0 +1,1057 @@ +/** + * RRDTool - src/rrd_daemon.c + * Copyright (C) 2008 Florian octo Forster + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; only version 2 of the License is applicable. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + * + * Authors: + * Florian octo Forster + **/ + +/* + * First tell the compiler to stick to the C99 and POSIX standards as close as + * possible. + */ +#ifndef __STRICT_ANSI__ /* {{{ */ +# define __STRICT_ANSI__ +#endif + +#ifndef _ISOC99_SOURCE +# define _ISOC99_SOURCE +#endif + +#ifdef _POSIX_C_SOURCE +# undef _POSIX_C_SOURCE +#endif +#define _POSIX_C_SOURCE 200112L + +/* Single UNIX needed for strdup. */ +#ifdef _XOPEN_SOURCE +# undef _XOPEN_SOURCE +#endif +#define _XOPEN_SOURCE 500 + +#ifndef _REENTRANT +# define _REENTRANT +#endif + +#ifndef _THREAD_SAFE +# define _THREAD_SAFE +#endif + +#ifdef _GNU_SOURCE +# undef _GNU_SOURCE +#endif +/* }}} */ + +/* + * Now for some includes.. + */ +#include "rrd.h" /* {{{ */ +#include "rrd_client.h" + +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +/* }}} */ + +#define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__) + +#ifndef __GNUC__ +# define __attribute__(x) /**/ +#endif + +#if 0 +/* FIXME: I don't want to declare strdup myself! */ +extern char *strdup (const char *s); +#endif + +/* + * Types + */ +struct listen_socket_s +{ + int fd; + char path[PATH_MAX + 1]; +}; +typedef struct listen_socket_s listen_socket_t; + +struct cache_item_s; +typedef struct cache_item_s cache_item_t; +struct cache_item_s +{ + char *file; + char **values; + int values_num; + time_t last_flush_time; +#define CI_FLAGS_IN_TREE 0x01 +#define CI_FLAGS_IN_QUEUE 0x02 + int flags; + + cache_item_t *next; +}; + +/* + * Variables + */ +static listen_socket_t *listen_fds = NULL; +static size_t listen_fds_num = 0; + +static int do_shutdown = 0; + +static pthread_t queue_thread; + +static pthread_t *connetion_threads = NULL; +static pthread_mutex_t connetion_threads_lock = PTHREAD_MUTEX_INITIALIZER; +static int connetion_threads_num = 0; + +/* Cache stuff */ +static GTree *cache_tree = NULL; +static cache_item_t *cache_queue_head = NULL; +static cache_item_t *cache_queue_tail = NULL; +static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER; +static pthread_cond_t cache_cond = PTHREAD_COND_INITIALIZER; + +static int config_write_interval = 300; +static int config_flush_interval = 3600; + +static char **config_listen_address_list = NULL; +static int config_listen_address_list_len = 0; + +/* + * Functions + */ +static void sig_int_handler (int s __attribute__((unused))) /* {{{ */ +{ + do_shutdown++; +} /* }}} void sig_int_handler */ + +static void sig_term_handler (int s __attribute__((unused))) /* {{{ */ +{ + do_shutdown++; +} /* }}} void sig_term_handler */ + +/* + * enqueue_cache_item: + * `cache_lock' must be acquired before calling this function! + */ +static int enqueue_cache_item (cache_item_t *ci) /* {{{ */ +{ + RRDD_LOG (LOG_DEBUG, "handle_request_update: Adding %s to the update queue.", + ci->file); + + if (ci == NULL) + return (-1); + + if ((ci->flags & CI_FLAGS_IN_QUEUE) != 0) + return (-1); + + assert (ci->next == NULL); + + if (cache_queue_tail == NULL) + cache_queue_head = ci; + else + cache_queue_tail->next = ci; + cache_queue_tail = ci; + + return (0); +} /* }}} int enqueue_cache_item */ + +/* + * tree_callback_flush: + * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held + * while this is in progress. + */ +static gboolean tree_callback_flush (gpointer key /* {{{ */ + __attribute__((unused)), gpointer value, gpointer data) +{ + cache_item_t *ci; + time_t now; + + key = NULL; /* make compiler happy */ + + ci = (cache_item_t *) value; + now = *((time_t *) data); + + if (((now - ci->last_flush_time) >= config_write_interval) + && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)) + enqueue_cache_item (ci); + + return (TRUE); +} /* }}} gboolean tree_callback_flush */ + +static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */ +{ + struct timeval now; + struct timespec next_flush; + + gettimeofday (&now, NULL); + next_flush.tv_sec = now.tv_sec + config_flush_interval; + next_flush.tv_nsec = 1000 * now.tv_usec; + + pthread_mutex_lock (&cache_lock); + while ((do_shutdown == 0) || (cache_queue_head != NULL)) + { + cache_item_t *ci; + char *file; + char **values; + int values_num; + int status; + int i; + + /* First, check if it's time to do the cache flush. */ + gettimeofday (&now, NULL); + if ((now.tv_sec > next_flush.tv_sec) + || ((now.tv_sec == next_flush.tv_sec) + && ((1000 * now.tv_usec) > next_flush.tv_nsec))) + { + time_t time_now; + + /* Pass the current time as user data so that we don't need to call + * `time' for each node. */ + time_now = time (NULL); + + g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &time_now); + + /* Determine the time of the next cache flush. */ + while (next_flush.tv_sec < now.tv_sec) + next_flush.tv_sec += config_flush_interval; + } + + /* Now, check if there's something to store away. If not, wait until + * something comes in or it's time to do the cache flush. */ + if (cache_queue_head == NULL) + { + struct timespec timeout; + + timeout.tv_sec = next_flush.tv_sec - now.tv_sec; + if (next_flush.tv_nsec < (1000 * now.tv_usec)) + { + timeout.tv_sec--; + timeout.tv_nsec = 1000000000 + next_flush.tv_nsec + - (1000 * now.tv_usec); + } + else + { + timeout.tv_nsec = next_flush.tv_nsec - (1000 * now.tv_usec); + } + + pthread_cond_timedwait (&cache_cond, &cache_lock, &timeout); + } + + /* Check if a value has arrived. This may be NULL if we timed out or there + * was an interrupt such as a signal. */ + if (cache_queue_head == NULL) + continue; + + ci = cache_queue_head; + + /* copy the relevant parts */ + file = strdup (ci->file); + if (file == NULL) + { + RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed."); + continue; + } + + values = ci->values; + values_num = ci->values_num; + + ci->values = NULL; + ci->values_num = 0; + + ci->last_flush_time = time (NULL); + ci->flags &= ~(CI_FLAGS_IN_QUEUE); + + cache_queue_head = ci->next; + if (cache_queue_head == NULL) + cache_queue_tail = NULL; + ci->next = NULL; + + pthread_mutex_unlock (&cache_lock); + + RRDD_LOG (LOG_DEBUG, "queue_thread_main: rrd_update (%s, %i, %p)", + file, values_num, (void *) values); + + status = rrd_update_r (file, NULL, values_num, (void *) values); + if (status != 0) + { + RRDD_LOG (LOG_ERR, "queue_thread_main: " + "rrd_update_r failed with status %i.", + status); + } + + free (file); + for (i = 0; i < values_num; i++) + free (values[i]); + + pthread_mutex_lock (&cache_lock); + } /* while (do_shutdown == 0) */ + pthread_mutex_unlock (&cache_lock); + + RRDD_LOG (LOG_DEBUG, "queue_thread_main: Exiting."); + + return (NULL); +} /* }}} void *queue_thread_main */ + +static int handle_request_update (int fd, /* {{{ */ + char *buffer, int buffer_size) +{ + char *file; + char *value; + char *buffer_ptr; + int values_num = 0; + int status; + + time_t now; + + cache_item_t *ci; + char answer[4096]; + + now = time (NULL); + + RRDD_LOG (LOG_DEBUG, "handle_request_update (%i, %p, %i)", + fd, (void *) buffer, buffer_size); + + buffer_ptr = buffer; + + file = buffer_ptr; + buffer_ptr += strlen (file) + 1; + + pthread_mutex_lock (&cache_lock); + + ci = g_tree_lookup (cache_tree, file); + if (ci == NULL) + { + ci = (cache_item_t *) malloc (sizeof (cache_item_t)); + if (ci == NULL) + { + pthread_mutex_unlock (&cache_lock); + RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed."); + return (-1); + } + memset (ci, 0, sizeof (cache_item_t)); + + ci->file = strdup (file); + if (ci->file == NULL) + { + pthread_mutex_unlock (&cache_lock); + RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed."); + free (ci); + return (-1); + } + + ci->values = NULL; + ci->values_num = 0; + ci->last_flush_time = now; + ci->flags = CI_FLAGS_IN_TREE; + + g_tree_insert (cache_tree, (void *) ci->file, (void *) ci); + + RRDD_LOG (LOG_DEBUG, "handle_request_update: Created new tree node %s.", + ci->file); + } + assert (ci != NULL); + + while (*buffer_ptr != 0) + { + char **temp; + + value = buffer_ptr; + buffer_ptr += strlen (value) + 1; + + temp = (char **) realloc (ci->values, + sizeof (char *) * (ci->values_num + 1)); + if (temp == NULL) + { + RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed."); + continue; + } + ci->values = temp; + + ci->values[ci->values_num] = strdup (value); + if (ci->values[ci->values_num] == NULL) + { + RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed."); + continue; + } + ci->values_num++; + + values_num++; + } + + if (((now - ci->last_flush_time) >= config_write_interval) + && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)) + { + enqueue_cache_item (ci); + pthread_cond_signal (&cache_cond); + } + + pthread_mutex_unlock (&cache_lock); + + snprintf (answer, sizeof (answer), "0 Enqueued %i value(s)\n", values_num); + answer[sizeof (answer) - 1] = 0; + + status = write (fd, answer, sizeof (answer)); + if (status < 0) + { + status = errno; + RRDD_LOG (LOG_INFO, "handle_request_update: write(2) returned an error."); + return (status); + } + + return (0); +} /* }}} int handle_request_update */ + +static int handle_request (int fd) /* {{{ */ +{ + char buffer[4096]; + int buffer_size; + + RRDD_LOG (LOG_DEBUG, "handle_request (%i)", fd); + + buffer_size = read (fd, buffer, sizeof (buffer)); + if (buffer_size < 1) + { + RRDD_LOG (LOG_ERR, "handle_request: read(2) failed."); + return (-1); + } + assert (((size_t) buffer_size) <= sizeof (buffer)); + + if ((buffer[buffer_size - 2] != 0) + || (buffer[buffer_size - 1] != 0)) + { + RRDD_LOG (LOG_INFO, "handle_request: malformed request."); + return (-1); + } + + /* fields in the buffer a separated by null bytes. */ + if (strcmp (buffer, "update") == 0) + { + int offset = strlen ("update") + 1; + return (handle_request_update (fd, buffer + offset, + buffer_size - offset)); + } + else + { + RRDD_LOG (LOG_INFO, "handle_request: unknown command: %s.", buffer); + return (-1); + } +} /* }}} int handle_request */ + +static void *connection_thread_main (void *args /* {{{ */ + __attribute__((unused))) +{ + pthread_t self; + int i; + int fd; + + fd = *((int *) args); + + RRDD_LOG (LOG_DEBUG, "connection_thread_main: Adding myself to " + "connetion_threads[].."); + pthread_mutex_lock (&connetion_threads_lock); + { + pthread_t *temp; + + temp = (pthread_t *) realloc (connetion_threads, + sizeof (pthread_t) * (connetion_threads_num + 1)); + if (temp == NULL) + { + RRDD_LOG (LOG_ERR, "connection_thread_main: realloc failed."); + } + else + { + connetion_threads = temp; + connetion_threads[connetion_threads_num] = pthread_self (); + connetion_threads_num++; + } + } + pthread_mutex_unlock (&connetion_threads_lock); + RRDD_LOG (LOG_DEBUG, "connection_thread_main: done"); + + while (do_shutdown == 0) + { + struct pollfd pollfd; + int status; + + pollfd.fd = fd; + pollfd.events = POLLIN | POLLPRI; + pollfd.revents = 0; + + status = poll (&pollfd, 1, /* timeout = */ 500); + if (status == 0) /* timeout */ + continue; + else if (status < 0) /* error */ + { + status = errno; + if (status == EINTR) + continue; + RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed."); + continue; + } + + if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */ + { + RRDD_LOG (LOG_DEBUG, "connection_thread_main: " + "poll(2) returned POLLHUP."); + close (fd); + break; + } + else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0) + { + RRDD_LOG (LOG_WARNING, "connection_thread_main: " + "poll(2) returned something unexpected: %#04hx", + pollfd.revents); + close (fd); + break; + } + + status = handle_request (fd); + if (status != 0) + { + close (fd); + break; + } + } + + self = pthread_self (); + /* Remove this thread from the connection threads list */ + pthread_mutex_lock (&connetion_threads_lock); + /* Find out own index in the array */ + for (i = 0; i < connetion_threads_num; i++) + if (pthread_equal (connetion_threads[i], self) != 0) + break; + assert (i < connetion_threads_num); + + /* Move the trailing threads forward. */ + if (i < (connetion_threads_num - 1)) + { + memmove (connetion_threads + i, + connetion_threads + i + 1, + sizeof (pthread_t) * (connetion_threads_num - i - 1)); + } + + connetion_threads_num--; + pthread_mutex_unlock (&connetion_threads_lock); + + free (args); + return (NULL); +} /* }}} void *connection_thread_main */ + +static int open_listen_socket_unix (const char *path) /* {{{ */ +{ + int fd; + struct sockaddr_un sa; + listen_socket_t *temp; + int status; + + temp = (listen_socket_t *) realloc (listen_fds, + sizeof (listen_fds[0]) * (listen_fds_num + 1)); + if (temp == NULL) + { + RRDD_LOG (LOG_ERR, "open_listen_socket_unix: realloc failed."); + return (-1); + } + listen_fds = temp; + memset (listen_fds + listen_fds_num, 0, sizeof (listen_fds[0])); + + fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0); + if (fd < 0) + { + RRDD_LOG (LOG_ERR, "open_listen_socket_unix: socket(2) failed."); + return (-1); + } + + memset (&sa, 0, sizeof (sa)); + sa.sun_family = AF_UNIX; + strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1); + + status = bind (fd, (struct sockaddr *) &sa, sizeof (sa)); + if (status != 0) + { + RRDD_LOG (LOG_ERR, "open_listen_socket_unix: bind(2) failed."); + close (fd); + unlink (path); + return (-1); + } + + status = listen (fd, /* backlog = */ 10); + if (status != 0) + { + RRDD_LOG (LOG_ERR, "open_listen_socket_unix: listen(2) failed."); + close (fd); + unlink (path); + return (-1); + } + + listen_fds[listen_fds_num].fd = fd; + snprintf (listen_fds[listen_fds_num].path, + sizeof (listen_fds[listen_fds_num].path) - 1, + "unix:%s", path); + listen_fds_num++; + + return (0); +} /* }}} int open_listen_socket_unix */ + +static int open_listen_socket (const char *addr) /* {{{ */ +{ + struct addrinfo ai_hints; + struct addrinfo *ai_res; + struct addrinfo *ai_ptr; + int status; + + assert (addr != NULL); + + if (strncmp ("unix:", addr, strlen ("unix:")) == 0) + return (open_listen_socket_unix (addr + strlen ("unix:"))); + else if (addr[0] == '/') + return (open_listen_socket_unix (addr)); + + memset (&ai_hints, 0, sizeof (ai_hints)); + ai_hints.ai_flags = 0; +#ifdef AI_ADDRCONFIG + ai_hints.ai_flags |= AI_ADDRCONFIG; +#endif + ai_hints.ai_family = AF_UNSPEC; + ai_hints.ai_socktype = SOCK_STREAM; + + ai_res = NULL; + status = getaddrinfo (addr, DEFAULT_PORT, &ai_hints, &ai_res); + if (status != 0) + { + RRDD_LOG (LOG_ERR, "open_listen_socket: getaddrinfo(%s) failed: " + "%s", addr, gai_strerror (status)); + return (-1); + } + + for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next) + { + int fd; + listen_socket_t *temp; + + temp = (listen_socket_t *) realloc (listen_fds, + sizeof (listen_fds[0]) * (listen_fds_num + 1)); + if (temp == NULL) + { + RRDD_LOG (LOG_ERR, "open_listen_socket: realloc failed."); + continue; + } + listen_fds = temp; + memset (listen_fds + listen_fds_num, 0, sizeof (listen_fds[0])); + + fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol); + if (fd < 0) + { + RRDD_LOG (LOG_ERR, "open_listen_socket: socket(2) failed."); + continue; + } + + status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen); + if (status != 0) + { + RRDD_LOG (LOG_ERR, "open_listen_socket: bind(2) failed."); + close (fd); + continue; + } + + status = listen (fd, /* backlog = */ 10); + if (status != 0) + { + RRDD_LOG (LOG_ERR, "open_listen_socket: listen(2) failed."); + close (fd); + return (-1); + } + + listen_fds[listen_fds_num].fd = fd; + strncpy (listen_fds[listen_fds_num].path, addr, + sizeof (listen_fds[listen_fds_num].path) - 1); + listen_fds_num++; + } /* for (ai_ptr) */ + + return (0); +} /* }}} int open_listen_socket */ + +static int close_listen_sockets (void) /* {{{ */ +{ + size_t i; + + for (i = 0; i < listen_fds_num; i++) + { + close (listen_fds[i].fd); + if (strncmp ("unix:", listen_fds[i].path, strlen ("unix:")) == 0) + unlink (listen_fds[i].path + strlen ("unix:")); + } + + free (listen_fds); + listen_fds = NULL; + listen_fds_num = 0; + + return (0); +} /* }}} int close_listen_sockets */ + +static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */ +{ + struct pollfd *pollfds; + int pollfds_num; + int status; + int i; + + for (i = 0; i < config_listen_address_list_len; i++) + { + RRDD_LOG (LOG_DEBUG, "listen_thread_main: config_listen_address_list[%i] " + "= %s", i, config_listen_address_list[i]); + open_listen_socket (config_listen_address_list[i]); + } + + if (config_listen_address_list_len < 1) + open_listen_socket (RRDD_SOCK_PATH); + + if (listen_fds_num < 1) + { + RRDD_LOG (LOG_ERR, "listen_thread_main: No listen sockets " + "could be opened. Sorry."); + return (NULL); + } + + pollfds_num = listen_fds_num; + pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num); + if (pollfds == NULL) + { + RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed."); + return (NULL); + } + memset (pollfds, 0, sizeof (*pollfds) * pollfds_num); + + while (do_shutdown == 0) + { + assert (pollfds_num == ((int) listen_fds_num)); + for (i = 0; i < pollfds_num; i++) + { + pollfds[i].fd = listen_fds[i].fd; + pollfds[i].events = POLLIN | POLLPRI; + pollfds[i].revents = 0; + } + + status = poll (pollfds, pollfds_num, /* timeout = */ -1); + if (status < 1) + { + status = errno; + if (status != EINTR) + { + RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed."); + } + continue; + } + + for (i = 0; i < pollfds_num; i++) + { + int *client_sd; + struct sockaddr_storage client_sa; + socklen_t client_sa_size; + pthread_t tid; + + if (pollfds[i].revents == 0) + continue; + + if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0) + { + RRDD_LOG (LOG_ERR, "listen_thread_main: " + "poll(2) returned something unexpected for listen FD #%i.", + pollfds[i].fd); + continue; + } + + client_sd = (int *) malloc (sizeof (int)); + if (client_sd == NULL) + { + RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed."); + continue; + } + + client_sa_size = sizeof (client_sa); + *client_sd = accept (pollfds[i].fd, + (struct sockaddr *) &client_sa, &client_sa_size); + if (*client_sd < 0) + { + RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed."); + continue; + } + + RRDD_LOG (LOG_DEBUG, "listen_thread_main: accept(2) returned fd #%i.", + *client_sd); + + status = pthread_create (&tid, /* attr = */ NULL, connection_thread_main, + /* args = */ (void *) client_sd); + if (status != 0) + { + RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed."); + close (*client_sd); + free (client_sd); + continue; + } + + RRDD_LOG (LOG_DEBUG, "listen_thread_main: pthread_create succeeded: " + "tid = %lu", + *((unsigned long *) &tid)); + } /* for (pollfds_num) */ + } /* while (do_shutdown == 0) */ + + close_listen_sockets (); + + pthread_mutex_lock (&connetion_threads_lock); + while (connetion_threads_num > 0) + { + pthread_t wait_for; + + wait_for = connetion_threads[0]; + + pthread_mutex_unlock (&connetion_threads_lock); + pthread_join (wait_for, /* retval = */ NULL); + pthread_mutex_lock (&connetion_threads_lock); + } + pthread_mutex_unlock (&connetion_threads_lock); + + RRDD_LOG (LOG_DEBUG, "listen_thread_main: Exiting."); + + return (NULL); +} /* }}} void *listen_thread_main */ + +static int daemonize (void) /* {{{ */ +{ + pid_t child; + int status; + + child = fork (); + if (child < 0) + { + fprintf (stderr, "daemonize: fork(2) failed.\n"); + return (-1); + } + else if (child > 0) + { + return (1); + } + + /* Change into the /tmp directory. */ + chdir ("/tmp"); + + /* Become session leader */ + setsid (); + + /* Open the first three file descriptors to /dev/null */ + close (2); + close (1); + close (0); + + open ("/dev/null", O_RDWR); + dup (0); + dup (0); + + { + struct sigaction sa; + + memset (&sa, 0, sizeof (sa)); + sa.sa_handler = sig_int_handler; + sigaction (SIGINT, &sa, NULL); + + memset (&sa, 0, sizeof (sa)); + sa.sa_handler = sig_term_handler; + sigaction (SIGINT, &sa, NULL); + + memset (&sa, 0, sizeof (sa)); + sa.sa_handler = SIG_IGN; + sigaction (SIGPIPE, &sa, NULL); + } + + openlog ("rrdd", LOG_PID, LOG_DAEMON); + + cache_tree = g_tree_new ((GCompareFunc) strcmp); + if (cache_tree == NULL) + { + RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed."); + return (-1); + } + + memset (&queue_thread, 0, sizeof (queue_thread)); + status = pthread_create (&queue_thread, /* attr = */ NULL, + queue_thread_main, /* args = */ NULL); + if (status != 0) + { + RRDD_LOG (LOG_ERR, "daemonize: pthread_create failed."); + return (-1); + } + + return (0); +} /* }}} int daemonize */ + +static int cleanup (void) /* {{{ */ +{ + RRDD_LOG (LOG_DEBUG, "cleanup ()"); + + do_shutdown++; + + RRDD_LOG (LOG_DEBUG, "cleanup: Joining queue_thread.."); + pthread_cond_signal (&cache_cond); + pthread_join (queue_thread, /* return = */ NULL); + RRDD_LOG (LOG_DEBUG, "cleanup: done"); + + closelog (); + + return (0); +} /* }}} int cleanup */ + +static int read_options (int argc, char **argv) /* {{{ */ +{ + int option; + int status = 0; + + while ((option = getopt(argc, argv, "l:f:w:h?")) != -1) + { + switch (option) + { + case 'l': + { + char **temp; + + temp = (char **) realloc (config_listen_address_list, + sizeof (char *) * (config_listen_address_list_len + 1)); + if (temp == NULL) + { + fprintf (stderr, "read_options: realloc failed.\n"); + return (2); + } + config_listen_address_list = temp; + + temp[config_listen_address_list_len] = strdup (optarg); + if (temp[config_listen_address_list_len] == NULL) + { + fprintf (stderr, "read_options: strdup failed.\n"); + return (2); + } + config_listen_address_list_len++; + } + break; + + case 'f': + { + int temp; + + temp = atoi (optarg); + if (temp > 0) + config_flush_interval = temp; + else + { + fprintf (stderr, "Invalid flush interval: %s\n", optarg); + status = 3; + } + } + break; + + case 'w': + { + int temp; + + temp = atoi (optarg); + if (temp > 0) + config_write_interval = temp; + else + { + fprintf (stderr, "Invalid write interval: %s\n", optarg); + status = 2; + } + } + break; + + case 'h': + case '?': + printf ("RRDd %s Copyright (C) 2008 Florian octo Forster\n" + "\n" + "Usage: rrdd [options]\n" + "\n" + "Valid options are:\n" + " -l
Socket address to listen to.\n" + " -w Interval in which to write data.\n" + " -f Interval in which to flush dead data.\n" + "\n" + "For more information and a detailed description of all options " + "please refer\n" + "to the rrdd(1) manual page.\n", + VERSION); + status = -1; + break; + } /* switch (option) */ + } /* while (getopt) */ + + return (status); +} /* }}} int read_options */ + +int main (int argc, char **argv) +{ + int status; + + status = read_options (argc, argv); + if (status != 0) + { + if (status < 0) + status = 0; + return (status); + } + + status = daemonize (); + if (status == 1) + { + struct sigaction sigchld; + + memset (&sigchld, 0, sizeof (sigchld)); + sigchld.sa_handler = SIG_IGN; + sigaction (SIGCHLD, &sigchld, NULL); + + return (0); + } + else if (status != 0) + { + fprintf (stderr, "daemonize failed, exiting.\n"); + return (1); + } + + listen_thread_main (NULL); + + cleanup (); + + return (0); +} /* int main */ + +/* + * vim: set sw=2 sts=2 ts=8 et fdm=marker : + */