2 * RRDTool - src/rrd_daemon.c
3 * Copyright (C) 2008 Florian octo Forster
4 * Copyright (C) 2008 Kevin Brintnall
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; only version 2 of the License is applicable.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
20 * Florian octo Forster <octo at verplant.org>
21 * kevin brintnall <kbrint@rufus.net>
26 * First tell the compiler to stick to the C99 and POSIX standards as close as
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
46 #define _XOPEN_SOURCE 500
63 * Now for some includes..
65 #include "rrd.h" /* {{{ */
66 #include "rrd_client.h"
77 #include <sys/types.h>
81 #include <sys/socket.h>
92 #include <glib-2.0/glib.h>
95 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
98 # define __attribute__(x) /**/
110 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
112 struct listen_socket_s
115 char addr[PATH_MAX + 1];
117 socket_privilege privilege;
119 /* state for BATCH processing */
131 typedef struct listen_socket_s listen_socket_t;
134 typedef struct cache_item_s cache_item_t;
140 time_t last_flush_time;
141 time_t last_update_stamp;
142 #define CI_FLAGS_IN_TREE (1<<0)
143 #define CI_FLAGS_IN_QUEUE (1<<1)
145 pthread_cond_t flushed;
150 struct callback_flush_data_s
157 typedef struct callback_flush_data_s callback_flush_data_t;
164 typedef enum queue_side_e queue_side_t;
166 /* max length of socket command or response */
168 #define RBUF_SIZE (CMD_MAX*2)
173 static int stay_foreground = 0;
174 static uid_t daemon_uid;
176 static listen_socket_t *listen_fds = NULL;
177 static size_t listen_fds_num = 0;
179 static int do_shutdown = 0;
181 static pthread_t queue_thread;
183 static pthread_t *connection_threads = NULL;
184 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
185 static int connection_threads_num = 0;
188 static GTree *cache_tree = NULL;
189 static cache_item_t *cache_queue_head = NULL;
190 static cache_item_t *cache_queue_tail = NULL;
191 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
192 static pthread_cond_t cache_cond = PTHREAD_COND_INITIALIZER;
194 static int config_write_interval = 300;
195 static int config_write_jitter = 0;
196 static int config_flush_interval = 3600;
197 static int config_flush_at_shutdown = 0;
198 static char *config_pid_file = NULL;
199 static char *config_base_dir = NULL;
200 static size_t _config_base_dir_len = 0;
201 static int config_write_base_only = 0;
203 static listen_socket_t **config_listen_address_list = NULL;
204 static int config_listen_address_list_len = 0;
206 static uint64_t stats_queue_length = 0;
207 static uint64_t stats_updates_received = 0;
208 static uint64_t stats_flush_received = 0;
209 static uint64_t stats_updates_written = 0;
210 static uint64_t stats_data_sets_written = 0;
211 static uint64_t stats_journal_bytes = 0;
212 static uint64_t stats_journal_rotate = 0;
213 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
215 /* Journaled updates */
216 static char *journal_cur = NULL;
217 static char *journal_old = NULL;
218 static FILE *journal_fh = NULL;
219 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
220 static int journal_write(char *cmd, char *args);
221 static void journal_done(void);
222 static void journal_rotate(void);
227 static void sig_common (const char *sig) /* {{{ */
229 RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
231 pthread_cond_broadcast(&cache_cond);
232 } /* }}} void sig_common */
234 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
237 } /* }}} void sig_int_handler */
239 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
242 } /* }}} void sig_term_handler */
244 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
246 config_flush_at_shutdown = 1;
248 } /* }}} void sig_usr1_handler */
250 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
252 config_flush_at_shutdown = 0;
254 } /* }}} void sig_usr2_handler */
256 static void install_signal_handlers(void) /* {{{ */
258 /* These structures are static, because `sigaction' behaves weird if the are
260 static struct sigaction sa_int;
261 static struct sigaction sa_term;
262 static struct sigaction sa_pipe;
263 static struct sigaction sa_usr1;
264 static struct sigaction sa_usr2;
266 /* Install signal handlers */
267 memset (&sa_int, 0, sizeof (sa_int));
268 sa_int.sa_handler = sig_int_handler;
269 sigaction (SIGINT, &sa_int, NULL);
271 memset (&sa_term, 0, sizeof (sa_term));
272 sa_term.sa_handler = sig_term_handler;
273 sigaction (SIGTERM, &sa_term, NULL);
275 memset (&sa_pipe, 0, sizeof (sa_pipe));
276 sa_pipe.sa_handler = SIG_IGN;
277 sigaction (SIGPIPE, &sa_pipe, NULL);
279 memset (&sa_pipe, 0, sizeof (sa_usr1));
280 sa_usr1.sa_handler = sig_usr1_handler;
281 sigaction (SIGUSR1, &sa_usr1, NULL);
283 memset (&sa_usr2, 0, sizeof (sa_usr2));
284 sa_usr2.sa_handler = sig_usr2_handler;
285 sigaction (SIGUSR2, &sa_usr2, NULL);
287 } /* }}} void install_signal_handlers */
289 static int open_pidfile(char *action, int oflag) /* {{{ */
294 file = (config_pid_file != NULL)
296 : LOCALSTATEDIR "/run/rrdcached.pid";
298 fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
300 fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
301 action, file, rrd_strerror(errno));
304 } /* }}} static int open_pidfile */
306 /* check existing pid file to see whether a daemon is running */
307 static int check_pidfile(void)
313 pid_fd = open_pidfile("open", O_RDWR);
317 if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0)
324 /* another running process that we can signal COULD be
325 * a competing rrdcached */
326 if (pid != getpid() && kill(pid, 0) == 0)
329 "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
334 lseek(pid_fd, 0, SEEK_SET);
335 ftruncate(pid_fd, 0);
338 "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
339 "rrdcached: starting normally.\n", pid);
342 } /* }}} static int check_pidfile */
344 static int write_pidfile (int fd) /* {{{ */
351 fh = fdopen (fd, "w");
354 RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
359 fprintf (fh, "%i\n", (int) pid);
363 } /* }}} int write_pidfile */
365 static int remove_pidfile (void) /* {{{ */
370 file = (config_pid_file != NULL)
372 : LOCALSTATEDIR "/run/rrdcached.pid";
374 status = unlink (file);
378 } /* }}} int remove_pidfile */
380 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
384 eol = memchr(sock->rbuf + sock->next_cmd, '\n',
385 sock->next_read - sock->next_cmd);
389 /* no commands left, move remainder back to front of rbuf */
390 memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
391 sock->next_read - sock->next_cmd);
392 sock->next_read -= sock->next_cmd;
399 char *cmd = sock->rbuf + sock->next_cmd;
402 sock->next_cmd = eol - sock->rbuf + 1;
404 if (eol > sock->rbuf && *(eol-1) == '\r')
405 *(--eol) = '\0'; /* handle "\r\n" EOL */
416 /* add the characters directly to the write buffer */
417 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
421 assert(sock != NULL);
423 new_buf = realloc(sock->wbuf, sock->wbuf_len + len + 1);
426 RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
430 strncpy(new_buf + sock->wbuf_len, str, len + 1);
432 sock->wbuf = new_buf;
433 sock->wbuf_len += len;
436 } /* }}} static int add_to_wbuf */
/*
 * add_response_info:
 * Formats `fmt' and its variadic arguments into a CMD_MAX-sized stack buffer
 * and hands the result to `add_to_wbuf'.  Returns 0 without doing anything
 * when `sock' is NULL or `sock->batch_start' is set; otherwise the value
 * returned on the success path is whatever `add_to_wbuf' returns.
 */
438 /* add the text to the "extra" info that's sent after the status line */
439 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
442 char buffer[CMD_MAX];
445 if (sock == NULL) return 0; /* journal replay mode */
446 if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
/* NOTE(review): vsnprintf() returns the length the complete output would
 * have had, which can be larger than the buffer when the output was
 * truncated.  `len' is later passed to add_to_wbuf(), which copies
 * `len + 1' bytes out of `buffer'.  Unless a line not visible in this
 * excerpt clamps `len' to the buffer size, an over-long response would be
 * read from beyond the end of `buffer' -- TODO confirm and clamp.
 * The vsprintf() fallback below has no bound at all. */
449 #ifdef HAVE_VSNPRINTF
450 len = vsnprintf(buffer, sizeof(buffer)-1, fmt, argp);
452 len = vsprintf(buffer, fmt, argp);
457 RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
461 return add_to_wbuf(sock, buffer, len);
462 } /* }}} static int add_response_info */
464 static int count_lines(char *str) /* {{{ */
470 while ((str = strchr(str, '\n')) != NULL)
478 } /* }}} static int count_lines */
480 /* send the response back to the user.
481 * returns 0 on success, -1 on error
482 * write buffer is always zeroed after this call */
483 static int send_response (listen_socket_t *sock, response_code rc,
484 char *fmt, ...) /* {{{ */
487 char buffer[CMD_MAX];
492 if (sock == NULL) return rc; /* journal replay mode */
494 if (sock->batch_start)
497 return rc; /* no response on success during BATCH */
498 lines = sock->batch_cmd;
500 else if (rc == RESP_OK)
501 lines = count_lines(sock->wbuf);
505 rclen = sprintf(buffer, "%d ", lines);
507 #ifdef HAVE_VSNPRINTF
508 len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen-1, fmt, argp);
510 len = vsprintf(buffer+rclen, fmt, argp);
518 /* append the result to the wbuf, don't write to the user */
519 if (sock->batch_start)
520 return add_to_wbuf(sock, buffer, len);
522 /* first write must be complete */
523 if (len != write(sock->fd, buffer, len))
525 RRDD_LOG(LOG_INFO, "send_response: could not write status message");
529 if (sock->wbuf != NULL && rc == RESP_OK)
532 while (wrote < sock->wbuf_len)
534 ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
537 RRDD_LOG(LOG_INFO, "send_response: could not write results");
544 free(sock->wbuf); sock->wbuf = NULL;
550 static void wipe_ci_values(cache_item_t *ci, time_t when)
555 ci->last_flush_time = when;
556 if (config_write_jitter > 0)
557 ci->last_flush_time += (random() % config_write_jitter);
561 * remove a "cache_item_t" item from the queue.
562 * must hold 'cache_lock' when calling this
564 static void remove_from_queue(cache_item_t *ci) /* {{{ */
566 if (ci == NULL) return;
567 if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
569 if (ci->prev == NULL)
570 cache_queue_head = ci->next; /* reset head */
572 ci->prev->next = ci->next;
574 if (ci->next == NULL)
575 cache_queue_tail = ci->prev; /* reset the tail */
577 ci->next->prev = ci->prev;
579 ci->next = ci->prev = NULL;
580 ci->flags &= ~CI_FLAGS_IN_QUEUE;
581 } /* }}} static void remove_from_queue */
583 /* remove an entry from the tree and free all its resources.
584 * must hold 'cache lock' while calling this.
585 * returns 0 on success, otherwise errno */
586 static int forget_file(const char *file)
590 ci = g_tree_lookup(cache_tree, file);
594 g_tree_remove (cache_tree, file);
595 remove_from_queue(ci);
597 for (int i=0; i < ci->values_num; i++)
603 /* in case anyone is waiting */
604 pthread_cond_broadcast(&ci->flushed);
609 } /* }}} static int forget_file */
612 * enqueue_cache_item:
613 * `cache_lock' must be acquired before calling this function!
615 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
621 if (ci->values_num == 0)
626 if (cache_queue_head == ci)
629 /* remove if further down in queue */
630 remove_from_queue(ci);
633 ci->next = cache_queue_head;
634 if (ci->next != NULL)
636 cache_queue_head = ci;
638 if (cache_queue_tail == NULL)
639 cache_queue_tail = cache_queue_head;
641 else /* (side == TAIL) */
643 /* We don't move values back in the list.. */
644 if (ci->flags & CI_FLAGS_IN_QUEUE)
647 assert (ci->next == NULL);
648 assert (ci->prev == NULL);
650 ci->prev = cache_queue_tail;
652 if (cache_queue_tail == NULL)
653 cache_queue_head = ci;
655 cache_queue_tail->next = ci;
657 cache_queue_tail = ci;
660 ci->flags |= CI_FLAGS_IN_QUEUE;
662 pthread_cond_broadcast(&cache_cond);
663 pthread_mutex_lock (&stats_lock);
664 stats_queue_length++;
665 pthread_mutex_unlock (&stats_lock);
668 } /* }}} int enqueue_cache_item */
671 * tree_callback_flush:
672 * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held
673 * while this is in progress.
675 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
679 callback_flush_data_t *cfd;
681 ci = (cache_item_t *) value;
682 cfd = (callback_flush_data_t *) data;
684 if (ci->flags & CI_FLAGS_IN_QUEUE)
687 if ((ci->last_flush_time <= cfd->abs_timeout)
688 && (ci->values_num > 0))
690 enqueue_cache_item (ci, TAIL);
692 else if ((do_shutdown != 0)
693 && (ci->values_num > 0))
695 enqueue_cache_item (ci, TAIL);
697 else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
698 && (ci->values_num <= 0))
702 temp = (char **) realloc (cfd->keys,
703 sizeof (char *) * (cfd->keys_num + 1));
706 RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed.");
710 /* Make really sure this points to the _same_ place */
711 assert ((char *) key == ci->file);
712 cfd->keys[cfd->keys_num] = (char *) key;
717 } /* }}} gboolean tree_callback_flush */
/*
 * flush_old_values:
 * Walks `cache_tree' with `tree_callback_flush' and afterwards calls
 * `forget_file' for every key the callback stored in `cfd.keys'.
 * `cfd.abs_timeout' is set either to `now - max_age' or to a point in the
 * future (`now + 2*config_write_jitter + 1'); the condition selecting
 * between the two is not visible in this excerpt.  The callers visible in
 * this file hold `cache_lock' around the call.
 */
719 static int flush_old_values (int max_age)
721 callback_flush_data_t cfd;
724 memset (&cfd, 0, sizeof (cfd));
725 /* Pass the current time as user data so that we don't need to call
726 * `time' for each node. */
727 cfd.now = time (NULL);
732 cfd.abs_timeout = cfd.now - max_age;
734 cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
736 /* `tree_callback_flush' will return the keys of all values that haven't
737 * been touched in the last `config_flush_interval' seconds in `cfd'.
738 * The char*'s in this array point to the same memory as ci->file, so we
739 * don't need to free them separately. */
740 g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
/* NOTE(review): the forget_file() call in the loop below is the argument
 * of assert().  When the file is compiled with NDEBUG the whole expression
 * is removed, so the collected entries would never be dropped from the
 * tree.  The call should be made unconditionally and only its result be
 * asserted. */
742 for (k = 0; k < cfd.keys_num; k++)
744 /* should never fail, since we have held the cache_lock
746 assert( forget_file(cfd.keys[k]) == 0 );
749 if (cfd.keys != NULL)
756 } /* int flush_old_values */
/*
 * queue_thread_main:
 * Body of the thread that writes queued updates to disk.  With `cache_lock'
 * held it loops while `do_shutdown' is zero or the queue is non-empty:
 *   - once `next_flush' has passed it calls
 *     `flush_old_values (config_write_interval)';
 *   - when the queue is empty and no shutdown is pending it waits on
 *     `cache_cond' with `next_flush' as the timeout;
 *   - otherwise it takes the item at `cache_queue_head', duplicates its file
 *     name, resets the item via `wipe_ci_values', removes it from the queue,
 *     decrements `stats_queue_length', releases `cache_lock', calls
 *     `rrd_update_r', journals a "wrote" record, broadcasts `ci->flushed',
 *     updates the write statistics and re-acquires `cache_lock'.
 * On shutdown, `flush_old_values (-1)' is called if
 * `config_flush_at_shutdown' is set; `final_flush' guards this so it
 * happens once.
 */
758 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
761 struct timespec next_flush;
762 int final_flush = 0; /* make sure we only flush once on shutdown */
764 gettimeofday (&now, NULL);
765 next_flush.tv_sec = now.tv_sec + config_flush_interval;
766 next_flush.tv_nsec = 1000 * now.tv_usec;
768 pthread_mutex_lock (&cache_lock);
769 while ((do_shutdown == 0) || (cache_queue_head != NULL))
778 /* First, check if it's time to do the cache flush. */
779 gettimeofday (&now, NULL);
780 if ((now.tv_sec > next_flush.tv_sec)
781 || ((now.tv_sec == next_flush.tv_sec)
782 && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
784 /* Flush all values that haven't been written in the last
785 * `config_write_interval' seconds. */
786 flush_old_values (config_write_interval);
/* NOTE(review): the left-hand side of the assignment below is not visible
 * in this excerpt; presumably it is `next_flush.tv_sec' -- confirm.
 * `%' binds tighter than `+', so the expression evaluates to
 * now.tv_sec + (next_flush.tv_sec % config_flush_interval), i.e. an offset
 * between 0 and config_flush_interval - 1 that depends on the absolute
 * time.  The initialisation at the top of the function uses
 * now.tv_sec + config_flush_interval, which looks like the intended
 * value here as well. */
788 /* Determine the time of the next cache flush. */
790 now.tv_sec + next_flush.tv_sec % config_flush_interval;
792 /* unlock the cache while we rotate so we don't block incoming
793 * updates if the fsync() blocks on disk I/O */
794 pthread_mutex_unlock(&cache_lock);
796 pthread_mutex_lock(&cache_lock);
799 /* Now, check if there's something to store away. If not, wait until
800 * something comes in or it's time to do the cache flush. if we are
801 * shutting down, do not wait around. */
802 if (cache_queue_head == NULL && !do_shutdown)
804 status = pthread_cond_timedwait (&cache_cond, &cache_lock, &next_flush);
805 if ((status != 0) && (status != ETIMEDOUT))
807 RRDD_LOG (LOG_ERR, "queue_thread_main: "
808 "pthread_cond_timedwait returned %i.", status);
812 /* We're about to shut down */
813 if (do_shutdown != 0 && !final_flush++)
815 if (config_flush_at_shutdown)
816 flush_old_values (-1); /* flush everything */
821 /* Check if a value has arrived. This may be NULL if we timed out or there
822 * was an interrupt such as a signal. */
823 if (cache_queue_head == NULL)
826 ci = cache_queue_head;
828 /* copy the relevant parts */
829 file = strdup (ci->file);
832 RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
836 assert(ci->values != NULL);
837 assert(ci->values_num > 0);
840 values_num = ci->values_num;
842 wipe_ci_values(ci, time(NULL));
843 remove_from_queue(ci);
845 pthread_mutex_lock (&stats_lock);
846 assert (stats_queue_length > 0);
847 stats_queue_length--;
848 pthread_mutex_unlock (&stats_lock);
850 pthread_mutex_unlock (&cache_lock);
853 status = rrd_update_r (file, NULL, values_num, (void *) values);
856 RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
857 "rrd_update_r (%s) failed with status %i. (%s)",
858 file, status, rrd_get_error());
/* NOTE(review): `ci' is dereferenced below after `cache_lock' has been
 * released.  forget_file() is documented in this file as freeing an
 * entry's resources while holding that lock, so a concurrent FORGET could
 * presumably invalidate `ci' before the broadcast -- verify. */
861 journal_write("wrote", file);
862 pthread_cond_broadcast(&ci->flushed);
864 for (i = 0; i < values_num; i++)
872 pthread_mutex_lock (&stats_lock);
873 stats_updates_written++;
874 stats_data_sets_written += values_num;
875 pthread_mutex_unlock (&stats_lock);
878 pthread_mutex_lock (&cache_lock);
880 /* We're about to shut down */
881 if (do_shutdown != 0 && !final_flush++)
883 if (config_flush_at_shutdown)
884 flush_old_values (-1); /* flush everything */
888 } /* while ((do_shutdown == 0) || (cache_queue_head != NULL)) */
889 pthread_mutex_unlock (&cache_lock);
891 if (config_flush_at_shutdown)
893 assert(cache_queue_head == NULL);
894 RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
900 } /* }}} void *queue_thread_main */
902 static int buffer_get_field (char **buffer_ret, /* {{{ */
903 size_t *buffer_size_ret, char **field_ret)
912 buffer = *buffer_ret;
914 buffer_size = *buffer_size_ret;
918 if (buffer_size <= 0)
921 /* This is ensured by `handle_request'. */
922 assert (buffer[buffer_size - 1] == '\0');
925 while (buffer_pos < buffer_size)
927 /* Check for end-of-field or end-of-buffer */
928 if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
930 field[field_size] = 0;
936 /* Handle escaped characters. */
937 else if (buffer[buffer_pos] == '\\')
939 if (buffer_pos >= (buffer_size - 1))
942 field[field_size] = buffer[buffer_pos];
946 /* Normal operation */
949 field[field_size] = buffer[buffer_pos];
953 } /* while (buffer_pos < buffer_size) */
958 *buffer_ret = buffer + buffer_pos;
959 *buffer_size_ret = buffer_size - buffer_pos;
963 } /* }}} int buffer_get_field */
965 /* if we're restricting writes to the base directory,
966 * check whether the file falls within the dir
967 * returns 1 if OK, otherwise 0
969 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
971 assert(file != NULL);
973 if (!config_write_base_only
974 || sock == NULL /* journal replay */
975 || config_base_dir == NULL)
978 if (strstr(file, "../") != NULL) goto err;
980 /* relative paths without "../" are ok */
981 if (*file != '/') return 1;
983 /* file must be of the format base + "/" + <1+ char filename> */
984 if (strlen(file) < _config_base_dir_len + 2) goto err;
985 if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
986 if (*(file + _config_base_dir_len) != '/') goto err;
991 if (sock != NULL && sock->fd >= 0)
992 send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
995 } /* }}} static int check_file_access */
997 /* when using a base dir, convert relative paths to absolute paths.
998 * if necessary, modifies the "filename" pointer to point
999 * to the new path created in "tmp". "tmp" is provided
1000 * by the caller and sizeof(tmp) must be >= PATH_MAX.
1002 * this allows us to optimize for the expected case (absolute path)
1005 static void get_abs_path(char **filename, char *tmp)
1007 assert(tmp != NULL);
1008 assert(filename != NULL && *filename != NULL);
1010 if (config_base_dir == NULL || **filename == '/')
1013 snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1015 } /* }}} static int get_abs_path */
1017 /* returns 1 if we have the required privilege level,
1018 * otherwise issue an error to the user on sock */
1019 static int has_privilege (listen_socket_t *sock, /* {{{ */
1020 socket_privilege priv)
1022 if (sock == NULL) /* journal replay */
1025 if (sock->privilege >= priv)
1028 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1029 } /* }}} static int has_privilege */
1031 static int flush_file (const char *filename) /* {{{ */
1035 pthread_mutex_lock (&cache_lock);
1037 ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1040 pthread_mutex_unlock (&cache_lock);
1044 if (ci->values_num > 0)
1046 /* Enqueue at head */
1047 enqueue_cache_item (ci, HEAD);
1048 pthread_cond_wait(&ci->flushed, &cache_lock);
1051 /* DO NOT DO ANYTHING WITH ci HERE!! The entry
1052 * may have been purged during our cond_wait() */
1054 pthread_mutex_unlock(&cache_lock);
1057 } /* }}} int flush_file */
1059 static int handle_request_help (listen_socket_t *sock, /* {{{ */
1060 char *buffer, size_t buffer_size)
1066 char *help_help[2] =
1068 "Command overview\n"
1070 "HELP [<command>]\n"
1071 "FLUSH <filename>\n"
1073 "PENDING <filename>\n"
1074 "FORGET <filename>\n"
1075 "UPDATE <filename> <values> [<values> ...]\n"
1080 char *help_flush[2] =
1084 "Usage: FLUSH <filename>\n"
1086 "Adds the given filename to the head of the update queue and returns\n"
1087 "after is has been dequeued.\n"
1090 char *help_flushall[2] =
1092 "Help for FLUSHALL\n"
1096 "Triggers writing of all pending updates. Returns immediately.\n"
1099 char *help_pending[2] =
1101 "Help for PENDING\n"
1103 "Usage: PENDING <filename>\n"
1105 "Shows any 'pending' updates for a file, in order.\n"
1106 "The updates shown have not yet been written to the underlying RRD file.\n"
1109 char *help_forget[2] =
1113 "Usage: FORGET <filename>\n"
1115 "Removes the file completely from the cache.\n"
1116 "Any pending updates for the file will be lost.\n"
1119 char *help_update[2] =
1123 "Usage: UPDATE <filename> <values> [<values> ...]\n"
1125 "Adds the given file to the internal cache if it is not yet known and\n"
1126 "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1129 "Each <values> has the following form:\n"
1130 " <values> = <time>:<value>[:<value>[...]]\n"
1131 "See the rrdupdate(1) manpage for details.\n"
1134 char *help_stats[2] =
1140 "Returns some performance counters, see the rrdcached(1) manpage for\n"
1141 "a description of the values.\n"
1144 char *help_batch[2] =
1148 "The 'BATCH' command permits the client to initiate a bulk load\n"
1149 " of commands to rrdcached.\n"
1154 " server: 0 Go ahead. End with dot '.' on its own line.\n"
1155 " client: command #1\n"
1156 " client: command #2\n"
1157 " client: ... and so on\n"
1159 " server: 2 errors\n"
1160 " server: 7 message for command #7\n"
1161 " server: 9 message for command #9\n"
1163 "For more information, consult the rrdcached(1) documentation.\n"
1166 status = buffer_get_field (&buffer, &buffer_size, &command);
1168 help_text = help_help;
1171 if (strcasecmp (command, "update") == 0)
1172 help_text = help_update;
1173 else if (strcasecmp (command, "flush") == 0)
1174 help_text = help_flush;
1175 else if (strcasecmp (command, "flushall") == 0)
1176 help_text = help_flushall;
1177 else if (strcasecmp (command, "pending") == 0)
1178 help_text = help_pending;
1179 else if (strcasecmp (command, "forget") == 0)
1180 help_text = help_forget;
1181 else if (strcasecmp (command, "stats") == 0)
1182 help_text = help_stats;
1183 else if (strcasecmp (command, "batch") == 0)
1184 help_text = help_batch;
1186 help_text = help_help;
1189 add_response_info(sock, help_text[1]);
1190 return send_response(sock, RESP_OK, help_text[0]);
1191 } /* }}} int handle_request_help */
1193 static int handle_request_stats (listen_socket_t *sock) /* {{{ */
1195 uint64_t copy_queue_length;
1196 uint64_t copy_updates_received;
1197 uint64_t copy_flush_received;
1198 uint64_t copy_updates_written;
1199 uint64_t copy_data_sets_written;
1200 uint64_t copy_journal_bytes;
1201 uint64_t copy_journal_rotate;
1203 uint64_t tree_nodes_number;
1204 uint64_t tree_depth;
1206 pthread_mutex_lock (&stats_lock);
1207 copy_queue_length = stats_queue_length;
1208 copy_updates_received = stats_updates_received;
1209 copy_flush_received = stats_flush_received;
1210 copy_updates_written = stats_updates_written;
1211 copy_data_sets_written = stats_data_sets_written;
1212 copy_journal_bytes = stats_journal_bytes;
1213 copy_journal_rotate = stats_journal_rotate;
1214 pthread_mutex_unlock (&stats_lock);
1216 pthread_mutex_lock (&cache_lock);
1217 tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1218 tree_depth = (uint64_t) g_tree_height (cache_tree);
1219 pthread_mutex_unlock (&cache_lock);
1221 add_response_info(sock,
1222 "QueueLength: %"PRIu64"\n", copy_queue_length);
1223 add_response_info(sock,
1224 "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1225 add_response_info(sock,
1226 "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1227 add_response_info(sock,
1228 "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1229 add_response_info(sock,
1230 "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1231 add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1232 add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1233 add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1234 add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1236 send_response(sock, RESP_OK, "Statistics follow\n");
1239 } /* }}} int handle_request_stats */
1241 static int handle_request_flush (listen_socket_t *sock, /* {{{ */
1242 char *buffer, size_t buffer_size)
1244 char *file, file_tmp[PATH_MAX];
1247 status = buffer_get_field (&buffer, &buffer_size, &file);
1250 return send_response(sock, RESP_ERR, "Usage: flush <filename>\n");
1254 pthread_mutex_lock(&stats_lock);
1255 stats_flush_received++;
1256 pthread_mutex_unlock(&stats_lock);
1258 get_abs_path(&file, file_tmp);
1259 if (!check_file_access(file, sock)) return 0;
1261 status = flush_file (file);
1263 return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1264 else if (status == ENOENT)
1266 /* no file in our tree; see whether it exists at all */
1267 struct stat statbuf;
1269 memset(&statbuf, 0, sizeof(statbuf));
1270 if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1271 return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1273 return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1275 else if (status < 0)
1276 return send_response(sock, RESP_ERR, "Internal error.\n");
1278 return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1283 } /* }}} int handle_request_flush */
1285 static int handle_request_flushall(listen_socket_t *sock) /* {{{ */
1289 status = has_privilege(sock, PRIV_HIGH);
1293 RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1295 pthread_mutex_lock(&cache_lock);
1296 flush_old_values(-1);
1297 pthread_mutex_unlock(&cache_lock);
1299 return send_response(sock, RESP_OK, "Started flush.\n");
1300 } /* }}} static int handle_request_flushall */
1302 static int handle_request_pending(listen_socket_t *sock, /* {{{ */
1303 char *buffer, size_t buffer_size)
1306 char *file, file_tmp[PATH_MAX];
1309 status = buffer_get_field(&buffer, &buffer_size, &file);
1311 return send_response(sock, RESP_ERR,
1312 "Usage: PENDING <filename>\n");
1314 status = has_privilege(sock, PRIV_HIGH);
1318 get_abs_path(&file, file_tmp);
1320 pthread_mutex_lock(&cache_lock);
1321 ci = g_tree_lookup(cache_tree, file);
1324 pthread_mutex_unlock(&cache_lock);
1325 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1328 for (int i=0; i < ci->values_num; i++)
1329 add_response_info(sock, "%s\n", ci->values[i]);
1331 pthread_mutex_unlock(&cache_lock);
1332 return send_response(sock, RESP_OK, "updates pending\n");
1333 } /* }}} static int handle_request_pending */
1335 static int handle_request_forget(listen_socket_t *sock, /* {{{ */
1336 char *buffer, size_t buffer_size)
1339 char *file, file_tmp[PATH_MAX];
1341 status = buffer_get_field(&buffer, &buffer_size, &file);
1343 return send_response(sock, RESP_ERR,
1344 "Usage: FORGET <filename>\n");
1346 status = has_privilege(sock, PRIV_HIGH);
1350 get_abs_path(&file, file_tmp);
1351 if (!check_file_access(file, sock)) return 0;
1353 pthread_mutex_lock(&cache_lock);
1354 status = forget_file(file);
1355 pthread_mutex_unlock(&cache_lock);
1360 journal_write("forget", file);
1362 return send_response(sock, RESP_OK, "Gone!\n");
1365 return send_response(sock, RESP_ERR, "cannot forget: %s\n",
1366 status < 0 ? "Internal error" : rrd_strerror(status));
1370 } /* }}} static int handle_request_forget */
1372 static int handle_request_update (listen_socket_t *sock, /* {{{ */
1374 char *buffer, size_t buffer_size)
1376 char *file, file_tmp[PATH_MAX];
1378 int bad_timestamps = 0;
1380 char orig_buf[CMD_MAX];
1384 status = has_privilege(sock, PRIV_HIGH);
1388 /* save it for the journal later */
1389 strncpy(orig_buf, buffer, sizeof(orig_buf)-1);
1391 status = buffer_get_field (&buffer, &buffer_size, &file);
1393 return send_response(sock, RESP_ERR,
1394 "Usage: UPDATE <filename> <values> [<values> ...]\n");
1396 pthread_mutex_lock(&stats_lock);
1397 stats_updates_received++;
1398 pthread_mutex_unlock(&stats_lock);
1400 get_abs_path(&file, file_tmp);
1401 if (!check_file_access(file, sock)) return 0;
1403 pthread_mutex_lock (&cache_lock);
1404 ci = g_tree_lookup (cache_tree, file);
1406 if (ci == NULL) /* {{{ */
1408 struct stat statbuf;
1410 /* don't hold the lock while we setup; stat(2) might block */
1411 pthread_mutex_unlock(&cache_lock);
1413 memset (&statbuf, 0, sizeof (statbuf));
1414 status = stat (file, &statbuf);
1417 RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1420 if (status == ENOENT)
1421 return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1423 return send_response(sock, RESP_ERR,
1424 "stat failed with error %i.\n", status);
1426 if (!S_ISREG (statbuf.st_mode))
1427 return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1429 if (access(file, R_OK|W_OK) != 0)
1430 return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1431 file, rrd_strerror(errno));
1433 ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1436 RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1438 return send_response(sock, RESP_ERR, "malloc failed.\n");
1440 memset (ci, 0, sizeof (cache_item_t));
1442 ci->file = strdup (file);
1443 if (ci->file == NULL)
1446 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1448 return send_response(sock, RESP_ERR, "strdup failed.\n");
1451 wipe_ci_values(ci, now);
1452 ci->flags = CI_FLAGS_IN_TREE;
1454 pthread_mutex_lock(&cache_lock);
1455 g_tree_insert (cache_tree, (void *) ci->file, (void *) ci);
1457 assert (ci != NULL);
1459 /* don't re-write updates in replay mode */
1461 journal_write("update", orig_buf);
1463 while (buffer_size > 0)
1470 status = buffer_get_field (&buffer, &buffer_size, &value);
1473 RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1477 /* make sure update time is always moving forward */
1478 stamp = strtol(value, &eostamp, 10);
1479 if (eostamp == value || eostamp == NULL || *eostamp != ':')
1482 add_response_info(sock, "Cannot find timestamp in '%s'!\n", value);
1485 else if (stamp <= ci->last_update_stamp)
1488 add_response_info(sock,
1489 "illegal attempt to update using time %ld when"
1490 " last update time is %ld (minimum one second step)\n",
1491 stamp, ci->last_update_stamp);
1495 ci->last_update_stamp = stamp;
1497 temp = (char **) realloc (ci->values,
1498 sizeof (char *) * (ci->values_num + 1));
1501 RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
1506 ci->values[ci->values_num] = strdup (value);
1507 if (ci->values[ci->values_num] == NULL)
1509 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1517 if (((now - ci->last_flush_time) >= config_write_interval)
1518 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1519 && (ci->values_num > 0))
1521 enqueue_cache_item (ci, TAIL);
1524 pthread_mutex_unlock (&cache_lock);
1528 /* journal replay mode */
1529 if (sock == NULL) return RESP_ERR;
1531 /* if we had only one update attempt, then return the full
1532 error message... try to get the most information out
1533 of the limited error space allowed by the protocol
1535 if (bad_timestamps == 1)
1536 return send_response(sock, RESP_ERR, "%s", sock->wbuf);
1538 return send_response(sock, RESP_ERR,
1539 "No values updated (%d bad timestamps).\n",
1543 return send_response(sock, RESP_OK,
1544 "errors, enqueued %i value(s).\n", values_num);
1549 } /* }}} int handle_request_update */
1551 /* we came across a "WROTE" entry during journal replay.
1552 * throw away any values that we have accumulated for this file
1554 static int handle_request_wrote (const char *buffer, time_t now) /* {{{ */
1558 const char *file = buffer;
1560 pthread_mutex_lock(&cache_lock);
1562 ci = g_tree_lookup(cache_tree, file);
1565 pthread_mutex_unlock(&cache_lock);
1571 for (i=0; i < ci->values_num; i++)
1572 free(ci->values[i]);
1577 wipe_ci_values(ci, now);
1578 remove_from_queue(ci);
1580 pthread_mutex_unlock(&cache_lock);
1582 } /* }}} int handle_request_wrote */
1584 /* start "BATCH" processing */
1585 static int batch_start (listen_socket_t *sock) /* {{{ */
1588 if (sock->batch_start)
1589 return send_response(sock, RESP_ERR, "Already in BATCH\n");
1591 status = send_response(sock, RESP_OK,
1592 "Go ahead. End with dot '.' on its own line.\n");
1593 sock->batch_start = time(NULL);
1594 sock->batch_cmd = 0;
1597 } /* }}} static int batch_start */
1599 /* finish "BATCH" processing and return results to the client */
1600 static int batch_done (listen_socket_t *sock) /* {{{ */
1602 assert(sock->batch_start);
1603 sock->batch_start = 0;
1604 sock->batch_cmd = 0;
1605 return send_response(sock, RESP_OK, "errors\n");
1606 } /* }}} static int batch_done */
1608 /* if sock==NULL, we are in journal replay mode */
1609 static int handle_request (listen_socket_t *sock, /* {{{ */
1611 char *buffer, size_t buffer_size)
1617 assert (buffer[buffer_size - 1] == '\0');
1619 buffer_ptr = buffer;
1621 status = buffer_get_field (&buffer_ptr, &buffer_size, &command);
1624 RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1628 if (sock != NULL && sock->batch_start)
1631 if (strcasecmp (command, "update") == 0)
1632 return (handle_request_update (sock, now, buffer_ptr, buffer_size));
1633 else if (strcasecmp (command, "wrote") == 0 && sock == NULL)
1635 /* this is only valid in replay mode */
1636 return (handle_request_wrote (buffer_ptr, now));
1638 else if (strcasecmp (command, "flush") == 0)
1639 return (handle_request_flush (sock, buffer_ptr, buffer_size));
1640 else if (strcasecmp (command, "flushall") == 0)
1641 return (handle_request_flushall(sock));
1642 else if (strcasecmp (command, "pending") == 0)
1643 return (handle_request_pending(sock, buffer_ptr, buffer_size));
1644 else if (strcasecmp (command, "forget") == 0)
1645 return (handle_request_forget(sock, buffer_ptr, buffer_size));
1646 else if (strcasecmp (command, "stats") == 0)
1647 return (handle_request_stats (sock));
1648 else if (strcasecmp (command, "help") == 0)
1649 return (handle_request_help (sock, buffer_ptr, buffer_size));
1650 else if (strcasecmp (command, "batch") == 0 && sock != NULL)
1651 return batch_start(sock);
1652 else if (strcasecmp (command, ".") == 0 && sock != NULL && sock->batch_start)
1653 return batch_done(sock);
1655 return send_response(sock, RESP_ERR, "Unknown command: %s\n", command);
1659 } /* }}} int handle_request */
1661 /* MUST NOT hold journal_lock before calling this */
1662 static void journal_rotate(void) /* {{{ */
1664 FILE *old_fh = NULL;
1667 if (journal_cur == NULL || journal_old == NULL)
1670 pthread_mutex_lock(&journal_lock);
1672 /* we rotate this way (rename before close) so that the we can release
1673 * the journal lock as fast as possible. Journal writes to the new
1674 * journal can proceed immediately after the new file is opened. The
1675 * fclose can then block without affecting new updates.
1677 if (journal_fh != NULL)
1679 old_fh = journal_fh;
1681 rename(journal_cur, journal_old);
1682 ++stats_journal_rotate;
1685 new_fd = open(journal_cur, O_WRONLY|O_CREAT|O_APPEND,
1686 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1689 journal_fh = fdopen(new_fd, "a");
1690 if (journal_fh == NULL)
1694 pthread_mutex_unlock(&journal_lock);
1699 if (journal_fh == NULL)
1702 "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1703 journal_cur, rrd_strerror(errno));
1706 "JOURNALING DISABLED: All values will be flushed at shutdown");
1707 config_flush_at_shutdown = 1;
1710 } /* }}} static void journal_rotate */
1712 static void journal_done(void) /* {{{ */
1714 if (journal_cur == NULL)
1717 pthread_mutex_lock(&journal_lock);
1718 if (journal_fh != NULL)
1724 if (config_flush_at_shutdown)
1726 RRDD_LOG(LOG_INFO, "removing journals");
1727 unlink(journal_old);
1728 unlink(journal_cur);
1732 RRDD_LOG(LOG_INFO, "expedited shutdown; "
1733 "journals will be used at next startup");
1736 pthread_mutex_unlock(&journal_lock);
1738 } /* }}} static void journal_done */
1740 static int journal_write(char *cmd, char *args) /* {{{ */
1744 if (journal_fh == NULL)
1747 pthread_mutex_lock(&journal_lock);
1748 chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1749 pthread_mutex_unlock(&journal_lock);
1753 pthread_mutex_lock(&stats_lock);
1754 stats_journal_bytes += chars;
1755 pthread_mutex_unlock(&stats_lock);
1759 } /* }}} static int journal_write */
1761 static int journal_replay (const char *file) /* {{{ */
1767 char entry[CMD_MAX];
1770 if (file == NULL) return 0;
1775 struct stat statbuf;
1777 memset(&statbuf, 0, sizeof(statbuf));
1778 if (stat(file, &statbuf) != 0)
1780 if (errno == ENOENT)
1783 reason = "stat error";
1786 else if (!S_ISREG(statbuf.st_mode))
1788 reason = "not a regular file";
1791 if (statbuf.st_uid != daemon_uid)
1793 reason = "not owned by daemon user";
1796 if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
1798 reason = "must not be user/group writable";
1804 RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
1805 file, rrd_strerror(status), reason);
1810 fh = fopen(file, "r");
1813 if (errno != ENOENT)
1814 RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1815 file, rrd_strerror(errno));
1819 RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1828 if (fgets(entry, sizeof(entry), fh) == NULL)
1830 entry_len = strlen(entry);
1832 /* check \n termination in case journal writing crashed mid-line */
1835 else if (entry[entry_len - 1] != '\n')
1837 RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
1842 entry[entry_len - 1] = '\0';
1844 if (handle_request(NULL, now, entry, entry_len) == 0)
1852 RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1853 entry_cnt, fail_cnt);
1855 return entry_cnt > 0 ? 1 : 0;
1856 } /* }}} static int journal_replay */
1858 static void journal_init(void) /* {{{ */
1860 int had_journal = 0;
1862 if (journal_cur == NULL) return;
1864 pthread_mutex_lock(&journal_lock);
1866 RRDD_LOG(LOG_INFO, "checking for journal files");
1868 had_journal += journal_replay(journal_old);
1869 had_journal += journal_replay(journal_cur);
1871 /* it must have been a crash. start a flush */
1872 if (had_journal && config_flush_at_shutdown)
1873 flush_old_values(-1);
1875 pthread_mutex_unlock(&journal_lock);
1878 RRDD_LOG(LOG_INFO, "journal processing complete");
1880 } /* }}} static void journal_init */
1882 static void close_connection(listen_socket_t *sock)
1884 close(sock->fd) ; sock->fd = -1;
1885 free(sock->rbuf); sock->rbuf = NULL;
1886 free(sock->wbuf); sock->wbuf = NULL;
1891 static void *connection_thread_main (void *args) /* {{{ */
1894 listen_socket_t *sock;
1898 sock = (listen_socket_t *) args;
1901 /* init read buffers */
1902 sock->next_read = sock->next_cmd = 0;
1903 sock->rbuf = malloc(RBUF_SIZE);
1904 if (sock->rbuf == NULL)
1906 RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
1907 close_connection(sock);
1911 pthread_mutex_lock (&connection_threads_lock);
1915 temp = (pthread_t *) realloc (connection_threads,
1916 sizeof (pthread_t) * (connection_threads_num + 1));
1919 RRDD_LOG (LOG_ERR, "connection_thread_main: realloc failed.");
1923 connection_threads = temp;
1924 connection_threads[connection_threads_num] = pthread_self ();
1925 connection_threads_num++;
1928 pthread_mutex_unlock (&connection_threads_lock);
1930 while (do_shutdown == 0)
1937 struct pollfd pollfd;
1941 pollfd.events = POLLIN | POLLPRI;
1944 status = poll (&pollfd, 1, /* timeout = */ 500);
1947 else if (status == 0) /* timeout */
1949 else if (status < 0) /* error */
1952 if (status != EINTR)
1953 RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
1957 if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
1959 else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
1961 RRDD_LOG (LOG_WARNING, "connection_thread_main: "
1962 "poll(2) returned something unexpected: %#04hx",
1967 rbytes = read(fd, sock->rbuf + sock->next_read,
1968 RBUF_SIZE - sock->next_read);
1971 RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
1974 else if (rbytes == 0)
1977 sock->next_read += rbytes;
1979 if (sock->batch_start)
1980 now = sock->batch_start;
1984 while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
1986 status = handle_request (sock, now, cmd, cmd_len+1);
1993 close_connection(sock);
1995 self = pthread_self ();
1996 /* Remove this thread from the connection threads list */
1997 pthread_mutex_lock (&connection_threads_lock);
1998 /* Find out own index in the array */
1999 for (i = 0; i < connection_threads_num; i++)
2000 if (pthread_equal (connection_threads[i], self) != 0)
2002 assert (i < connection_threads_num);
2004 /* Move the trailing threads forward. */
2005 if (i < (connection_threads_num - 1))
2007 memmove (connection_threads + i,
2008 connection_threads + i + 1,
2009 sizeof (pthread_t) * (connection_threads_num - i - 1));
2012 connection_threads_num--;
2013 pthread_mutex_unlock (&connection_threads_lock);
2016 } /* }}} void *connection_thread_main */
2018 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2021 struct sockaddr_un sa;
2022 listen_socket_t *temp;
2027 if (strncmp(path, "unix:", strlen("unix:")) == 0)
2028 path += strlen("unix:");
2030 temp = (listen_socket_t *) realloc (listen_fds,
2031 sizeof (listen_fds[0]) * (listen_fds_num + 1));
2034 fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2038 memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2040 fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2043 fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2044 rrd_strerror(errno));
2048 memset (&sa, 0, sizeof (sa));
2049 sa.sun_family = AF_UNIX;
2050 strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2052 /* if we've gotten this far, we own the pid file. any daemon started
2053 * with the same args must not be alive. therefore, ensure that we can
2054 * create the socket...
2058 status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2061 fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2062 path, rrd_strerror(errno));
2067 status = listen (fd, /* backlog = */ 10);
2070 fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2071 path, rrd_strerror(errno));
2077 listen_fds[listen_fds_num].fd = fd;
2078 listen_fds[listen_fds_num].family = PF_UNIX;
2079 strncpy(listen_fds[listen_fds_num].addr, path,
2080 sizeof (listen_fds[listen_fds_num].addr) - 1);
2084 } /* }}} int open_listen_socket_unix */
/* Resolve sock->addr and open a listening TCP socket for every address it
 * resolves to, appending each to listen_fds.
 *
 * Accepted forms: "[ipv6]" or "[ipv6]:port", and "host" or "host:port" for
 * names containing a dot; RRDCACHED_DEFAULT_PORT is used when no port is
 * given.
 *
 * Returns 0 once resolution succeeded (individual addresses that cannot be
 * bound are skipped), and -1 on a malformed address, a resolver failure or
 * a failing listen(2).
 */
static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
{
  struct addrinfo ai_hints;
  struct addrinfo *ai_res;
  struct addrinfo *ai_ptr;
  char addr_copy[NI_MAXHOST];
  char *addr;
  char *port;
  int status;

  /* work on a private copy: the address is split in place below */
  strncpy (addr_copy, sock->addr, sizeof (addr_copy));
  addr_copy[sizeof (addr_copy) - 1] = 0;
  addr = addr_copy;

  memset (&ai_hints, 0, sizeof (ai_hints));
  ai_hints.ai_flags = 0;
#ifdef AI_ADDRCONFIG
  ai_hints.ai_flags |= AI_ADDRCONFIG;
#endif
  ai_hints.ai_family = AF_UNSPEC;
  ai_hints.ai_socktype = SOCK_STREAM;

  port = NULL;
  if (*addr == '[') /* IPv6+port format */
  {
    /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
    addr++;

    port = strchr (addr, ']');
    if (port == NULL)
    {
      fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
      return (-1);
    }
    *port = 0;
    port++;

    if (*port == ':')
      port++;
    else if (*port == 0)
      port = NULL;
    else
    {
      fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
      return (-1);
    }
  } /* if (*addr == '[') */
  else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */
  {
    /* strrchr is the standard C spelling of the legacy rindex() */
    port = strrchr (addr, ':');
    if (port != NULL)
    {
      *port = 0;
      port++;
    }
  }

  ai_res = NULL;
  status = getaddrinfo (addr,
                        port == NULL ? RRDCACHED_DEFAULT_PORT : port,
                        &ai_hints, &ai_res);
  if (status != 0)
  {
    fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
             addr, gai_strerror (status));
    return (-1);
  }

  for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
  {
    int fd;
    listen_socket_t *temp;
    int one = 1;

    temp = (listen_socket_t *) realloc (listen_fds,
        sizeof (listen_fds[0]) * (listen_fds_num + 1));
    if (temp == NULL)
    {
      fprintf (stderr,
               "rrdcached: open_listen_socket_network: realloc failed.\n");
      continue;
    }
    listen_fds = temp;
    /* the new slot inherits the settings of `sock' */
    memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));

    fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
    if (fd < 0)
    {
      fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
               rrd_strerror(errno));
      continue;
    }

    setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));

    status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
    if (status != 0)
    {
      fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
               sock->addr, rrd_strerror(errno));
      close (fd);
      continue;
    }

    status = listen (fd, /* backlog = */ 10);
    if (status != 0)
    {
      fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
               sock->addr, rrd_strerror(errno));
      close (fd);
      freeaddrinfo (ai_res);
      return (-1);
    }

    listen_fds[listen_fds_num].fd = fd;
    listen_fds[listen_fds_num].family = ai_ptr->ai_family;
    listen_fds_num++;
  } /* for (ai_ptr) */

  /* the resolver result is owned by us and must be released */
  freeaddrinfo (ai_res);

  return (0);
} /* }}} static int open_listen_socket_network */
2206 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2208 assert(sock != NULL);
2209 assert(sock->addr != NULL);
2211 if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2212 || sock->addr[0] == '/')
2213 return (open_listen_socket_unix(sock));
2215 return (open_listen_socket_network(sock));
2216 } /* }}} int open_listen_socket */
2218 static int close_listen_sockets (void) /* {{{ */
2222 for (i = 0; i < listen_fds_num; i++)
2224 close (listen_fds[i].fd);
2226 if (listen_fds[i].family == PF_UNIX)
2227 unlink(listen_fds[i].addr);
2235 } /* }}} int close_listen_sockets */
2237 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
2239 struct pollfd *pollfds;
2244 if (listen_fds_num < 1)
2246 RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2250 pollfds_num = listen_fds_num;
2251 pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2252 if (pollfds == NULL)
2254 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2257 memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2259 RRDD_LOG(LOG_INFO, "listening for connections");
2261 while (do_shutdown == 0)
2263 assert (pollfds_num == ((int) listen_fds_num));
2264 for (i = 0; i < pollfds_num; i++)
2266 pollfds[i].fd = listen_fds[i].fd;
2267 pollfds[i].events = POLLIN | POLLPRI;
2268 pollfds[i].revents = 0;
2271 status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2274 else if (status == 0) /* timeout */
2276 else if (status < 0) /* error */
2279 if (status != EINTR)
2281 RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2286 for (i = 0; i < pollfds_num; i++)
2288 listen_socket_t *client_sock;
2289 struct sockaddr_storage client_sa;
2290 socklen_t client_sa_size;
2292 pthread_attr_t attr;
2294 if (pollfds[i].revents == 0)
2297 if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2299 RRDD_LOG (LOG_ERR, "listen_thread_main: "
2300 "poll(2) returned something unexpected for listen FD #%i.",
2305 client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2306 if (client_sock == NULL)
2308 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2311 memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2313 client_sa_size = sizeof (client_sa);
2314 client_sock->fd = accept (pollfds[i].fd,
2315 (struct sockaddr *) &client_sa, &client_sa_size);
2316 if (client_sock->fd < 0)
2318 RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2323 pthread_attr_init (&attr);
2324 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2326 status = pthread_create (&tid, &attr, connection_thread_main,
2330 RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2331 close_connection(client_sock);
2334 } /* for (pollfds_num) */
2335 } /* while (do_shutdown == 0) */
2337 RRDD_LOG(LOG_INFO, "starting shutdown");
2339 close_listen_sockets ();
2341 pthread_mutex_lock (&connection_threads_lock);
2342 while (connection_threads_num > 0)
2346 wait_for = connection_threads[0];
2348 pthread_mutex_unlock (&connection_threads_lock);
2349 pthread_join (wait_for, /* retval = */ NULL);
2350 pthread_mutex_lock (&connection_threads_lock);
2352 pthread_mutex_unlock (&connection_threads_lock);
2355 } /* }}} void *listen_thread_main */
2357 static int daemonize (void) /* {{{ */
2362 daemon_uid = geteuid();
2364 pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
2366 pid_fd = check_pidfile();
2370 /* open all the listen sockets */
2371 if (config_listen_address_list_len > 0)
2373 for (int i = 0; i < config_listen_address_list_len; i++)
2374 open_listen_socket (config_listen_address_list[i]);
2378 listen_socket_t sock;
2379 memset(&sock, 0, sizeof(sock));
2380 strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr));
2381 open_listen_socket (&sock);
2384 if (listen_fds_num < 1)
2386 fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
2390 if (!stay_foreground)
2397 fprintf (stderr, "daemonize: fork(2) failed.\n");
2403 /* Become session leader */
2406 /* Open the first three file descriptors to /dev/null */
2411 open ("/dev/null", O_RDWR);
2414 } /* if (!stay_foreground) */
2416 /* Change into the /tmp directory. */
2417 base_dir = (config_base_dir != NULL)
2421 if (chdir (base_dir) != 0)
2423 fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2427 install_signal_handlers();
2429 openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2430 RRDD_LOG(LOG_INFO, "starting up");
2432 cache_tree = g_tree_new ((GCompareFunc) strcmp);
2433 if (cache_tree == NULL)
2435 RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2439 return write_pidfile (pid_fd);
2444 } /* }}} int daemonize */
2446 static int cleanup (void) /* {{{ */
2450 pthread_cond_signal (&cache_cond);
2451 pthread_join (queue_thread, /* return = */ NULL);
2455 RRDD_LOG(LOG_INFO, "goodbye");
2459 } /* }}} int cleanup */
2461 static int read_options (int argc, char **argv) /* {{{ */
2466 while ((option = getopt(argc, argv, "gl:L:f:w:b:Bz:p:j:h?F")) != -1)
2477 listen_socket_t **temp;
2478 listen_socket_t *new;
2480 new = malloc(sizeof(listen_socket_t));
2483 fprintf(stderr, "read_options: malloc failed.\n");
2486 memset(new, 0, sizeof(listen_socket_t));
2488 temp = (listen_socket_t **) realloc (config_listen_address_list,
2489 sizeof (listen_socket_t *) * (config_listen_address_list_len + 1));
2492 fprintf (stderr, "read_options: realloc failed.\n");
2495 config_listen_address_list = temp;
2497 strncpy(new->addr, optarg, sizeof(new->addr)-1);
2498 new->privilege = (option == 'l') ? PRIV_HIGH : PRIV_LOW;
2500 temp[config_listen_address_list_len] = new;
2501 config_listen_address_list_len++;
2509 temp = atoi (optarg);
2511 config_flush_interval = temp;
2514 fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2524 temp = atoi (optarg);
2526 config_write_interval = temp;
2529 fprintf (stderr, "Invalid write interval: %s\n", optarg);
2539 temp = atoi(optarg);
2541 config_write_jitter = temp;
2544 fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2552 config_write_base_only = 1;
2558 char base_realpath[PATH_MAX];
2560 if (config_base_dir != NULL)
2561 free (config_base_dir);
2562 config_base_dir = strdup (optarg);
2563 if (config_base_dir == NULL)
2565 fprintf (stderr, "read_options: strdup failed.\n");
2569 /* make sure that the base directory is not resolved via
2570 * symbolic links. this makes some performance-enhancing
2571 * assumptions possible (we don't have to resolve paths
2572 * that start with a "/")
2574 if (realpath(config_base_dir, base_realpath) == NULL)
2576 fprintf (stderr, "Invalid base directory '%s'.\n", config_base_dir);
2579 else if (strncmp(config_base_dir,
2580 base_realpath, sizeof(base_realpath)) != 0)
2583 "Base directory (-b) resolved via file system links!\n"
2584 "Please consult rrdcached '-b' documentation!\n"
2585 "Consider specifying the real directory (%s)\n",
2590 len = strlen (config_base_dir);
2591 while ((len > 0) && (config_base_dir[len - 1] == '/'))
2593 config_base_dir[len - 1] = 0;
2599 fprintf (stderr, "Invalid base directory: %s\n", optarg);
2603 _config_base_dir_len = len;
2609 if (config_pid_file != NULL)
2610 free (config_pid_file);
2611 config_pid_file = strdup (optarg);
2612 if (config_pid_file == NULL)
2614 fprintf (stderr, "read_options: strdup failed.\n");
2621 config_flush_at_shutdown = 1;
2626 struct stat statbuf;
2627 const char *dir = optarg;
2629 status = stat(dir, &statbuf);
2632 fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2636 if (!S_ISDIR(statbuf.st_mode)
2637 || access(dir, R_OK|W_OK|X_OK) != 0)
2639 fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2640 errno ? rrd_strerror(errno) : "");
2644 journal_cur = malloc(PATH_MAX + 1);
2645 journal_old = malloc(PATH_MAX + 1);
2646 if (journal_cur == NULL || journal_old == NULL)
2648 fprintf(stderr, "malloc failure for journal files\n");
2653 snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2654 snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
2661 printf ("RRDCacheD %s Copyright (C) 2008 Florian octo Forster\n"
2663 "Usage: rrdcached [options]\n"
2665 "Valid options are:\n"
2666 " -l <address> Socket address to listen to.\n"
2667 " -L <address> Socket address to listen to ('FLUSH' only).\n"
2668 " -w <seconds> Interval in which to write data.\n"
2669 " -z <delay> Delay writes up to <delay> seconds to spread load\n"
2670 " -f <seconds> Interval in which to flush dead data.\n"
2671 " -p <file> Location of the PID-file.\n"
2672 " -b <dir> Base directory to change to.\n"
2673 " -B Restrict file access to paths within -b <dir>\n"
2674 " -g Do not fork and run in the foreground.\n"
2675 " -j <dir> Directory in which to create the journal files.\n"
2676 " -F Always flush all updates at shutdown\n"
2678 "For more information and a detailed description of all options "
2680 "to the rrdcached(1) manual page.\n",
2684 } /* switch (option) */
2685 } /* while (getopt) */
2687 /* advise the user when values are not sane */
2688 if (config_flush_interval < 2 * config_write_interval)
2689 fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2690 " 2x write interval (-w) !\n");
2691 if (config_write_jitter > config_write_interval)
2692 fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2693 " write interval (-w) !\n");
2695 if (config_write_base_only && config_base_dir == NULL)
2696 fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
2697 " Consult the rrdcached documentation\n");
2699 if (journal_cur == NULL)
2700 config_flush_at_shutdown = 1;
2703 } /* }}} int read_options */
2705 int main (int argc, char **argv)
2709 status = read_options (argc, argv);
2717 status = daemonize ();
2720 fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
2726 /* start the queue thread */
2727 memset (&queue_thread, 0, sizeof (queue_thread));
2728 status = pthread_create (&queue_thread,
2734 RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2739 listen_thread_main (NULL);
2746 * vim: set sw=2 sts=2 ts=8 et fdm=marker :