2 * RRDTool - src/rrd_daemon.c
3 * Copyright (C) 2008 Florian octo Forster
4 * Copyright (C) 2008 Kevin Brintnall
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; only version 2 of the License is applicable.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
20 * Florian octo Forster <octo at verplant.org>
21 * kevin brintnall <kbrint@rufus.net>
26 * First tell the compiler to stick to the C99 and POSIX standards as close as
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
46 #define _XOPEN_SOURCE 500
63 * Now for some includes..
65 #include "rrd.h" /* {{{ */
66 #include "rrd_client.h"
77 #include <sys/types.h>
81 #include <sys/socket.h>
92 #include <glib-2.0/glib.h>
95 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
98 # define __attribute__(x) /**/
110 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
112 struct listen_socket_s
115 char addr[PATH_MAX + 1];
117 socket_privilege privilege;
119 /* state for BATCH processing */
131 typedef struct listen_socket_s listen_socket_t;
134 typedef struct cache_item_s cache_item_t;
140 time_t last_flush_time;
141 time_t last_update_stamp;
142 #define CI_FLAGS_IN_TREE (1<<0)
143 #define CI_FLAGS_IN_QUEUE (1<<1)
145 pthread_cond_t flushed;
150 struct callback_flush_data_s
157 typedef struct callback_flush_data_s callback_flush_data_t;
164 typedef enum queue_side_e queue_side_t;
166 /* max length of socket command or response */
168 #define RBUF_SIZE (CMD_MAX*2)
173 static int stay_foreground = 0;
174 static uid_t daemon_uid;
176 static listen_socket_t *listen_fds = NULL;
177 static size_t listen_fds_num = 0;
179 static int do_shutdown = 0;
181 static pthread_t queue_thread;
183 static pthread_t *connection_threads = NULL;
184 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
185 static int connection_threads_num = 0;
188 static GTree *cache_tree = NULL;
189 static cache_item_t *cache_queue_head = NULL;
190 static cache_item_t *cache_queue_tail = NULL;
191 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
192 static pthread_cond_t cache_cond = PTHREAD_COND_INITIALIZER;
194 static int config_write_interval = 300;
195 static int config_write_jitter = 0;
196 static int config_flush_interval = 3600;
197 static int config_flush_at_shutdown = 0;
198 static char *config_pid_file = NULL;
199 static char *config_base_dir = NULL;
200 static size_t _config_base_dir_len = 0;
201 static int config_write_base_only = 0;
203 static listen_socket_t **config_listen_address_list = NULL;
204 static int config_listen_address_list_len = 0;
206 static uint64_t stats_queue_length = 0;
207 static uint64_t stats_updates_received = 0;
208 static uint64_t stats_flush_received = 0;
209 static uint64_t stats_updates_written = 0;
210 static uint64_t stats_data_sets_written = 0;
211 static uint64_t stats_journal_bytes = 0;
212 static uint64_t stats_journal_rotate = 0;
213 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
215 /* Journaled updates */
216 static char *journal_cur = NULL;
217 static char *journal_old = NULL;
218 static FILE *journal_fh = NULL;
219 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
220 static int journal_write(char *cmd, char *args);
221 static void journal_done(void);
222 static void journal_rotate(void);
227 static void sig_common (const char *sig) /* {{{ */
229 RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
231 pthread_cond_broadcast(&cache_cond);
232 } /* }}} void sig_common */
234 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
237 } /* }}} void sig_int_handler */
239 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
242 } /* }}} void sig_term_handler */
244 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
246 config_flush_at_shutdown = 1;
248 } /* }}} void sig_usr1_handler */
250 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
252 config_flush_at_shutdown = 0;
254 } /* }}} void sig_usr2_handler */
256 static void install_signal_handlers(void) /* {{{ */
258 /* These structures are static, because `sigaction' behaves weird if the are
260 static struct sigaction sa_int;
261 static struct sigaction sa_term;
262 static struct sigaction sa_pipe;
263 static struct sigaction sa_usr1;
264 static struct sigaction sa_usr2;
266 /* Install signal handlers */
267 memset (&sa_int, 0, sizeof (sa_int));
268 sa_int.sa_handler = sig_int_handler;
269 sigaction (SIGINT, &sa_int, NULL);
271 memset (&sa_term, 0, sizeof (sa_term));
272 sa_term.sa_handler = sig_term_handler;
273 sigaction (SIGTERM, &sa_term, NULL);
275 memset (&sa_pipe, 0, sizeof (sa_pipe));
276 sa_pipe.sa_handler = SIG_IGN;
277 sigaction (SIGPIPE, &sa_pipe, NULL);
279 memset (&sa_pipe, 0, sizeof (sa_usr1));
280 sa_usr1.sa_handler = sig_usr1_handler;
281 sigaction (SIGUSR1, &sa_usr1, NULL);
283 memset (&sa_usr2, 0, sizeof (sa_usr2));
284 sa_usr2.sa_handler = sig_usr2_handler;
285 sigaction (SIGUSR2, &sa_usr2, NULL);
287 } /* }}} void install_signal_handlers */
289 static int open_pidfile(void) /* {{{ */
294 file = (config_pid_file != NULL)
296 : LOCALSTATEDIR "/run/rrdcached.pid";
298 fd = open(file, O_CREAT|O_EXCL|O_WRONLY, S_IRUSR|S_IRGRP|S_IROTH);
300 fprintf(stderr, "FATAL: cannot create '%s' (%s)\n",
301 file, rrd_strerror(errno));
304 } /* }}} static int open_pidfile */
306 static int write_pidfile (int fd) /* {{{ */
313 fh = fdopen (fd, "w");
316 RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
321 fprintf (fh, "%i\n", (int) pid);
325 } /* }}} int write_pidfile */
327 static int remove_pidfile (void) /* {{{ */
332 file = (config_pid_file != NULL)
334 : LOCALSTATEDIR "/run/rrdcached.pid";
336 status = unlink (file);
340 } /* }}} int remove_pidfile */
342 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
346 eol = memchr(sock->rbuf + sock->next_cmd, '\n',
347 sock->next_read - sock->next_cmd);
351 /* no commands left, move remainder back to front of rbuf */
352 memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
353 sock->next_read - sock->next_cmd);
354 sock->next_read -= sock->next_cmd;
361 char *cmd = sock->rbuf + sock->next_cmd;
364 sock->next_cmd = eol - sock->rbuf + 1;
366 if (eol > sock->rbuf && *(eol-1) == '\r')
367 *(--eol) = '\0'; /* handle "\r\n" EOL */
378 /* add the characters directly to the write buffer */
379 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
383 assert(sock != NULL);
385 new_buf = realloc(sock->wbuf, sock->wbuf_len + len + 1);
388 RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
392 strncpy(new_buf + sock->wbuf_len, str, len + 1);
394 sock->wbuf = new_buf;
395 sock->wbuf_len += len;
398 } /* }}} static int add_to_wbuf */
400 /* add the text to the "extra" info that's sent after the status line */
401 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
404 char buffer[CMD_MAX];
407 if (sock == NULL) return 0; /* journal replay mode */
408 if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
411 #ifdef HAVE_VSNPRINTF
412 len = vsnprintf(buffer, sizeof(buffer)-1, fmt, argp);
414 len = vsprintf(buffer, fmt, argp);
419 RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
423 return add_to_wbuf(sock, buffer, len);
424 } /* }}} static int add_response_info */
426 static int count_lines(char *str) /* {{{ */
432 while ((str = strchr(str, '\n')) != NULL)
440 } /* }}} static int count_lines */
442 /* send the response back to the user.
443 * returns 0 on success, -1 on error
444 * write buffer is always zeroed after this call */
445 static int send_response (listen_socket_t *sock, response_code rc,
446 char *fmt, ...) /* {{{ */
449 char buffer[CMD_MAX];
454 if (sock == NULL) return rc; /* journal replay mode */
456 if (sock->batch_start)
459 return rc; /* no response on success during BATCH */
460 lines = sock->batch_cmd;
462 else if (rc == RESP_OK)
463 lines = count_lines(sock->wbuf);
467 rclen = sprintf(buffer, "%d ", lines);
469 #ifdef HAVE_VSNPRINTF
470 len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen-1, fmt, argp);
472 len = vsprintf(buffer+rclen, fmt, argp);
480 /* append the result to the wbuf, don't write to the user */
481 if (sock->batch_start)
482 return add_to_wbuf(sock, buffer, len);
484 /* first write must be complete */
485 if (len != write(sock->fd, buffer, len))
487 RRDD_LOG(LOG_INFO, "send_response: could not write status message");
491 if (sock->wbuf != NULL && rc == RESP_OK)
494 while (wrote < sock->wbuf_len)
496 ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
499 RRDD_LOG(LOG_INFO, "send_response: could not write results");
506 free(sock->wbuf); sock->wbuf = NULL;
512 static void wipe_ci_values(cache_item_t *ci, time_t when)
517 ci->last_flush_time = when;
518 if (config_write_jitter > 0)
519 ci->last_flush_time += (random() % config_write_jitter);
523 * remove a "cache_item_t" item from the queue.
524 * must hold 'cache_lock' when calling this
526 static void remove_from_queue(cache_item_t *ci) /* {{{ */
528 if (ci == NULL) return;
530 if (ci->prev == NULL)
531 cache_queue_head = ci->next; /* reset head */
533 ci->prev->next = ci->next;
535 if (ci->next == NULL)
536 cache_queue_tail = ci->prev; /* reset the tail */
538 ci->next->prev = ci->prev;
540 ci->next = ci->prev = NULL;
541 ci->flags &= ~CI_FLAGS_IN_QUEUE;
542 } /* }}} static void remove_from_queue */
544 /* remove an entry from the tree and free all its resources.
545 * must hold 'cache lock' while calling this.
546 * returns 0 on success, otherwise errno */
547 static int forget_file(const char *file)
551 ci = g_tree_lookup(cache_tree, file);
555 g_tree_remove (cache_tree, file);
556 remove_from_queue(ci);
558 for (int i=0; i < ci->values_num; i++)
564 /* in case anyone is waiting */
565 pthread_cond_broadcast(&ci->flushed);
570 } /* }}} static int forget_file */
573 * enqueue_cache_item:
574 * `cache_lock' must be acquired before calling this function!
576 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
582 if (ci->values_num == 0)
587 if (cache_queue_head == ci)
590 /* remove from the double linked list */
591 if (ci->flags & CI_FLAGS_IN_QUEUE)
592 remove_from_queue(ci);
595 ci->next = cache_queue_head;
596 if (ci->next != NULL)
598 cache_queue_head = ci;
600 if (cache_queue_tail == NULL)
601 cache_queue_tail = cache_queue_head;
603 else /* (side == TAIL) */
605 /* We don't move values back in the list.. */
606 if (ci->flags & CI_FLAGS_IN_QUEUE)
609 assert (ci->next == NULL);
610 assert (ci->prev == NULL);
612 ci->prev = cache_queue_tail;
614 if (cache_queue_tail == NULL)
615 cache_queue_head = ci;
617 cache_queue_tail->next = ci;
619 cache_queue_tail = ci;
622 ci->flags |= CI_FLAGS_IN_QUEUE;
624 pthread_cond_broadcast(&cache_cond);
625 pthread_mutex_lock (&stats_lock);
626 stats_queue_length++;
627 pthread_mutex_unlock (&stats_lock);
630 } /* }}} int enqueue_cache_item */
633 * tree_callback_flush:
634 * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held
635 * while this is in progress.
637 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
641 callback_flush_data_t *cfd;
643 ci = (cache_item_t *) value;
644 cfd = (callback_flush_data_t *) data;
646 if ((ci->last_flush_time <= cfd->abs_timeout)
647 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
648 && (ci->values_num > 0))
650 enqueue_cache_item (ci, TAIL);
652 else if ((do_shutdown != 0)
653 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
654 && (ci->values_num > 0))
656 enqueue_cache_item (ci, TAIL);
658 else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
659 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
660 && (ci->values_num <= 0))
664 temp = (char **) realloc (cfd->keys,
665 sizeof (char *) * (cfd->keys_num + 1));
668 RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed.");
672 /* Make really sure this points to the _same_ place */
673 assert ((char *) key == ci->file);
674 cfd->keys[cfd->keys_num] = (char *) key;
679 } /* }}} gboolean tree_callback_flush */
681 static int flush_old_values (int max_age)
683 callback_flush_data_t cfd;
686 memset (&cfd, 0, sizeof (cfd));
687 /* Pass the current time as user data so that we don't need to call
688 * `time' for each node. */
689 cfd.now = time (NULL);
694 cfd.abs_timeout = cfd.now - max_age;
696 cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
698 /* `tree_callback_flush' will return the keys of all values that haven't
699 * been touched in the last `config_flush_interval' seconds in `cfd'.
700 * The char*'s in this array point to the same memory as ci->file, so we
701 * don't need to free them separately. */
702 g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
704 for (k = 0; k < cfd.keys_num; k++)
706 /* should never fail, since we have held the cache_lock
708 assert( forget_file(cfd.keys[k]) == 0 );
711 if (cfd.keys != NULL)
718 } /* int flush_old_values */
720 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
723 struct timespec next_flush;
724 int final_flush = 0; /* make sure we only flush once on shutdown */
726 gettimeofday (&now, NULL);
727 next_flush.tv_sec = now.tv_sec + config_flush_interval;
728 next_flush.tv_nsec = 1000 * now.tv_usec;
730 pthread_mutex_lock (&cache_lock);
731 while ((do_shutdown == 0) || (cache_queue_head != NULL))
740 /* First, check if it's time to do the cache flush. */
741 gettimeofday (&now, NULL);
742 if ((now.tv_sec > next_flush.tv_sec)
743 || ((now.tv_sec == next_flush.tv_sec)
744 && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
746 /* Flush all values that haven't been written in the last
747 * `config_write_interval' seconds. */
748 flush_old_values (config_write_interval);
750 /* Determine the time of the next cache flush. */
751 while (next_flush.tv_sec <= now.tv_sec)
752 next_flush.tv_sec += config_flush_interval;
754 /* unlock the cache while we rotate so we don't block incoming
755 * updates if the fsync() blocks on disk I/O */
756 pthread_mutex_unlock(&cache_lock);
758 pthread_mutex_lock(&cache_lock);
761 /* Now, check if there's something to store away. If not, wait until
762 * something comes in or it's time to do the cache flush. if we are
763 * shutting down, do not wait around. */
764 if (cache_queue_head == NULL && !do_shutdown)
766 status = pthread_cond_timedwait (&cache_cond, &cache_lock, &next_flush);
767 if ((status != 0) && (status != ETIMEDOUT))
769 RRDD_LOG (LOG_ERR, "queue_thread_main: "
770 "pthread_cond_timedwait returned %i.", status);
774 /* We're about to shut down */
775 if (do_shutdown != 0 && !final_flush++)
777 if (config_flush_at_shutdown)
778 flush_old_values (-1); /* flush everything */
783 /* Check if a value has arrived. This may be NULL if we timed out or there
784 * was an interrupt such as a signal. */
785 if (cache_queue_head == NULL)
788 ci = cache_queue_head;
790 /* copy the relevant parts */
791 file = strdup (ci->file);
794 RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
798 assert(ci->values != NULL);
799 assert(ci->values_num > 0);
802 values_num = ci->values_num;
804 wipe_ci_values(ci, time(NULL));
805 remove_from_queue(ci);
807 pthread_mutex_lock (&stats_lock);
808 assert (stats_queue_length > 0);
809 stats_queue_length--;
810 pthread_mutex_unlock (&stats_lock);
812 pthread_mutex_unlock (&cache_lock);
815 status = rrd_update_r (file, NULL, values_num, (void *) values);
818 RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
819 "rrd_update_r (%s) failed with status %i. (%s)",
820 file, status, rrd_get_error());
823 journal_write("wrote", file);
824 pthread_cond_broadcast(&ci->flushed);
826 for (i = 0; i < values_num; i++)
834 pthread_mutex_lock (&stats_lock);
835 stats_updates_written++;
836 stats_data_sets_written += values_num;
837 pthread_mutex_unlock (&stats_lock);
840 pthread_mutex_lock (&cache_lock);
842 /* We're about to shut down */
843 if (do_shutdown != 0 && !final_flush++)
845 if (config_flush_at_shutdown)
846 flush_old_values (-1); /* flush everything */
850 } /* while ((do_shutdown == 0) || (cache_queue_head != NULL)) */
851 pthread_mutex_unlock (&cache_lock);
853 if (config_flush_at_shutdown)
855 assert(cache_queue_head == NULL);
856 RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
862 } /* }}} void *queue_thread_main */
864 static int buffer_get_field (char **buffer_ret, /* {{{ */
865 size_t *buffer_size_ret, char **field_ret)
874 buffer = *buffer_ret;
876 buffer_size = *buffer_size_ret;
880 if (buffer_size <= 0)
883 /* This is ensured by `handle_request'. */
884 assert (buffer[buffer_size - 1] == '\0');
887 while (buffer_pos < buffer_size)
889 /* Check for end-of-field or end-of-buffer */
890 if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
892 field[field_size] = 0;
898 /* Handle escaped characters. */
899 else if (buffer[buffer_pos] == '\\')
901 if (buffer_pos >= (buffer_size - 1))
904 field[field_size] = buffer[buffer_pos];
908 /* Normal operation */
911 field[field_size] = buffer[buffer_pos];
915 } /* while (buffer_pos < buffer_size) */
920 *buffer_ret = buffer + buffer_pos;
921 *buffer_size_ret = buffer_size - buffer_pos;
925 } /* }}} int buffer_get_field */
927 /* if we're restricting writes to the base directory,
928 * check whether the file falls within the dir
929 * returns 1 if OK, otherwise 0
931 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
933 assert(file != NULL);
935 if (!config_write_base_only
936 || sock == NULL /* journal replay */
937 || config_base_dir == NULL)
940 if (strstr(file, "../") != NULL) goto err;
942 /* relative paths without "../" are ok */
943 if (*file != '/') return 1;
945 /* file must be of the format base + "/" + <1+ char filename> */
946 if (strlen(file) < _config_base_dir_len + 2) goto err;
947 if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
948 if (*(file + _config_base_dir_len) != '/') goto err;
953 if (sock != NULL && sock->fd >= 0)
954 send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
957 } /* }}} static int check_file_access */
959 /* returns 1 if we have the required privilege level,
960 * otherwise issue an error to the user on sock */
961 static int has_privilege (listen_socket_t *sock, /* {{{ */
962 socket_privilege priv)
964 if (sock == NULL) /* journal replay */
967 if (sock->privilege >= priv)
970 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
971 } /* }}} static int has_privilege */
973 static int flush_file (const char *filename) /* {{{ */
977 pthread_mutex_lock (&cache_lock);
979 ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
982 pthread_mutex_unlock (&cache_lock);
986 if (ci->values_num > 0)
988 /* Enqueue at head */
989 enqueue_cache_item (ci, HEAD);
990 pthread_cond_wait(&ci->flushed, &cache_lock);
993 /* DO NOT DO ANYTHING WITH ci HERE!! The entry
994 * may have been purged during our cond_wait() */
996 pthread_mutex_unlock(&cache_lock);
999 } /* }}} int flush_file */
1001 static int handle_request_help (listen_socket_t *sock, /* {{{ */
1002 char *buffer, size_t buffer_size)
1008 char *help_help[2] =
1010 "Command overview\n"
1012 "HELP [<command>]\n"
1013 "FLUSH <filename>\n"
1015 "PENDING <filename>\n"
1016 "FORGET <filename>\n"
1017 "UPDATE <filename> <values> [<values> ...]\n"
1022 char *help_flush[2] =
1026 "Usage: FLUSH <filename>\n"
1028 "Adds the given filename to the head of the update queue and returns\n"
1029 "after is has been dequeued.\n"
1032 char *help_flushall[2] =
1034 "Help for FLUSHALL\n"
1038 "Triggers writing of all pending updates. Returns immediately.\n"
1041 char *help_pending[2] =
1043 "Help for PENDING\n"
1045 "Usage: PENDING <filename>\n"
1047 "Shows any 'pending' updates for a file, in order.\n"
1048 "The updates shown have not yet been written to the underlying RRD file.\n"
1051 char *help_forget[2] =
1055 "Usage: FORGET <filename>\n"
1057 "Removes the file completely from the cache.\n"
1058 "Any pending updates for the file will be lost.\n"
1061 char *help_update[2] =
1065 "Usage: UPDATE <filename> <values> [<values> ...]\n"
1067 "Adds the given file to the internal cache if it is not yet known and\n"
1068 "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1071 "Each <values> has the following form:\n"
1072 " <values> = <time>:<value>[:<value>[...]]\n"
1073 "See the rrdupdate(1) manpage for details.\n"
1076 char *help_stats[2] =
1082 "Returns some performance counters, see the rrdcached(1) manpage for\n"
1083 "a description of the values.\n"
1086 char *help_batch[2] =
1090 "The 'BATCH' command permits the client to initiate a bulk load\n"
1091 " of commands to rrdcached.\n"
1096 " server: 0 Go ahead. End with dot '.' on its own line.\n"
1097 " client: command #1\n"
1098 " client: command #2\n"
1099 " client: ... and so on\n"
1101 " server: 2 errors\n"
1102 " server: 7 message for command #7\n"
1103 " server: 9 message for command #9\n"
1105 "For more information, consult the rrdcached(1) documentation.\n"
1108 status = buffer_get_field (&buffer, &buffer_size, &command);
1110 help_text = help_help;
1113 if (strcasecmp (command, "update") == 0)
1114 help_text = help_update;
1115 else if (strcasecmp (command, "flush") == 0)
1116 help_text = help_flush;
1117 else if (strcasecmp (command, "flushall") == 0)
1118 help_text = help_flushall;
1119 else if (strcasecmp (command, "pending") == 0)
1120 help_text = help_pending;
1121 else if (strcasecmp (command, "forget") == 0)
1122 help_text = help_forget;
1123 else if (strcasecmp (command, "stats") == 0)
1124 help_text = help_stats;
1125 else if (strcasecmp (command, "batch") == 0)
1126 help_text = help_batch;
1128 help_text = help_help;
1131 add_response_info(sock, help_text[1]);
1132 return send_response(sock, RESP_OK, help_text[0]);
1133 } /* }}} int handle_request_help */
1135 static int handle_request_stats (listen_socket_t *sock) /* {{{ */
1137 uint64_t copy_queue_length;
1138 uint64_t copy_updates_received;
1139 uint64_t copy_flush_received;
1140 uint64_t copy_updates_written;
1141 uint64_t copy_data_sets_written;
1142 uint64_t copy_journal_bytes;
1143 uint64_t copy_journal_rotate;
1145 uint64_t tree_nodes_number;
1146 uint64_t tree_depth;
1148 pthread_mutex_lock (&stats_lock);
1149 copy_queue_length = stats_queue_length;
1150 copy_updates_received = stats_updates_received;
1151 copy_flush_received = stats_flush_received;
1152 copy_updates_written = stats_updates_written;
1153 copy_data_sets_written = stats_data_sets_written;
1154 copy_journal_bytes = stats_journal_bytes;
1155 copy_journal_rotate = stats_journal_rotate;
1156 pthread_mutex_unlock (&stats_lock);
1158 pthread_mutex_lock (&cache_lock);
1159 tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1160 tree_depth = (uint64_t) g_tree_height (cache_tree);
1161 pthread_mutex_unlock (&cache_lock);
1163 add_response_info(sock,
1164 "QueueLength: %"PRIu64"\n", copy_queue_length);
1165 add_response_info(sock,
1166 "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1167 add_response_info(sock,
1168 "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1169 add_response_info(sock,
1170 "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1171 add_response_info(sock,
1172 "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1173 add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1174 add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1175 add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1176 add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1178 send_response(sock, RESP_OK, "Statistics follow\n");
1181 } /* }}} int handle_request_stats */
1183 static int handle_request_flush (listen_socket_t *sock, /* {{{ */
1184 char *buffer, size_t buffer_size)
1189 status = buffer_get_field (&buffer, &buffer_size, &file);
1192 return send_response(sock, RESP_ERR, "Usage: flush <filename>\n");
1196 pthread_mutex_lock(&stats_lock);
1197 stats_flush_received++;
1198 pthread_mutex_unlock(&stats_lock);
1200 if (!check_file_access(file, sock)) return 0;
1202 status = flush_file (file);
1204 return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1205 else if (status == ENOENT)
1207 /* no file in our tree; see whether it exists at all */
1208 struct stat statbuf;
1210 memset(&statbuf, 0, sizeof(statbuf));
1211 if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1212 return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1214 return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1216 else if (status < 0)
1217 return send_response(sock, RESP_ERR, "Internal error.\n");
1219 return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1224 } /* }}} int handle_request_flush */
1226 static int handle_request_flushall(listen_socket_t *sock) /* {{{ */
1230 status = has_privilege(sock, PRIV_HIGH);
1234 RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1236 pthread_mutex_lock(&cache_lock);
1237 flush_old_values(-1);
1238 pthread_mutex_unlock(&cache_lock);
1240 return send_response(sock, RESP_OK, "Started flush.\n");
1241 } /* }}} static int handle_request_flushall */
1243 static int handle_request_pending(listen_socket_t *sock, /* {{{ */
1244 char *buffer, size_t buffer_size)
1250 status = buffer_get_field(&buffer, &buffer_size, &file);
1252 return send_response(sock, RESP_ERR,
1253 "Usage: PENDING <filename>\n");
1255 status = has_privilege(sock, PRIV_HIGH);
1259 pthread_mutex_lock(&cache_lock);
1260 ci = g_tree_lookup(cache_tree, file);
1263 pthread_mutex_unlock(&cache_lock);
1264 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1267 for (int i=0; i < ci->values_num; i++)
1268 add_response_info(sock, "%s\n", ci->values[i]);
1270 pthread_mutex_unlock(&cache_lock);
1271 return send_response(sock, RESP_OK, "updates pending\n");
1272 } /* }}} static int handle_request_pending */
1274 static int handle_request_forget(listen_socket_t *sock, /* {{{ */
1275 char *buffer, size_t buffer_size)
1280 status = buffer_get_field(&buffer, &buffer_size, &file);
1282 return send_response(sock, RESP_ERR,
1283 "Usage: FORGET <filename>\n");
1285 status = has_privilege(sock, PRIV_HIGH);
1289 if (!check_file_access(file, sock)) return 0;
1291 pthread_mutex_lock(&cache_lock);
1292 status = forget_file(file);
1293 pthread_mutex_unlock(&cache_lock);
1298 journal_write("forget", file);
1300 return send_response(sock, RESP_OK, "Gone!\n");
1303 return send_response(sock, RESP_ERR, "cannot forget: %s\n",
1304 status < 0 ? "Internal error" : rrd_strerror(status));
1308 } /* }}} static int handle_request_forget */
1310 static int handle_request_update (listen_socket_t *sock, /* {{{ */
1312 char *buffer, size_t buffer_size)
1316 int bad_timestamps = 0;
1318 char orig_buf[CMD_MAX];
1322 status = has_privilege(sock, PRIV_HIGH);
1326 /* save it for the journal later */
1327 strncpy(orig_buf, buffer, sizeof(orig_buf)-1);
1329 status = buffer_get_field (&buffer, &buffer_size, &file);
1331 return send_response(sock, RESP_ERR,
1332 "Usage: UPDATE <filename> <values> [<values> ...]\n");
1334 pthread_mutex_lock(&stats_lock);
1335 stats_updates_received++;
1336 pthread_mutex_unlock(&stats_lock);
1338 if (!check_file_access(file, sock)) return 0;
1340 pthread_mutex_lock (&cache_lock);
1341 ci = g_tree_lookup (cache_tree, file);
1343 if (ci == NULL) /* {{{ */
1345 struct stat statbuf;
1347 /* don't hold the lock while we setup; stat(2) might block */
1348 pthread_mutex_unlock(&cache_lock);
1350 memset (&statbuf, 0, sizeof (statbuf));
1351 status = stat (file, &statbuf);
1354 RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1357 if (status == ENOENT)
1358 return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1360 return send_response(sock, RESP_ERR,
1361 "stat failed with error %i.\n", status);
1363 if (!S_ISREG (statbuf.st_mode))
1364 return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1366 if (access(file, R_OK|W_OK) != 0)
1367 return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1368 file, rrd_strerror(errno));
1370 ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1373 RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1375 return send_response(sock, RESP_ERR, "malloc failed.\n");
1377 memset (ci, 0, sizeof (cache_item_t));
1379 ci->file = strdup (file);
1380 if (ci->file == NULL)
1383 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1385 return send_response(sock, RESP_ERR, "strdup failed.\n");
1388 wipe_ci_values(ci, now);
1389 ci->flags = CI_FLAGS_IN_TREE;
1391 pthread_mutex_lock(&cache_lock);
1392 g_tree_insert (cache_tree, (void *) ci->file, (void *) ci);
1394 assert (ci != NULL);
1396 /* don't re-write updates in replay mode */
1398 journal_write("update", orig_buf);
1400 while (buffer_size > 0)
1407 status = buffer_get_field (&buffer, &buffer_size, &value);
1410 RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1414 /* make sure update time is always moving forward */
1415 stamp = strtol(value, &eostamp, 10);
1416 if (eostamp == value || eostamp == NULL || *eostamp != ':')
1419 add_response_info(sock, "Cannot find timestamp in '%s'!\n", value);
1422 else if (stamp <= ci->last_update_stamp)
1425 add_response_info(sock,
1426 "illegal attempt to update using time %ld when"
1427 " last update time is %ld (minimum one second step)\n",
1428 stamp, ci->last_update_stamp);
1432 ci->last_update_stamp = stamp;
1434 temp = (char **) realloc (ci->values,
1435 sizeof (char *) * (ci->values_num + 1));
1438 RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
1443 ci->values[ci->values_num] = strdup (value);
1444 if (ci->values[ci->values_num] == NULL)
1446 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1454 if (((now - ci->last_flush_time) >= config_write_interval)
1455 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1456 && (ci->values_num > 0))
1458 enqueue_cache_item (ci, TAIL);
1461 pthread_mutex_unlock (&cache_lock);
1465 /* if we had only one update attempt, then return the full
1466 error message... try to get the most information out
1467 of the limited error space allowed by the protocol
1469 if (bad_timestamps == 1)
1470 return send_response(sock, RESP_ERR, "%s", sock->wbuf);
1472 return send_response(sock, RESP_ERR,
1473 "No values updated (%d bad timestamps).\n",
1477 return send_response(sock, RESP_OK,
1478 "errors, enqueued %i value(s).\n", values_num);
1483 } /* }}} int handle_request_update */
1485 /* we came across a "WROTE" entry during journal replay.
1486 * throw away any values that we have accumulated for this file
1488 static int handle_request_wrote (const char *buffer, time_t now) /* {{{ */
1492 const char *file = buffer;
1494 pthread_mutex_lock(&cache_lock);
1496 ci = g_tree_lookup(cache_tree, file);
1499 pthread_mutex_unlock(&cache_lock);
1505 for (i=0; i < ci->values_num; i++)
1506 free(ci->values[i]);
1511 wipe_ci_values(ci, now);
1512 remove_from_queue(ci);
1514 pthread_mutex_unlock(&cache_lock);
1516 } /* }}} int handle_request_wrote */
1518 /* start "BATCH" processing */
/* Put this connection into BATCH mode. A second BATCH while one is open
 * is rejected with RESP_ERR. Otherwise the go-ahead is sent first, then
 * sock->batch_start records the current time (connection_thread_main uses
 * it as "now" for commands read while the batch is open) and the
 * per-batch command counter is reset.
 * NOTE(review): the final return is not in this excerpt; presumably it
 * returns status from the go-ahead reply. */
1519 static int batch_start (listen_socket_t *sock) /* {{{ */
1522 if (sock->batch_start)
1523 return send_response(sock, RESP_ERR, "Already in BATCH\n");
1525 status = send_response(sock, RESP_OK,
1526 "Go ahead. End with dot '.' on its own line.\n");
1527 sock->batch_start = time(NULL);
1528 sock->batch_cmd = 0;
1531 } /* }}} static int batch_start */
1533 /* finish "BATCH" processing and return results to the client */
/* Leave BATCH mode: clear batch_start and batch_cmd, then send the final
 * OK line and return send_response()'s result. Must only be called while
 * a batch is open (asserted).
 * NOTE(review): the "errors" text is presumably prefixed with an error
 * count by send_response(); confirm against its definition. */
1534 static int batch_done (listen_socket_t *sock) /* {{{ */
1536 assert(sock->batch_start);
1537 sock->batch_start = 0;
1538 sock->batch_cmd = 0;
1539 return send_response(sock, RESP_OK, "errors\n");
1540 } /* }}} static int batch_done */
1542 /* if sock==NULL, we are in journal replay mode */
/* Parse the command word at the start of buffer and dispatch it to the
 * matching handler. Command names are compared case-insensitively.
 *
 * sock        - client connection, or NULL while replaying a journal.
 * now         - timestamp forwarded to the update and wrote handlers
 *               (declared on a line not in this excerpt; callers pass it
 *               as the second argument).
 * buffer      - request line; buffer[buffer_size - 1] must be '\0'
 *               (asserted below).
 * buffer_size - size of buffer including that terminating NUL.
 *
 * Restrictions visible in the dispatch chain: "wrote" is accepted only in
 * replay mode (sock == NULL), "batch" only from a live connection, and
 * "." only while that connection has a BATCH open. Anything else is
 * answered with an "Unknown command" error. */
1543 static int handle_request (listen_socket_t *sock, /* {{{ */
1545 char *buffer, size_t buffer_size)
1551 assert (buffer[buffer_size - 1] == '\0');
1553 buffer_ptr = buffer;
/* Split off the first field as the command name; on failure it is logged
 * (the rest of that error branch is not in this excerpt). */
1555 status = buffer_get_field (&buffer_ptr, &buffer_size, &command);
1558 RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
/* NOTE(review): the body of this condition is not in this excerpt;
 * presumably it counts commands received inside a BATCH (batch_cmd). */
1562 if (sock != NULL && sock->batch_start)
1565 if (strcasecmp (command, "update") == 0)
1566 return (handle_request_update (sock, now, buffer_ptr, buffer_size));
1567 else if (strcasecmp (command, "wrote") == 0 && sock == NULL)
1569 /* this is only valid in replay mode */
1570 return (handle_request_wrote (buffer_ptr, now));
1572 else if (strcasecmp (command, "flush") == 0)
1573 return (handle_request_flush (sock, buffer_ptr, buffer_size));
1574 else if (strcasecmp (command, "flushall") == 0)
1575 return (handle_request_flushall(sock));
1576 else if (strcasecmp (command, "pending") == 0)
1577 return (handle_request_pending(sock, buffer_ptr, buffer_size));
1578 else if (strcasecmp (command, "forget") == 0)
1579 return (handle_request_forget(sock, buffer_ptr, buffer_size));
1580 else if (strcasecmp (command, "stats") == 0)
1581 return (handle_request_stats (sock));
1582 else if (strcasecmp (command, "help") == 0)
1583 return (handle_request_help (sock, buffer_ptr, buffer_size));
1584 else if (strcasecmp (command, "batch") == 0 && sock != NULL)
1585 return batch_start(sock);
1586 else if (strcasecmp (command, ".") == 0 && sock != NULL && sock->batch_start)
1587 return batch_done(sock);
1589 return send_response(sock, RESP_ERR, "Unknown command: %s\n", command);
1593 } /* }}} int handle_request */
1595 /* MUST NOT hold journal_lock before calling this */
/* Start a new journal file. If a journal is open, journal_cur is renamed
 * to journal_old. A fresh journal_cur is then opened for appending
 * (mode 0644) and becomes journal_fh. Bails out early when journaling is
 * not configured (journal_cur or journal_old is NULL).
 * If the new file cannot be opened, journaling is disabled: an error is
 * logged and config_flush_at_shutdown is forced on, so cached values are
 * written out at shutdown instead. */
1596 static void journal_rotate(void) /* {{{ */
1598 FILE *old_fh = NULL;
1601 if (journal_cur == NULL || journal_old == NULL)
1604 pthread_mutex_lock(&journal_lock);
1606 /* we rotate this way (rename before close) so that we can release
1607 * the journal lock as fast as possible. Journal writes to the new
1608 * journal can proceed immediately after the new file is opened. The
1609 * fclose can then block without affecting new updates.
 */
1611 if (journal_fh != NULL)
1613 old_fh = journal_fh;
/* NOTE(review): the return value of rename() is ignored; if it fails, the
 * O_APPEND open below keeps appending to the existing journal_cur. */
1615 rename(journal_cur, journal_old);
1616 ++stats_journal_rotate;
1619 new_fd = open(journal_cur, O_WRONLY|O_CREAT|O_APPEND,
1620 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1623 journal_fh = fdopen(new_fd, "a");
1624 if (journal_fh == NULL)
1628 pthread_mutex_unlock(&journal_lock);
/* From here on journal_lock is no longer held. Per the comment above,
 * old_fh is closed outside the lock (that code is not in this excerpt). */
1633 if (journal_fh == NULL)
1636 "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1637 journal_cur, rrd_strerror(errno));
1640 "JOURNALING DISABLED: All values will be flushed at shutdown");
1641 config_flush_at_shutdown = 1;
1644 } /* }}} static void journal_rotate */
/* Shut journaling down at exit. Does nothing when journaling was never
 * configured (journal_cur is NULL). Under journal_lock, the active journal
 * is closed if open (the close call is on a line not in this excerpt).
 * When config_flush_at_shutdown is set, both journal files are deleted,
 * presumably because every cached value is flushed during shutdown.
 * Otherwise they are kept so the next startup can replay them. */
1646 static void journal_done(void) /* {{{ */
1648 if (journal_cur == NULL)
1651 pthread_mutex_lock(&journal_lock);
1652 if (journal_fh != NULL)
1658 if (config_flush_at_shutdown)
1660 RRDD_LOG(LOG_INFO, "removing journals");
1661 unlink(journal_old);
1662 unlink(journal_cur);
1666 RRDD_LOG(LOG_INFO, "expedited shutdown; "
1667 "journals will be used at next startup");
1670 pthread_mutex_unlock(&journal_lock);
1672 } /* }}} static void journal_done */
/* Append one "<cmd> <args>\n" record to the active journal. The byte count
 * reported by fprintf() is added to stats_journal_bytes under stats_lock.
 * Nothing is written when journal_fh is NULL (journaling off).
 * NOTE(review): journal_fh is tested before journal_lock is taken, yet
 * journal_rotate() reassigns it under that lock and leaves it NULL when
 * the new journal cannot be opened. A writer that has already passed the
 * check could then call fprintf() on NULL, so the check belongs inside the
 * locked region. */
1674 static int journal_write(char *cmd, char *args) /* {{{ */
1678 if (journal_fh == NULL)
1681 pthread_mutex_lock(&journal_lock);
1682 chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1683 pthread_mutex_unlock(&journal_lock);
1687 pthread_mutex_lock(&stats_lock);
1688 stats_journal_bytes += chars;
1689 pthread_mutex_unlock(&stats_lock);
1693 } /* }}} static int journal_write */
/* Feed the entries of one journal file back through handle_request() in
 * replay mode (sock == NULL).
 * Before opening, the file is checked with stat(). It must be a regular
 * file, owned by daemon_uid, and not writable by group or others.
 * Otherwise the reason is logged and the file is presumably skipped (the
 * jump/return is not in this excerpt). A missing file (ENOENT) is not
 * logged as an open error.
 * Returns 1 if any entries were replayed (entry_cnt > 0), 0 otherwise. */
1695 static int journal_replay (const char *file) /* {{{ */
1701 char entry[CMD_MAX];
1704 if (file == NULL) return 0;
1709 struct stat statbuf;
1711 memset(&statbuf, 0, sizeof(statbuf));
1712 if (stat(file, &statbuf) != 0)
1714 if (errno == ENOENT)
1717 reason = "stat error";
1720 else if (!S_ISREG(statbuf.st_mode))
1722 reason = "not a regular file";
1725 if (statbuf.st_uid != daemon_uid)
1727 reason = "not owned by daemon user";
/* NOTE(review): this tests the group- and other-write bits, but the
 * message says "user/group"; "group/other" would describe the check. */
1730 if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
1732 reason = "must not be user/group writable";
1738 RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
1739 file, rrd_strerror(status), reason);
1744 fh = fopen(file, "r");
1747 if (errno != ENOENT)
1748 RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1749 file, rrd_strerror(errno));
1753 RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
/* Read one line per iteration. A line longer than CMD_MAX - 1 bytes would
 * arrive without its trailing '\n' and be reported as malformed below. */
1762 if (fgets(entry, sizeof(entry), fh) == NULL)
1764 entry_len = strlen(entry);
1766 /* check \n termination in case journal writing crashed mid-line */
1769 else if (entry[entry_len - 1] != '\n')
1771 RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
/* Overwrite the trailing '\n' with NUL. entry_len then counts that NUL,
 * which matches the buffer_size contract asserted by handle_request(). */
1776 entry[entry_len - 1] = '\0';
1778 if (handle_request(NULL, now, entry, entry_len) == 0)
1786 RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1787 entry_cnt, fail_cnt);
1789 return entry_cnt > 0 ? 1 : 0;
1790 } /* }}} static int journal_replay */
/* At startup, replay any journals left by a previous run while holding
 * journal_lock: journal_old first, then journal_cur. Does nothing when
 * journaling is not configured. Lines between the unlock and the final log
 * message are not in this excerpt. */
1792 static void journal_init(void) /* {{{ */
1794 int had_journal = 0;
1796 if (journal_cur == NULL) return;
1798 pthread_mutex_lock(&journal_lock);
1800 RRDD_LOG(LOG_INFO, "checking for journal files");
1802 had_journal += journal_replay(journal_old);
1803 had_journal += journal_replay(journal_cur);
1805 /* a clean shutdown with flush-at-shutdown deletes the journals, so leftovers mean a crash: flush everything now */
1806 if (had_journal && config_flush_at_shutdown)
1807 flush_old_values(-1);
1809 pthread_mutex_unlock(&journal_lock);
1812 RRDD_LOG(LOG_INFO, "journal processing complete");
1814 } /* }}} static void journal_init */
/* Release a client connection's resources: close the socket and free the
 * read and write buffers, resetting each field (fd to -1, buffers to
 * NULL).
 * NOTE(review): the rest of this function is not in this excerpt, so it
 * cannot be confirmed here whether sock itself is freed. */
1816 static void close_connection(listen_socket_t *sock)
1818 close(sock->fd) ; sock->fd = -1;
1819 free(sock->rbuf); sock->rbuf = NULL;
1820 free(sock->wbuf); sock->wbuf = NULL;
/* Thread entry point for one client; args is the accepted listen_socket_t.
 * The thread:
 *   - allocates the read buffer and registers itself in connection_threads;
 *   - polls the socket with a 500 ms timeout until do_shutdown is set or the
 *     peer hangs up;
 *   - appends what it reads to rbuf and passes every complete command from
 *     next_cmd() to handle_request();
 *   - on exit, closes the connection and removes itself from
 *     connection_threads (listen_thread_main waits on that list at
 *     shutdown). */
1825 static void *connection_thread_main (void *args) /* {{{ */
1828 listen_socket_t *sock;
1832 sock = (listen_socket_t *) args;
1835 /* init read buffers */
1836 sock->next_read = sock->next_cmd = 0;
1837 sock->rbuf = malloc(RBUF_SIZE);
1838 if (sock->rbuf == NULL)
1840 RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
1841 close_connection(sock);
/* Register this thread so the shutdown code can find it. */
1845 pthread_mutex_lock (&connection_threads_lock);
1849 temp = (pthread_t *) realloc (connection_threads,
1850 sizeof (pthread_t) * (connection_threads_num + 1));
1853 RRDD_LOG (LOG_ERR, "connection_thread_main: realloc failed.");
1857 connection_threads = temp;
1858 connection_threads[connection_threads_num] = pthread_self ();
1859 connection_threads_num++;
1862 pthread_mutex_unlock (&connection_threads_lock);
/* The finite poll() timeout lets the loop re-check do_shutdown regularly. */
1864 while (do_shutdown == 0)
1871 struct pollfd pollfd;
1875 pollfd.events = POLLIN | POLLPRI;
1878 status = poll (&pollfd, 1, /* timeout = */ 500);
1881 else if (status == 0) /* timeout */
1883 else if (status < 0) /* error */
1886 if (status != EINTR)
1887 RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
1891 if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
1893 else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
1895 RRDD_LOG (LOG_WARNING, "connection_thread_main: "
1896 "poll(2) returned something unexpected: %#04hx",
/* Append newly received bytes after whatever is already buffered. */
1901 rbytes = read(fd, sock->rbuf + sock->next_read,
1902 RBUF_SIZE - sock->next_read);
1905 RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
1908 else if (rbytes == 0)
1911 sock->next_read += rbytes;
/* While a BATCH is open, every command uses the batch start time. */
1913 if (sock->batch_start)
1914 now = sock->batch_start;
/* cmd_len + 1 is passed as buffer_size; presumably next_cmd() returns a
 * NUL-terminated command whose length excludes the NUL — confirm. */
1918 while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
1920 status = handle_request (sock, now, cmd, cmd_len+1);
1927 close_connection(sock);
1929 self = pthread_self ();
1930 /* Remove this thread from the connection threads list */
1931 pthread_mutex_lock (&connection_threads_lock);
1932 /* Find our own index in the array */
1933 for (i = 0; i < connection_threads_num; i++)
1934 if (pthread_equal (connection_threads[i], self) != 0)
1936 assert (i < connection_threads_num);
1938 /* Move the trailing threads forward. */
1939 if (i < (connection_threads_num - 1))
1941 memmove (connection_threads + i,
1942 connection_threads + i + 1,
1943 sizeof (pthread_t) * (connection_threads_num - i - 1));
1946 connection_threads_num--;
1947 pthread_mutex_unlock (&connection_threads_lock);
1950 } /* }}} void *connection_thread_main */
/* Create a UNIX-domain stream socket for the path in sock, stripping an
 * optional "unix:" prefix. The socket is bound, put into listen mode
 * (backlog 10) and appended to listen_fds as a copy of *sock with
 * family PF_UNIX and the path stored in addr, so close_listen_sockets()
 * can unlink it later.
 * NOTE(review): a path longer than sun_path is silently truncated by the
 * strncpy() into sa.sun_path; rejecting it would be safer. */
1952 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
1955 struct sockaddr_un sa;
1956 listen_socket_t *temp;
1961 if (strncmp(path, "unix:", strlen("unix:")) == 0)
1962 path += strlen("unix:");
/* Grow listen_fds by one slot and seed it with a copy of *sock. */
1964 temp = (listen_socket_t *) realloc (listen_fds,
1965 sizeof (listen_fds[0]) * (listen_fds_num + 1));
1968 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: realloc failed.");
1972 memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
1974 fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
1977 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: socket(2) failed.");
1981 memset (&sa, 0, sizeof (sa));
1982 sa.sun_family = AF_UNIX;
1983 strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
1985 status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
1988 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: bind(2) failed.");
1994 status = listen (fd, /* backlog = */ 10);
1997 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: listen(2) failed.");
2003 listen_fds[listen_fds_num].fd = fd;
2004 listen_fds[listen_fds_num].family = PF_UNIX;
2005 strncpy(listen_fds[listen_fds_num].addr, path,
2006 sizeof (listen_fds[listen_fds_num].addr) - 1);
2010 } /* }}} int open_listen_socket_unix */
/* Resolve sock->addr with getaddrinfo() (AF_UNSPEC, SOCK_STREAM, plus
 * AI_ADDRCONFIG where available). For every returned address, open a
 * socket with SO_REUSEADDR, bind it, listen (backlog 10) and append it to
 * listen_fds.
 * Address forms handled visibly: a bracketed IPv6 literal "[addr]",
 * optionally followed by ":port"; and, when the string contains a '.', a
 * hostname/IPv4 address with an optional ":port" split at the last colon.
 * Without a port, RRDCACHED_DEFAULT_PORT is used. */
2012 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2014 struct addrinfo ai_hints;
2015 struct addrinfo *ai_res;
2016 struct addrinfo *ai_ptr;
2017 char addr_copy[NI_MAXHOST];
/* Work on a NUL-terminated local copy so the port can be split off. */
2022 strncpy (addr_copy, sock->addr, sizeof (addr_copy));
2023 addr_copy[sizeof (addr_copy) - 1] = 0;
2026 memset (&ai_hints, 0, sizeof (ai_hints));
2027 ai_hints.ai_flags = 0;
2028 #ifdef AI_ADDRCONFIG
2029 ai_hints.ai_flags |= AI_ADDRCONFIG;
2031 ai_hints.ai_family = AF_UNSPEC;
2032 ai_hints.ai_socktype = SOCK_STREAM;
2035 if (*addr == '[') /* IPv6+port format */
2037 /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2040 port = strchr (addr, ']');
2043 RRDD_LOG (LOG_ERR, "open_listen_socket_network: Malformed address: %s",
2052 else if (*port == 0)
2056 RRDD_LOG (LOG_ERR, "open_listen_socket_network: Garbage after address: %s",
2060 } /* if (*addr = ']') */
/* NOTE(review): rindex() is a legacy BSD function (removed in
 * POSIX.1-2008); strrchr() from <string.h> is the standard equivalent. */
2061 else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */
2063 port = rindex(addr, ':');
2071 status = getaddrinfo (addr,
2072 port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2073 &ai_hints, &ai_res);
2076 RRDD_LOG (LOG_ERR, "open_listen_socket_network: getaddrinfo(%s) failed: "
2077 "%s", addr, gai_strerror (status));
/* One listening socket per resolved address (e.g. IPv4 and IPv6). */
2081 for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2084 listen_socket_t *temp;
2087 temp = (listen_socket_t *) realloc (listen_fds,
2088 sizeof (listen_fds[0]) * (listen_fds_num + 1));
2091 RRDD_LOG (LOG_ERR, "open_listen_socket_network: realloc failed.");
2095 memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2097 fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2100 RRDD_LOG (LOG_ERR, "open_listen_socket_network: socket(2) failed.");
2104 setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2106 status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2109 RRDD_LOG (LOG_ERR, "open_listen_socket_network: bind(2) failed.");
2114 status = listen (fd, /* backlog = */ 10);
2117 RRDD_LOG (LOG_ERR, "open_listen_socket_network: listen(2) failed.");
2122 listen_fds[listen_fds_num].fd = fd;
2123 listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2125 } /* for (ai_ptr) */
2128 } /* }}} static int open_listen_socket_network */
/* Open a listen socket for sock->addr. Addresses starting with "unix:" or
 * "/" go to open_listen_socket_unix(); everything else goes to
 * open_listen_socket_network().
 * NOTE(review): addr is a char array member of listen_socket_t, so the
 * second assert can never fail; it was probably meant to check
 * sock->addr[0] != '\0'. */
2130 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2132 assert(sock != NULL);
2133 assert(sock->addr != NULL);
2135 if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2136 || sock->addr[0] == '/')
2137 return (open_listen_socket_unix(sock));
2139 return (open_listen_socket_network(sock));
2140 } /* }}} int open_listen_socket */
/* Close every listening socket in listen_fds and unlink the socket file of
 * each UNIX-domain one. The remaining lines (presumably releasing
 * listen_fds and returning) are not in this excerpt. */
2142 static int close_listen_sockets (void) /* {{{ */
2146 for (i = 0; i < listen_fds_num; i++)
2148 close (listen_fds[i].fd);
2150 if (listen_fds[i].family == PF_UNIX)
2151 unlink(listen_fds[i].addr);
2159 } /* }}} int close_listen_sockets */
/* Accept loop, run on the main thread (see main()).
 * It opens every configured listen address, or RRDCACHED_DEFAULT_ADDRESS
 * when none was given, and gives up if no socket could be opened. It then
 * polls all listen sockets with a 1 s timeout until do_shutdown is set.
 * Each accepted client gets a copy of its listen socket's settings and a
 * detached connection_thread_main thread. On shutdown, the listen sockets
 * are closed and the remaining connection threads are waited for. */
2161 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
2163 struct pollfd *pollfds;
2168 for (i = 0; i < config_listen_address_list_len; i++)
2169 open_listen_socket (config_listen_address_list[i]);
2171 if (config_listen_address_list_len < 1)
2173 listen_socket_t sock;
2174 memset(&sock, 0, sizeof(sock));
2175 strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr));
2176 open_listen_socket (&sock);
2179 if (listen_fds_num < 1)
2181 RRDD_LOG (LOG_ERR, "listen_thread_main: No listen sockets "
2182 "could be opened. Sorry.");
/* One pollfd per listen socket; refilled on every iteration below. */
2186 pollfds_num = listen_fds_num;
2187 pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2188 if (pollfds == NULL)
2190 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2193 memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2195 RRDD_LOG(LOG_INFO, "listening for connections");
2197 while (do_shutdown == 0)
2199 assert (pollfds_num == ((int) listen_fds_num));
2200 for (i = 0; i < pollfds_num; i++)
2202 pollfds[i].fd = listen_fds[i].fd;
2203 pollfds[i].events = POLLIN | POLLPRI;
2204 pollfds[i].revents = 0;
2207 status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2210 else if (status == 0) /* timeout */
2212 else if (status < 0) /* error */
2215 if (status != EINTR)
2217 RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2222 for (i = 0; i < pollfds_num; i++)
2224 listen_socket_t *client_sock;
2225 struct sockaddr_storage client_sa;
2226 socklen_t client_sa_size;
2228 pthread_attr_t attr;
2230 if (pollfds[i].revents == 0)
2233 if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2235 RRDD_LOG (LOG_ERR, "listen_thread_main: "
2236 "poll(2) returned something unexpected for listen FD #%i.",
/* The client inherits the listen socket's settings (e.g. privilege);
 * only fd is replaced with the accepted descriptor. */
2241 client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2242 if (client_sock == NULL)
2244 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2247 memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2249 client_sa_size = sizeof (client_sa);
2250 client_sock->fd = accept (pollfds[i].fd,
2251 (struct sockaddr *) &client_sa, &client_sa_size);
2252 if (client_sock->fd < 0)
2254 RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2259 pthread_attr_init (&attr);
2260 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2262 status = pthread_create (&tid, &attr, connection_thread_main,
2266 RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2267 close_connection(client_sock);
2270 } /* for (pollfds_num) */
2271 } /* while (do_shutdown == 0) */
2273 RRDD_LOG(LOG_INFO, "starting shutdown");
2275 close_listen_sockets ();
/* Wait for every registered connection thread to finish. The lock is
 * dropped around pthread_join() because each exiting connection thread
 * takes connection_threads_lock to remove itself from the list.
 * NOTE(review): connection threads are created PTHREAD_CREATE_DETACHED in
 * the accept loop above, and POSIX leaves pthread_join() on a
 * non-joinable thread undefined. This wait needs another mechanism (e.g. a
 * condition variable signalled as connection_threads_num drops) or
 * joinable threads. */
2277 pthread_mutex_lock (&connection_threads_lock);
2278 while (connection_threads_num > 0)
2282 wait_for = connection_threads[0];
2284 pthread_mutex_unlock (&connection_threads_lock);
2285 pthread_join (wait_for, /* retval = */ NULL);
2286 pthread_mutex_lock (&connection_threads_lock);
2288 pthread_mutex_unlock (&connection_threads_lock);
2291 } /* }}} void *listen_thread_main */
/* Prepare the process to run as the daemon, in this order:
 *   1. Record the effective uid in daemon_uid (journal_replay() checks
 *      journal ownership against it).
 *   2. Open the PID file, returning its negative result on failure.
 *   3. Unless stay_foreground (-g) is set, fork into the background,
 *      become a session leader and point the first three descriptors at
 *      /dev/null.
 *   4. Change into the base directory.
 *   5. Install signal handlers and open syslog as "rrdcached".
 *   6. Create the strcmp()-ordered cache tree and write the PID file.
 * Most of the fork/setsid code and the final return are on lines not in
 * this excerpt. */
2293 static int daemonize (void) /* {{{ */
2299 daemon_uid = geteuid();
2301 fd = open_pidfile();
2302 if (fd < 0) return fd;
2304 if (!stay_foreground)
2311 fprintf (stderr, "daemonize: fork(2) failed.\n");
2319 /* Become session leader */
2322 /* Open the first three file descriptors to /dev/null */
2327 open ("/dev/null", O_RDWR);
2330 } /* if (!stay_foreground) */
2332 /* Change into the base directory: -b <dir> if given, otherwise a built-in default. */
2333 base_dir = (config_base_dir != NULL)
2336 status = chdir (base_dir);
2339 fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2343 install_signal_handlers();
2345 openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2346 RRDD_LOG(LOG_INFO, "starting up");
2348 cache_tree = g_tree_new ((GCompareFunc) strcmp);
2349 if (cache_tree == NULL)
2351 RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2355 status = write_pidfile (fd);
2357 } /* }}} int daemonize */
/* Final shutdown step: wake the queue thread through cache_cond, wait for
 * it to exit, then log "goodbye". Code before and between these calls
 * (e.g. whatever tells the queue thread to stop) is not in this excerpt. */
2359 static int cleanup (void) /* {{{ */
2363 pthread_cond_signal (&cache_cond);
2364 pthread_join (queue_thread, /* return = */ NULL);
2368 RRDD_LOG(LOG_INFO, "goodbye");
2372 } /* }}} int cleanup */
/* Parse the command line into the config_* globals.
 * Options:
 *   -l/-L  listen address (high/low privilege)
 *   -w     write interval
 *   -f     flush interval
 *   -z     write jitter
 *   -b     base directory
 *   -B     restrict to the base directory
 *   -p     PID file
 *   -j     journal directory
 *   -F     flush at shutdown
 *   -g     stay in the foreground
 *   -h/-?  usage
 * After parsing, inconsistent settings produce warnings on stderr.
 * Without a journal directory, flush-at-shutdown is forced on. */
2374 static int read_options (int argc, char **argv) /* {{{ */
2379 while ((option = getopt(argc, argv, "gl:L:f:w:b:Bz:p:j:h?F")) != -1)
/* -l / -L: append a new listen_socket_t to config_listen_address_list;
 * -L sockets get PRIV_LOW (documented in the usage text as 'FLUSH' only). */
2390 listen_socket_t **temp;
2391 listen_socket_t *new;
2393 new = malloc(sizeof(listen_socket_t));
2396 fprintf(stderr, "read_options: malloc failed.\n");
2399 memset(new, 0, sizeof(listen_socket_t));
2401 temp = (listen_socket_t **) realloc (config_listen_address_list,
2402 sizeof (listen_socket_t *) * (config_listen_address_list_len + 1));
2405 fprintf (stderr, "read_options: realloc failed.\n");
2408 config_listen_address_list = temp;
2410 strncpy(new->addr, optarg, sizeof(new->addr)-1);
2411 new->privilege = (option == 'l') ? PRIV_HIGH : PRIV_LOW;
2413 temp[config_listen_address_list_len] = new;
2414 config_listen_address_list_len++;
/* NOTE(review): the numeric options below use atoi(), which cannot report
 * trailing garbage ("10x") or overflow; strtol() with an end-pointer check
 * would let these be rejected as invalid. */
2422 temp = atoi (optarg);
2424 config_flush_interval = temp;
2427 fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2437 temp = atoi (optarg);
2439 config_write_interval = temp;
2442 fprintf (stderr, "Invalid write interval: %s\n", optarg);
2452 temp = atoi(optarg);
2454 config_write_jitter = temp;
2457 fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2465 config_write_base_only = 1;
/* -b: store a copy of the base directory with trailing '/' characters
 * stripped. */
2472 if (config_base_dir != NULL)
2473 free (config_base_dir);
2474 config_base_dir = strdup (optarg);
2475 if (config_base_dir == NULL)
2477 fprintf (stderr, "read_options: strdup failed.\n");
2481 len = strlen (config_base_dir);
2482 while ((len > 0) && (config_base_dir[len - 1] == '/'))
2484 config_base_dir[len - 1] = 0;
2490 fprintf (stderr, "Invalid base directory: %s\n", optarg);
2494 _config_base_dir_len = len;
2500 if (config_pid_file != NULL)
2501 free (config_pid_file);
2502 config_pid_file = strdup (optarg);
2503 if (config_pid_file == NULL)
2505 fprintf (stderr, "read_options: strdup failed.\n");
2512 config_flush_at_shutdown = 1;
/* -j: the journal directory must exist and be readable, writable and
 * searchable; journal_cur and journal_old are built inside it. */
2517 struct stat statbuf;
2518 const char *dir = optarg;
2520 status = stat(dir, &statbuf);
2523 fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
/* NOTE(review): when stat() succeeds but dir is not a directory, access()
 * is not called, so the errno printed below may be stale. */
2527 if (!S_ISDIR(statbuf.st_mode)
2528 || access(dir, R_OK|W_OK|X_OK) != 0)
2530 fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2531 errno ? rrd_strerror(errno) : "");
2535 journal_cur = malloc(PATH_MAX + 1);
2536 journal_old = malloc(PATH_MAX + 1);
2537 if (journal_cur == NULL || journal_old == NULL)
2539 fprintf(stderr, "malloc failure for journal files\n");
/* NOTE(review): snprintf() truncation is not checked; a very long -j path
 * would silently produce a wrong journal file name. */
2544 snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2545 snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
2552 printf ("RRDCacheD %s Copyright (C) 2008 Florian octo Forster\n"
2554 "Usage: rrdcached [options]\n"
2556 "Valid options are:\n"
2557 " -l <address> Socket address to listen to.\n"
2558 " -L <address> Socket address to listen to ('FLUSH' only).\n"
2559 " -w <seconds> Interval in which to write data.\n"
2560 " -z <delay> Delay writes up to <delay> seconds to spread load\n"
2561 " -f <seconds> Interval in which to flush dead data.\n"
2562 " -p <file> Location of the PID-file.\n"
2563 " -b <dir> Base directory to change to.\n"
2564 " -B Restrict file access to paths within -b <dir>\n"
2565 " -g Do not fork and run in the foreground.\n"
2566 " -j <dir> Directory in which to create the journal files.\n"
2567 " -F Always flush all updates at shutdown\n"
2569 "For more information and a detailed description of all options "
2571 "to the rrdcached(1) manual page.\n",
2575 } /* switch (option) */
2576 } /* while (getopt) */
2578 /* advise the user when values are not sane */
2579 if (config_flush_interval < 2 * config_write_interval)
2580 fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2581 " 2x write interval (-w) !\n");
2582 if (config_write_jitter > config_write_interval)
2583 fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2584 " write interval (-w) !\n");
2586 if (config_write_base_only && config_base_dir == NULL)
2587 fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
2588 " Consult the rrdcached documentation\n");
/* Without a journal, unflushed values would be lost at exit, so always
 * flush at shutdown. */
2590 if (journal_cur == NULL)
2591 config_flush_at_shutdown = 1;
2594 } /* }}} int read_options */
/* Program entry point: parse options, daemonize, start the queue thread,
 * then run the accept loop (listen_thread_main) on the main thread.
 * One daemonize() result branch, whose condition is not in this excerpt,
 * ignores SIGCHLD. Any other non-zero result is reported as
 * "daemonize failed". */
2596 int main (int argc, char **argv)
2600 status = read_options (argc, argv);
2608 status = daemonize ();
2611 struct sigaction sigchld;
2613 memset (&sigchld, 0, sizeof (sigchld));
2614 sigchld.sa_handler = SIG_IGN;
2615 sigaction (SIGCHLD, &sigchld, NULL);
2619 else if (status != 0)
2621 fprintf (stderr, "daemonize failed, exiting.\n");
2627 /* start the queue thread */
2628 memset (&queue_thread, 0, sizeof (queue_thread));
2629 status = pthread_create (&queue_thread,
2635 RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2640 listen_thread_main (NULL);
2647 * vim: set sw=2 sts=2 ts=8 et fdm=marker :