2 * RRDTool - src/rrd_daemon.c
3 * Copyright (C) 2008-2010 Florian octo Forster
4 * Copyright (C) 2008,2009 Kevin Brintnall
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; only version 2 of the License is applicable.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
20 * Florian octo Forster <octo at verplant.org>
21 * kevin brintnall <kbrint@rufus.net>
26 * First tell the compiler to stick to the C99 and POSIX standards as close as
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
46 #define _XOPEN_SOURCE 500
63 * Now for some includes..
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
70 #include "../rrd_config.h"
75 #include "rrd_client.h"
87 #include <sys/socket.h>
95 #include <sys/types.h>
107 #include <sys/time.h>
114 #endif /* HAVE_LIBWRAP */
116 #include <glib-2.0/glib.h>
/* RRDD_LOG(severity, fmt, ...): log a printf-style message through syslog()
 * at the given severity. When stay_foreground is non-zero the message is
 * also printed to stderr, followed by a newline. The variadic arguments
 * appear twice in the expansion, so they may be evaluated twice. */
119 #define RRDD_LOG(severity, ...) \
121 if (stay_foreground) { \
122 fprintf(stderr, __VA_ARGS__); \
123 fprintf(stderr, "\n"); } \
124 syslog ((severity), __VA_ARGS__); \
/* Result code passed to send_response(): RESP_OK is 0, RESP_ERR is -1. */
130 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
/* Per-socket state. Only some members are visible in this excerpt; other
 * members used elsewhere in the file include fd, rbuf, next_cmd, next_read,
 * wbuf, wbuf_len, batch_start and batch_cmd. */
132 struct listen_socket_s
135 char addr[PATH_MAX + 1]; /* sized for a PATH_MAX-long string plus terminator */
138 /* state for BATCH processing */
150 uint32_t permissions;
153 mode_t socket_permissions;
155 typedef struct listen_socket_s listen_socket_t;
/* Command descriptor and the shared parameter lists of the command handlers.
 * DISPATCH_PROTO supplies (sock, ..., buffer, buffer_size); HANDLER_PROTO
 * starts with the command_t *cmd that is being executed. */
158 typedef struct command_s command_t;
159 /* note: guard against "unused" warnings in the handlers */
160 #define DISPATCH_PROTO listen_socket_t UNUSED(*sock),\
162 char UNUSED(*buffer),\
163 size_t UNUSED(buffer_size)
165 #define HANDLER_PROTO command_t UNUSED(*cmd),\
170 int (*handler)(HANDLER_PROTO);
172 char context; /* where we expect to see it */
/* Bit flags for `context'. CMD_CONTEXT_ANY (0x7f) has the seven low bits
 * set and therefore includes each of the three flags above it. */
173 #define CMD_CONTEXT_CLIENT (1<<0)
174 #define CMD_CONTEXT_BATCH (1<<1)
175 #define CMD_CONTEXT_JOURNAL (1<<2)
176 #define CMD_CONTEXT_ANY (0x7f)
/* One cached RRD file: its pending update strings plus queue/tree
 * bookkeeping. Only part of the structure is visible in this excerpt. */
183 typedef struct cache_item_s cache_item_t;
188 size_t values_num; /* number of valid pointers */
189 size_t values_alloc; /* number of allocated pointers */
190 time_t last_flush_time; /* set by wipe_ci_values() */
191 time_t last_update_stamp; /* timestamp of the newest accepted update */
192 #define CI_FLAGS_IN_TREE (1<<0)
193 #define CI_FLAGS_IN_QUEUE (1<<1)
195 pthread_cond_t flushed; /* broadcast by the queue thread after a write; flush_file() waits on it */
/* Context handed through g_tree_foreach() to tree_callback_flush(). The
 * members used in this file are now, abs_timeout, keys and keys_num. */
200 struct callback_flush_data_s
207 typedef struct callback_flush_data_s callback_flush_data_t;
/* Which end of the write queue enqueue_cache_item() inserts at (HEAD/TAIL). */
214 typedef enum queue_side_e queue_side_t;
216 /* describe a set of journal files */
222 /* max length of socket command or response */
/* RBUF_SIZE is twice CMD_MAX. */
224 #define RBUF_SIZE (CMD_MAX*2)
/* Non-zero makes RRDD_LOG echo every message to stderr as well. */
229 static int stay_foreground = 0;
230 static uid_t daemon_uid;
232 static listen_socket_t *listen_fds = NULL;
233 static size_t listen_fds_num = 0;
235 static listen_socket_t default_socket;
/* Daemon life-cycle states, stored in `state'. */
238 RUNNING, /* normal operation */
239 FLUSHING, /* flushing remaining values */
240 SHUTDOWN /* shutting down */
/* Queue (writer) threads wait on queue_cond. */
243 static pthread_t *queue_threads;
244 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
245 static int config_queue_threads = 4;
/* The flush thread sleeps on flush_cond between periodic flushes. */
247 static pthread_t flush_thread;
248 static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;
250 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
251 static pthread_cond_t connection_threads_done = PTHREAD_COND_INITIALIZER;
252 static int connection_threads_num = 0;
/* The cache: a GLib tree looked up by file name, plus a doubly linked write
 * queue (head/tail). The handlers in this file take cache_lock around
 * accesses to both. */
255 static GTree *cache_tree = NULL;
256 static cache_item_t *cache_queue_head = NULL;
257 static cache_item_t *cache_queue_tail = NULL;
258 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
/* Tunables with their built-in defaults; the two intervals are in seconds. */
260 static int config_write_interval = 300;
261 static int config_write_jitter = 0;
262 static int config_flush_interval = 3600;
263 static int config_flush_at_shutdown = 0;
264 static char *config_pid_file = NULL;
265 static char *config_base_dir = NULL;
266 static size_t _config_base_dir_len = 0;
267 static int config_write_base_only = 0;
268 static size_t config_alloc_chunk = 1;
270 static listen_socket_t **config_listen_address_list = NULL;
271 static size_t config_listen_address_list_len = 0;
/* Counters reported by handle_request_stats(); updated under stats_lock. */
273 static uint64_t stats_queue_length = 0;
274 static uint64_t stats_updates_received = 0;
275 static uint64_t stats_flush_received = 0;
276 static uint64_t stats_updates_written = 0;
277 static uint64_t stats_data_sets_written = 0;
278 static uint64_t stats_journal_bytes = 0;
279 static uint64_t stats_journal_rotate = 0;
280 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
282 static int opt_no_overwrite = 0; /* default for the daemon */
284 /* Journaled updates */
/* A NULL socket pointer marks a command that is being replayed from the
 * journal rather than received from a client. */
285 #define JOURNAL_REPLAY(s) ((s) == NULL)
286 #define JOURNAL_BASE "rrd.journal"
287 static journal_set *journal_cur = NULL;
288 static journal_set *journal_old = NULL;
289 static char *journal_dir = NULL;
290 static FILE *journal_fh = NULL; /* current journal file handle */
291 static long journal_size = 0; /* current journal size */
/* 1 GiB */
292 #define JOURNAL_MAX (1 * 1024 * 1024 * 1024)
293 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
294 static int journal_write(char *cmd, char *args);
295 static void journal_done(void);
296 static void journal_rotate(void);
298 /* prototypes for forward references */
299 static int handle_request_help (HANDLER_PROTO);
/* Common signal-handling helper. Logs the caught signal (`sig' is the name
 * without the "SIG" prefix) and wakes every thread waiting on flush_cond or
 * queue_cond. */
304 static void sig_common (const char *sig) /* {{{ */
306 RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
308 pthread_cond_broadcast(&flush_cond);
309 pthread_cond_broadcast(&queue_cond);
310 } /* }}} void sig_common */
/* SIGINT handler, installed by install_signal_handlers(). The signal number
 * argument is unused. */
312 static void sig_int_handler (int UNUSED(s)) /* {{{ */
315 } /* }}} void sig_int_handler */
/* SIGTERM handler, installed by install_signal_handlers(). The signal number
 * argument is unused. */
317 static void sig_term_handler (int UNUSED(s)) /* {{{ */
320 } /* }}} void sig_term_handler */
/* SIGUSR1 handler: switches config_flush_at_shutdown on. */
322 static void sig_usr1_handler (int UNUSED(s)) /* {{{ */
324 config_flush_at_shutdown = 1;
326 } /* }}} void sig_usr1_handler */
/* SIGUSR2 handler: switches config_flush_at_shutdown off. */
328 static void sig_usr2_handler (int UNUSED(s)) /* {{{ */
330 config_flush_at_shutdown = 0;
332 } /* }}} void sig_usr2_handler */
334 static void install_signal_handlers(void) /* {{{ */
336 /* These structures are static, because `sigaction' behaves weird if the are
338 static struct sigaction sa_int;
339 static struct sigaction sa_term;
340 static struct sigaction sa_pipe;
341 static struct sigaction sa_usr1;
342 static struct sigaction sa_usr2;
344 /* Install signal handlers */
345 memset (&sa_int, 0, sizeof (sa_int));
346 sa_int.sa_handler = sig_int_handler;
347 sigaction (SIGINT, &sa_int, NULL);
349 memset (&sa_term, 0, sizeof (sa_term));
350 sa_term.sa_handler = sig_term_handler;
351 sigaction (SIGTERM, &sa_term, NULL);
353 memset (&sa_pipe, 0, sizeof (sa_pipe));
354 sa_pipe.sa_handler = SIG_IGN;
355 sigaction (SIGPIPE, &sa_pipe, NULL);
357 memset (&sa_pipe, 0, sizeof (sa_usr1));
358 sa_usr1.sa_handler = sig_usr1_handler;
359 sigaction (SIGUSR1, &sa_usr1, NULL);
361 memset (&sa_usr2, 0, sizeof (sa_usr2));
362 sa_usr2.sa_handler = sig_usr2_handler;
363 sigaction (SIGUSR2, &sa_usr2, NULL);
365 } /* }}} void install_signal_handlers */
/* Open the PID file with the given open(2) flags, after making sure its
 * directory exists. `action' is only used to word the error message. When
 * config_pid_file is NULL the path is LOCALSTATEDIR "/run/rrdcached.pid".
 * Errors are reported on stderr. */
367 static int open_pidfile(char *action, int oflag) /* {{{ */
371 char *file_copy, *dir;
373 file = (config_pid_file != NULL)
375 : LOCALSTATEDIR "/run/rrdcached.pid";
377 /* dirname may modify its argument */
378 file_copy = strdup(file);
379 if (file_copy == NULL)
381 fprintf(stderr, "rrdcached: strdup(): %s\n",
382 rrd_strerror(errno));
/* create the containing directory (and parents) if necessary */
386 dir = dirname(file_copy);
387 if (rrd_mkdir_p(dir, 0777) != 0)
389 fprintf(stderr, "Failed to create pidfile directory '%s': %s\n",
390 dir, rrd_strerror(errno));
/* the mode (rw-r--r--) only matters when oflag causes the file to be created */
396 fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
398 fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
399 action, file, rrd_strerror(errno));
402 } /* }}} static int open_pidfile */
404 /* check existing pid file to see whether a daemon is running */
405 static int check_pidfile(void)
411 pid_fd = open_pidfile("open", O_RDWR);
/* NOTE(review): read() may fill pid_str completely, leaving no terminating
 * NUL; confirm the buffer is terminated before it is parsed. */
415 if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0)
422 /* another running process that we can signal COULD be
423 * a competing rrdcached */
/* kill(pid, 0) delivers no signal; it only tests whether the process exists
 * and may be signalled by us. */
424 if (pid != getpid() && kill(pid, 0) == 0)
427 "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
/* stale file: rewind and empty it.
 * NOTE(review): "Faild" in the message below is a typo in emitted text. */
432 lseek(pid_fd, 0, SEEK_SET);
433 if (ftruncate(pid_fd, 0) == -1)
436 "FATAL: Faild to truncate stale PID file. (pid %d)\n", pid);
442 "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
443 "rrdcached: starting normally.\n", pid);
446 } /* }}} static int check_pidfile */
/* Write `pid' as a decimal number followed by a newline to the already open
 * descriptor `fd', using a stdio stream obtained with fdopen(). An fdopen()
 * failure is logged. */
448 static int write_pidfile (int fd) /* {{{ */
455 fh = fdopen (fd, "w");
458 RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
463 fprintf (fh, "%i\n", (int) pid);
467 } /* }}} int write_pidfile */
/* Unlink the PID file. The path is chosen the same way as in open_pidfile():
 * LOCALSTATEDIR "/run/rrdcached.pid" when config_pid_file is NULL. */
469 static int remove_pidfile (void) /* {{{ */
474 file = (config_pid_file != NULL)
476 : LOCALSTATEDIR "/run/rrdcached.pid";
478 status = unlink (file);
482 } /* }}} int remove_pidfile */
/* Take the next newline-terminated command out of sock->rbuf. The unread
 * region is rbuf[next_cmd .. next_read); it is searched for '\n'. */
484 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
488 eol = memchr(sock->rbuf + sock->next_cmd, '\n',
489 sock->next_read - sock->next_cmd);
493 /* no commands left, move remainder back to front of rbuf */
494 memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
495 sock->next_read - sock->next_cmd);
496 sock->next_read -= sock->next_cmd;
/* a complete line is available: it starts at the current next_cmd offset */
503 char *cmd = sock->rbuf + sock->next_cmd;
/* the following command starts right after the newline */
506 sock->next_cmd = eol - sock->rbuf + 1;
508 if (eol > sock->rbuf && *(eol-1) == '\r')
509 *(--eol) = '\0'; /* handle "\r\n" EOL */
518 } /* }}} char *next_cmd */
520 /* add the characters directly to the write buffer */
521 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
525 assert(sock != NULL);
/* grow by len bytes plus one extra byte for a terminator */
527 new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
530 RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
/* Append at the current end. strncpy() copies at most len + 1 bytes, so the
 * result is terminated only if str has a NUL within its first len + 1
 * bytes -- presumably always the case for the callers; confirm. */
534 strncpy(new_buf + sock->wbuf_len, str, len + 1);
/* wbuf_len counts the payload only, not the extra terminator byte */
536 sock->wbuf = new_buf;
537 sock->wbuf_len += len;
540 } /* }}} static int add_to_wbuf */
542 /* add the text to the "extra" info that's sent after the status line */
543 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
546 char buffer[CMD_MAX];
/* journal replay has no client (sock is NULL): nothing to collect */
549 if (JOURNAL_REPLAY(sock)) return 0;
550 if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
/* Format into the CMD_MAX-sized stack buffer.
 * NOTE(review): vsnprintf() returns the length the full output would have
 * had, so on truncation `len' can exceed sizeof(buffer); confirm it is
 * clamped before being passed to add_to_wbuf(). The vsprintf() fallback is
 * not bounded at all. */
553 #ifdef HAVE_VSNPRINTF
554 len = vsnprintf(buffer, sizeof(buffer), fmt, argp);
556 len = vsprintf(buffer, fmt, argp);
561 RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
565 return add_to_wbuf(sock, buffer, len);
566 } /* }}} static int add_response_info */
/* Count lines in `str' by stepping from one '\n' to the next with strchr(). */
568 static int count_lines(char *str) /* {{{ */
574 while ((str = strchr(str, '\n')) != NULL)
582 } /* }}} static int count_lines */
584 /* send the response back to the user.
585 * returns 0 on success, -1 on error
586 * write buffer is always zeroed after this call */
587 static int send_response (listen_socket_t *sock, response_code rc,
588 char *fmt, ...) /* {{{ */
591 char buffer[CMD_MAX];
/* journal replay has no client to answer; the code is passed straight back */
596 if (JOURNAL_REPLAY(sock)) return rc;
598 if (sock->batch_start)
601 return rc; /* no response on success during BATCH */
602 lines = sock->batch_cmd;
604 else if (rc == RESP_OK)
605 lines = count_lines(sock->wbuf);
/* the status line starts with the number in `lines' followed by a space */
609 rclen = sprintf(buffer, "%d ", lines);
/* the caller's formatted message follows the number in the same buffer */
611 #ifdef HAVE_VSNPRINTF
612 len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
614 len = vsprintf(buffer+rclen, fmt, argp);
622 /* append the result to the wbuf, don't write to the user */
623 if (sock->batch_start)
624 return add_to_wbuf(sock, buffer, len);
626 /* first write must be complete */
627 if (len != write(sock->fd, buffer, len))
629 RRDD_LOG(LOG_INFO, "send_response: could not write status message");
/* on success the collected info lines follow the status line */
633 if (sock->wbuf != NULL && rc == RESP_OK)
636 while (wrote < sock->wbuf_len)
638 ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
641 RRDD_LOG(LOG_INFO, "send_response: could not write results");
/* release the write buffer */
648 free(sock->wbuf); sock->wbuf = NULL;
/* Reset the value bookkeeping of `ci' and record `when' as its last flush
 * time. If config_write_jitter is positive, a pseudo-random offset
 * (rrd_random() modulo the jitter) is added to that time. */
654 static void wipe_ci_values(cache_item_t *ci, time_t when)
658 ci->values_alloc = 0;
660 ci->last_flush_time = when;
661 if (config_write_jitter > 0)
662 ci->last_flush_time += (rrd_random() % config_write_jitter);
666 * remove a "cache_item_t" item from the queue.
667 * must hold 'cache_lock' when calling this
669 static void remove_from_queue(cache_item_t *ci) /* {{{ */
671 if (ci == NULL) return;
672 if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
674 if (ci->prev == NULL)
675 cache_queue_head = ci->next; /* reset head */
677 ci->prev->next = ci->next;
679 if (ci->next == NULL)
680 cache_queue_tail = ci->prev; /* reset the tail */
682 ci->next->prev = ci->prev;
684 ci->next = ci->prev = NULL;
685 ci->flags &= ~CI_FLAGS_IN_QUEUE;
687 pthread_mutex_lock (&stats_lock);
688 assert (stats_queue_length > 0);
689 stats_queue_length--;
690 pthread_mutex_unlock (&stats_lock);
692 } /* }}} static void remove_from_queue */
694 /* free the resources associated with the cache_item_t
695 * must hold cache_lock when calling this function
/* Tear down one cache item: take it off the write queue, walk its stored
 * values, wake anybody waiting on ci->flushed and destroy that condition
 * variable. A NULL argument yields NULL. */
697 static void *free_cache_item(cache_item_t *ci) /* {{{ */
699 if (ci == NULL) return NULL;
701 remove_from_queue(ci); /* no-op when the item is not queued */
703 for (size_t i=0; i < ci->values_num; i++)
709 /* in case anyone is waiting */
710 pthread_cond_broadcast(&ci->flushed);
711 pthread_cond_destroy(&ci->flushed);
716 } /* }}} static void *free_cache_item */
719 * enqueue_cache_item:
720 * `cache_lock' must be acquired before calling this function!
/* Put `ci' on the write queue, at the head or at the tail. After linking,
 * the item is flagged CI_FLAGS_IN_QUEUE, one queue thread is signalled and
 * stats_queue_length is incremented. */
722 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
728 if (ci->values_num == 0)
733 if (cache_queue_head == ci)
736 /* remove if further down in queue */
737 remove_from_queue(ci);
/* head insertion: link `ci' in front of the current head */
740 ci->next = cache_queue_head;
741 if (ci->next != NULL)
743 cache_queue_head = ci;
745 if (cache_queue_tail == NULL)
746 cache_queue_tail = cache_queue_head;
748 else /* (side == TAIL) */
750 /* We don't move values back in the list.. */
751 if (ci->flags & CI_FLAGS_IN_QUEUE)
754 assert (ci->next == NULL);
755 assert (ci->prev == NULL);
/* tail insertion: append behind the current tail */
757 ci->prev = cache_queue_tail;
759 if (cache_queue_tail == NULL)
760 cache_queue_head = ci;
762 cache_queue_tail->next = ci;
764 cache_queue_tail = ci;
/* common bookkeeping for both insertion sides */
767 ci->flags |= CI_FLAGS_IN_QUEUE;
769 pthread_cond_signal(&queue_cond);
770 pthread_mutex_lock (&stats_lock);
771 stats_queue_length++;
772 pthread_mutex_unlock (&stats_lock);
775 } /* }}} int enqueue_cache_item */
778 * tree_callback_flush:
779 * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
780 * while this is in progress.
/* g_tree_foreach() callback: `value' is the cache item, `data' the
 * callback_flush_data_t of the current flush run. Items with pending values
 * that are due are queued for writing; items that have been empty for at
 * least config_flush_interval seconds have their key recorded in cfd->keys
 * (flush_old_values() removes those keys from the tree afterwards). */
782 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
786 callback_flush_data_t *cfd;
788 ci = (cache_item_t *) value;
789 cfd = (callback_flush_data_t *) data;
791 if (ci->flags & CI_FLAGS_IN_QUEUE)
794 if (ci->values_num > 0
795 && (ci->last_flush_time <= cfd->abs_timeout || state != RUNNING))
797 enqueue_cache_item (ci, TAIL);
799 else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
800 && (ci->values_num <= 0))
802 assert ((char *) key == ci->file); /* the tree key is the item's own file string */
803 if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key))
805 RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed.");
811 } /* }}} gboolean tree_callback_flush */
/* Walk the whole cache tree with tree_callback_flush(), then remove the
 * nodes whose keys the callback collected. The callers visible in this file
 * hold cache_lock around the call; passing -1 is used to flush everything. */
813 static int flush_old_values (int max_age)
815 callback_flush_data_t cfd;
818 memset (&cfd, 0, sizeof (cfd));
819 /* Pass the current time as user data so that we don't need to call
820 * `time' for each node. */
821 cfd.now = time (NULL);
/* Two alternative cut-offs follow (the selecting condition is not visible
 * here): `now - max_age', or a point beyond now + 2 * jitter so that every
 * item's last_flush_time lies before it. */
826 cfd.abs_timeout = cfd.now - max_age;
828 cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
830 /* `tree_callback_flush' will return the keys of all values that haven't
831 * been touched in the last `config_flush_interval' seconds in `cfd'.
832 * The char*'s in this array point to the same memory as ci->file, so we
833 * don't need to free them separately. */
834 g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
836 for (k = 0; k < cfd.keys_num; k++)
838 gboolean status = g_tree_remove(cache_tree, cfd.keys[k]);
839 /* should never fail, since we have held the cache_lock
841 assert(status == TRUE);
844 if (cfd.keys != NULL)
851 } /* int flush_old_values */
/* Body of the flush thread. While the daemon is RUNNING it wakes up at
 * least every config_flush_interval seconds (or when flush_cond is
 * signalled) and calls flush_old_values(). Once the state changes it
 * optionally flushes everything and returns. */
853 static void *flush_thread_main (void UNUSED(*args)) /* {{{ */
856 struct timespec next_flush;
/* first deadline: one flush interval from now (microseconds -> nanoseconds) */
859 gettimeofday (&now, NULL);
860 next_flush.tv_sec = now.tv_sec + config_flush_interval;
861 next_flush.tv_nsec = 1000 * now.tv_usec;
863 pthread_mutex_lock(&cache_lock);
865 while (state == RUNNING)
867 gettimeofday (&now, NULL);
868 if ((now.tv_sec > next_flush.tv_sec)
869 || ((now.tv_sec == next_flush.tv_sec)
870 && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
872 RRDD_LOG(LOG_DEBUG, "flushing old values");
874 /* Determine the time of the next cache flush. */
875 next_flush.tv_sec = now.tv_sec + config_flush_interval;
877 /* Flush all values that haven't been written in the last
878 * `config_write_interval' seconds. */
879 flush_old_values (config_write_interval);
881 /* unlock the cache while we rotate so we don't block incoming
882 * updates if the fsync() blocks on disk I/O */
883 pthread_mutex_unlock(&cache_lock);
885 pthread_mutex_lock(&cache_lock);
/* sleep until the deadline or until flush_cond is signalled; cache_lock is
 * released for the duration of the wait */
888 status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
889 if (status != 0 && status != ETIMEDOUT)
891 RRDD_LOG (LOG_ERR, "flush_thread_main: "
892 "pthread_cond_timedwait returned %i.", status);
896 if (config_flush_at_shutdown)
897 flush_old_values (-1); /* flush everything */
901 pthread_mutex_unlock(&cache_lock);
904 } /* void *flush_thread_main */
/* Body of a queue (writer) thread. It takes the item at the head of the
 * write queue, writes its values with rrd_update_r() and repeats. The loop
 * runs until the state is SHUTDOWN and, if config_flush_at_shutdown is set,
 * the queue has been drained. */
906 static void *queue_thread_main (void UNUSED(*args)) /* {{{ */
908 pthread_mutex_lock (&cache_lock);
910 while (state != SHUTDOWN
911 || (cache_queue_head != NULL && config_flush_at_shutdown))
919 /* Now, check if there's something to store away. If not, wait until
920 * something comes in. */
921 if (cache_queue_head == NULL)
923 status = pthread_cond_wait (&queue_cond, &cache_lock);
924 if ((status != 0) && (status != ETIMEDOUT))
926 RRDD_LOG (LOG_ERR, "queue_thread_main: "
927 "pthread_cond_wait returned %i.", status);
931 /* Check if a value has arrived. This may be NULL if we timed out or there
932 * was an interrupt such as a signal. */
933 if (cache_queue_head == NULL)
936 ci = cache_queue_head;
938 /* copy the relevant parts */
939 file = strdup (ci->file);
942 RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
946 assert(ci->values != NULL);
947 assert(ci->values_num > 0);
950 values_num = ci->values_num;
/* still under cache_lock: reset the item and take it off the queue */
952 wipe_ci_values(ci, time(NULL));
953 remove_from_queue(ci);
/* the RRD file is written without cache_lock held */
955 pthread_mutex_unlock (&cache_lock);
958 status = rrd_update_r (file, NULL, (int) values_num, (void *) values);
961 RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
962 "rrd_update_r (%s) failed with status %i. (%s)",
963 file, status, rrd_get_error());
966 journal_write("wrote", file);
968 /* Search again in the tree. It's possible someone issued a "FORGET"
969 * while we were writing the update values. */
970 pthread_mutex_lock(&cache_lock);
971 ci = (cache_item_t *) g_tree_lookup(cache_tree, file);
973 pthread_cond_broadcast(&ci->flushed); /* wakes flush_file() waiters */
974 pthread_mutex_unlock(&cache_lock);
978 pthread_mutex_lock (&stats_lock);
979 stats_updates_written++;
980 stats_data_sets_written += values_num;
981 pthread_mutex_unlock (&stats_lock);
984 rrd_free_ptrs((void ***) &values, &values_num);
/* cache_lock is held again when the loop condition is re-evaluated */
987 pthread_mutex_lock (&cache_lock);
989 pthread_mutex_unlock (&cache_lock);
992 } /* }}} void *queue_thread_main */
/* Split the next field off a command buffer. Fields end at a space or at the
 * terminating NUL; a backslash escapes the following character. The field is
 * unescaped in place: `field' aliases the start of the input buffer. On
 * return *buffer_ret and *buffer_size_ret describe the unconsumed remainder. */
994 static int buffer_get_field (char **buffer_ret, /* {{{ */
995 size_t *buffer_size_ret, char **field_ret)
1004 buffer = *buffer_ret;
1006 buffer_size = *buffer_size_ret;
1007 field = *buffer_ret;
1010 if (buffer_size <= 0)
1013 /* This is ensured by `handle_request'. */
1014 assert (buffer[buffer_size - 1] == '\0');
1017 while (buffer_pos < buffer_size)
1019 /* Check for end-of-field or end-of-buffer */
1020 if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
1022 field[field_size] = 0;
1028 /* Handle escaped characters. */
1029 else if (buffer[buffer_pos] == '\\')
1031 if (buffer_pos >= (buffer_size - 1)) /* a backslash with nothing after it */
1034 field[field_size] = buffer[buffer_pos];
1038 /* Normal operation */
1041 field[field_size] = buffer[buffer_pos];
1045 } /* while (buffer_pos < buffer_size) */
/* hand the unconsumed remainder back to the caller */
1050 *buffer_ret = buffer + buffer_pos;
1051 *buffer_size_ret = buffer_size - buffer_pos;
1055 } /* }}} int buffer_get_field */
1057 /* if we're restricting writes to the base directory,
1058 * check whether the file falls within the dir
1059 * returns 1 if OK, otherwise 0
/* Path check used when writes are restricted to the base directory. It only
 * applies when config_write_base_only is set, a base directory is configured
 * and the request is not a journal replay. On the error path an EACCES
 * response is sent if the socket has a valid descriptor. */
1061 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
1063 assert(file != NULL);
1065 if (!config_write_base_only
1066 || JOURNAL_REPLAY(sock)
1067 || config_base_dir == NULL)
1070 if (strstr(file, "../") != NULL) goto err; /* any "../" in the path is refused */
1072 /* relative paths without "../" are ok */
1073 if (*file != '/') return 1;
1075 /* file must be of the format base + "/" + <1+ char filename> */
1076 if (strlen(file) < _config_base_dir_len + 2) goto err;
1077 if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
1078 if (*(file + _config_base_dir_len) != '/') goto err;
1083 if (sock != NULL && sock->fd >= 0)
1084 send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1087 } /* }}} static int check_file_access */
1089 /* when using a base dir, convert relative paths to absolute paths.
1090 * if necessary, modifies the "filename" pointer to point
1091 * to the new path created in "tmp". "tmp" is provided
1092 * by the caller and sizeof(tmp) must be >= PATH_MAX.
1094 * this allows us to optimize for the expected case (absolute path)
/* Resolve a relative file name against config_base_dir. Nothing is built
 * when no base directory is configured or when *filename already starts
 * with '/'. `tmp' is the caller's scratch buffer (at least PATH_MAX bytes). */
1097 static void get_abs_path(char **filename, char *tmp)
1099 assert(tmp != NULL);
1100 assert(filename != NULL && *filename != NULL);
1102 if (config_base_dir == NULL || **filename == '/')
1105 snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename); /* bounded by PATH_MAX; longer results are truncated */
1107 } /* }}} static void get_abs_path */
/* Flush one file synchronously. The cache entry is looked up under
 * cache_lock; if it has pending values it is queued at the head of the write
 * queue and the caller blocks on ci->flushed until a queue thread broadcasts
 * it. The callers in this file compare the result against 0 and ENOENT. */
1109 static int flush_file (const char *filename) /* {{{ */
1113 pthread_mutex_lock (&cache_lock);
1115 ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1118 pthread_mutex_unlock (&cache_lock);
1122 if (ci->values_num > 0)
1124 /* Enqueue at head */
1125 enqueue_cache_item (ci, HEAD);
/* NOTE(review): the wait is not wrapped in a predicate loop, so a spurious
 * wake-up would return before the values were written -- confirm whether
 * that is acceptable here. */
1126 pthread_cond_wait(&ci->flushed, &cache_lock);
1129 /* DO NOT DO ANYTHING WITH ci HERE!! The entry
1130 * may have been purged during our cond_wait() */
1132 pthread_mutex_unlock(&cache_lock);
1135 } /* }}} int flush_file */
/* Send an error response of the form "Usage: <text>". The text defaults to
 * "Syntax error.\n"; the branch for a command that carries its own syntax
 * string follows (its body is not visible in this excerpt). Returns whatever
 * send_response() returns. */
1137 static int syntax_error(listen_socket_t *sock, command_t *cmd) /* {{{ */
1139 char *err = "Syntax error.\n";
1141 if (cmd && cmd->syntax)
1144 return send_response(sock, RESP_ERR, "Usage: %s", err);
1145 } /* }}} static int syntax_error() */
/* STATS handler: reports the daemon's counters and the size of the cache
 * tree, one "Name: value" line each, followed by an OK status. */
1147 static int handle_request_stats (HANDLER_PROTO) /* {{{ */
1149 uint64_t copy_queue_length;
1150 uint64_t copy_updates_received;
1151 uint64_t copy_flush_received;
1152 uint64_t copy_updates_written;
1153 uint64_t copy_data_sets_written;
1154 uint64_t copy_journal_bytes;
1155 uint64_t copy_journal_rotate;
1157 uint64_t tree_nodes_number;
1158 uint64_t tree_depth;
/* snapshot the counters under stats_lock so that formatting and socket I/O
 * happen without the lock held */
1160 pthread_mutex_lock (&stats_lock);
1161 copy_queue_length = stats_queue_length;
1162 copy_updates_received = stats_updates_received;
1163 copy_flush_received = stats_flush_received;
1164 copy_updates_written = stats_updates_written;
1165 copy_data_sets_written = stats_data_sets_written;
1166 copy_journal_bytes = stats_journal_bytes;
1167 copy_journal_rotate = stats_journal_rotate;
1168 pthread_mutex_unlock (&stats_lock);
/* the tree figures are read under cache_lock */
1170 pthread_mutex_lock (&cache_lock);
1171 tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1172 tree_depth = (uint64_t) g_tree_height (cache_tree);
1173 pthread_mutex_unlock (&cache_lock);
1175 add_response_info(sock,
1176 "QueueLength: %"PRIu64"\n", copy_queue_length);
1177 add_response_info(sock,
1178 "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1179 add_response_info(sock,
1180 "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1181 add_response_info(sock,
1182 "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1183 add_response_info(sock,
1184 "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1185 add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1186 add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1187 add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1188 add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1190 send_response(sock, RESP_OK, "Statistics follow\n");
1193 } /* }}} int handle_request_stats */
/* FLUSH handler: reads the file name from the command, counts the request,
 * resolves and checks the path, then flushes that file and waits for the
 * result via flush_file(). */
1195 static int handle_request_flush (HANDLER_PROTO) /* {{{ */
1197 char *file, file_tmp[PATH_MAX];
1200 status = buffer_get_field (&buffer, &buffer_size, &file);
1203 return syntax_error(sock,cmd);
1207 pthread_mutex_lock(&stats_lock);
1208 stats_flush_received++;
1209 pthread_mutex_unlock(&stats_lock);
1211 get_abs_path(&file, file_tmp);
1212 if (!check_file_access(file, sock)) return 0; /* check_file_access() already sent the error */
1214 status = flush_file (file);
1216 return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1217 else if (status == ENOENT)
1219 /* no file in our tree; see whether it exists at all */
1220 struct stat statbuf;
1222 memset(&statbuf, 0, sizeof(statbuf));
1223 if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1224 return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1226 return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1228 else if (status < 0)
1229 return send_response(sock, RESP_ERR, "Internal error.\n");
1231 return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1236 } /* }}} int handle_request_flush */
1238 static int handle_request_flushall(HANDLER_PROTO) /* {{{ */
1240 RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1242 pthread_mutex_lock(&cache_lock);
1243 flush_old_values(-1);
1244 pthread_mutex_unlock(&cache_lock);
1246 return send_response(sock, RESP_OK, "Started flush.\n");
1247 } /* }}} static int handle_request_flushall */
/* PENDING handler: lists the update strings currently cached for one file,
 * one per line, or answers with the ENOENT message when the file has no
 * cache entry.
 * NOTE(review): unlike the FLUSH and FORGET handlers, no check_file_access()
 * call is visible in this handler -- confirm that this is intended. */
1249 static int handle_request_pending(HANDLER_PROTO) /* {{{ */
1252 char *file, file_tmp[PATH_MAX];
1255 status = buffer_get_field(&buffer, &buffer_size, &file);
1257 return syntax_error(sock,cmd);
1259 get_abs_path(&file, file_tmp);
1261 pthread_mutex_lock(&cache_lock);
1262 ci = g_tree_lookup(cache_tree, file);
1265 pthread_mutex_unlock(&cache_lock);
1266 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
/* the values are copied into the response while cache_lock is still held */
1269 for (size_t i=0; i < ci->values_num; i++)
1270 add_response_info(sock, "%s\n", ci->values[i]);
1272 pthread_mutex_unlock(&cache_lock);
1273 return send_response(sock, RESP_OK, "updates pending\n");
1274 } /* }}} static int handle_request_pending */
/* FORGET handler: removes a file's entry from the cache tree. A successful
 * removal is recorded in the journal (unless this is itself a journal
 * replay) and answered with "Gone!"; otherwise the ENOENT message is sent. */
1276 static int handle_request_forget(HANDLER_PROTO) /* {{{ */
1280 char *file, file_tmp[PATH_MAX];
1282 status = buffer_get_field(&buffer, &buffer_size, &file);
1284 return syntax_error(sock,cmd);
1286 get_abs_path(&file, file_tmp);
1287 if (!check_file_access(file, sock)) return 0; /* check_file_access() already sent the error */
1289 pthread_mutex_lock(&cache_lock);
1290 found = g_tree_remove(cache_tree, file);
1291 pthread_mutex_unlock(&cache_lock);
1295 if (!JOURNAL_REPLAY(sock))
1296 journal_write("forget", file);
1298 return send_response(sock, RESP_OK, "Gone!\n");
1301 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1305 } /* }}} static int handle_request_forget */
1307 static int handle_request_queue (HANDLER_PROTO) /* {{{ */
1311 pthread_mutex_lock(&cache_lock);
1313 ci = cache_queue_head;
1316 add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1320 pthread_mutex_unlock(&cache_lock);
1322 return send_response(sock, RESP_OK, "in queue.\n");
1323 } /* }}} int handle_request_queue */
/* UPDATE handler: "<file> <value> [<value> ...]". Looks up (or creates) the
 * cache entry for the file, appends each value after checking that its
 * timestamp moves forward, and queues the entry for writing once
 * config_write_interval seconds have passed since its last flush. Outside
 * journal replay the original command is also written to the journal. */
1325 static int handle_request_update (HANDLER_PROTO) /* {{{ */
1327 char *file, file_tmp[PATH_MAX];
1330 char orig_buf[CMD_MAX];
1334 /* save it for the journal later */
/* NOTE(review): strncpy() copies exactly buffer_size bytes; this relies on
 * buffer_size <= CMD_MAX and on the buffer ending in NUL (asserted in
 * buffer_get_field) -- confirm the size bound at the caller. */
1335 if (!JOURNAL_REPLAY(sock))
1336 strncpy(orig_buf, buffer, buffer_size);
1338 status = buffer_get_field (&buffer, &buffer_size, &file);
1340 return syntax_error(sock,cmd);
1342 pthread_mutex_lock(&stats_lock);
1343 stats_updates_received++;
1344 pthread_mutex_unlock(&stats_lock);
1346 get_abs_path(&file, file_tmp);
1347 if (!check_file_access(file, sock)) return 0; /* check_file_access() already sent the error */
1349 pthread_mutex_lock (&cache_lock);
1350 ci = g_tree_lookup (cache_tree, file);
1352 if (ci == NULL) /* {{{ */
1354 struct stat statbuf;
1357 /* don't hold the lock while we setup; stat(2) might block */
1358 pthread_mutex_unlock(&cache_lock);
1360 memset (&statbuf, 0, sizeof (statbuf));
1361 status = stat (file, &statbuf);
1364 RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
/* NOTE(review): stat() itself returns -1 on failure; the comparison below
 * presumably relies on `status' having been reloaded from errno on a line
 * not visible here -- confirm. */
1367 if (status == ENOENT)
1368 return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1370 return send_response(sock, RESP_ERR,
1371 "stat failed with error %i.\n", status);
1373 if (!S_ISREG (statbuf.st_mode))
1374 return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1376 if (access(file, R_OK|W_OK) != 0)
1377 return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1378 file, rrd_strerror(errno));
/* build a fresh, zeroed cache item for this file */
1380 ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1383 RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1385 return send_response(sock, RESP_ERR, "malloc failed.\n");
1387 memset (ci, 0, sizeof (cache_item_t));
1389 ci->file = strdup (file);
1390 if (ci->file == NULL)
1393 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1395 return send_response(sock, RESP_ERR, "strdup failed.\n");
1398 wipe_ci_values(ci, now);
1399 ci->flags = CI_FLAGS_IN_TREE;
1400 pthread_cond_init(&ci->flushed, NULL);
1402 pthread_mutex_lock(&cache_lock);
1404 /* another UPDATE might have added this entry in the meantime */
1405 tmp = g_tree_lookup (cache_tree, file);
1407 g_tree_replace (cache_tree, (void *) ci->file, (void *) ci); /* the item's own file string is the key */
1410 free_cache_item (ci);
1414 /* state may have changed while we were unlocked */
1415 if (state == SHUTDOWN)
1418 assert (ci != NULL);
1420 /* don't re-write updates in replay mode */
1421 if (!JOURNAL_REPLAY(sock))
1422 journal_write("update", orig_buf);
/* consume the remaining fields, one update value each */
1424 while (buffer_size > 0)
1430 status = buffer_get_field (&buffer, &buffer_size, &value);
1433 RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1437 /* make sure update time is always moving forward */
/* a value must start with a decimal timestamp followed by ':' */
1438 stamp = strtol(value, &eostamp, 10);
1439 if (eostamp == value || eostamp == NULL || *eostamp != ':')
1441 pthread_mutex_unlock(&cache_lock);
1442 return send_response(sock, RESP_ERR,
1443 "Cannot find timestamp in '%s'!\n", value);
1445 else if (stamp <= ci->last_update_stamp)
1447 pthread_mutex_unlock(&cache_lock);
/* NOTE(review): ci->last_update_stamp is a time_t but is printed with
 * "%ld" below; confirm time_t is long on all targets or add a cast. */
1448 return send_response(sock, RESP_ERR,
1449 "illegal attempt to update using time %ld when last"
1450 " update time is %ld (minimum one second step)\n",
1451 stamp, ci->last_update_stamp);
1454 ci->last_update_stamp = stamp;
1456 if (!rrd_add_strdup_chunk(&ci->values, &ci->values_num, value,
1457 &ci->values_alloc, config_alloc_chunk))
1459 RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed.");
/* queue the entry once it is due, unless it is already queued */
1466 if (((now - ci->last_flush_time) >= config_write_interval)
1467 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1468 && (ci->values_num > 0))
1470 enqueue_cache_item (ci, TAIL);
1473 pthread_mutex_unlock (&cache_lock);
1476 return send_response(sock, RESP_ERR, "No values updated.\n");
1478 return send_response(sock, RESP_OK,
1479 "errors, enqueued %i value(s).\n", values_num);
1484 } /* }}} int handle_request_update */
/*
 * FETCH handler.  Reads <file> and <CF> plus the optional <start> and <end>
 * fields from the request, flushes the file, runs rrd_fetch_r() on it and
 * hands the result to the client as response info lines: a few header
 * values, the list of DS names, then one line per time step.
 */
1486 static int handle_request_fetch (HANDLER_PROTO) /* {{{ */
1488 char *file, file_tmp[PATH_MAX];
1497 unsigned long ds_cnt;
1504 rrd_value_t *data_ptr;
1511 /* Read the arguments */
1514 status = buffer_get_field (&buffer, &buffer_size, &file);
1518 status = buffer_get_field (&buffer, &buffer_size, &cf);
1522 status = buffer_get_field (&buffer, &buffer_size, &start_str);
1530 status = buffer_get_field (&buffer, &buffer_size, &end_str);
1540 return (syntax_error(sock,cmd));
1542 get_abs_path(&file, file_tmp);
/* NOTE(review): unlike INFO/FIRST/LAST/CREATE, a failed access check returns
 * 0 here without sending a "Cannot read" response -- confirm that
 * check_file_access() reports the failure to the client itself. */
1543 if (!check_file_access(file, sock)) return 0;
/* Flush the file before reading it; a status of ENOENT is not treated as
 * an error. */
1545 status = flush_file (file);
1546 if ((status != 0) && (status != ENOENT))
1547 return (send_response (sock, RESP_ERR,
1548 "flush_file (%s) failed with status %i.\n", file, status));
1550 t = time (NULL); /* "now" */
1552 /* Parse start time */
1553 if (start_str != NULL)
/* base 0: strtol() accepts decimal, octal ("0...") and hex ("0x...") */
1560 value = strtol (start_str, &endptr, /* base = */ 0);
1561 if ((endptr == start_str) || (errno != 0))
1562 return (send_response(sock, RESP_ERR,
1563 "Cannot parse start time `%s': Only simple integers are allowed.\n",
/* NOTE(review): the parsed value is used either as an absolute timestamp or
 * as an offset added to `t` (now) -- confirm which values select which form. */
1567 start_tm = (time_t) value;
1569 start_tm = (time_t) (t + value);
/* default start: 86400 seconds (one day) before now */
1573 start_tm = t - 86400;
1576 /* Parse end time */
1577 if (end_str != NULL)
1584 value = strtol (end_str, &endptr, /* base = */ 0);
1585 if ((endptr == end_str) || (errno != 0))
1586 return (send_response(sock, RESP_ERR,
1587 "Cannot parse end time `%s': Only simple integers are allowed.\n",
1591 end_tm = (time_t) value;
1593 end_tm = (time_t) (t + value);
/* rrd_fetch_r() receives start_tm, end_tm and step by address; the values
 * reported to the client below are read after the call. */
1605 status = rrd_fetch_r (file, cf, &start_tm, &end_tm, &step,
1606 &ds_cnt, &ds_namv, &data);
1608 return (send_response(sock, RESP_ERR,
1609 "rrd_fetch_r failed: %s\n", rrd_get_error ()));
/* NOTE(review): "%lu" is paired with the int constant 1 here; if
 * add_response_info() is printf-style the argument should be 1UL. */
1611 add_response_info (sock, "FlushVersion: %lu\n", 1);
1612 add_response_info (sock, "Start: %lu\n", (unsigned long) start_tm);
1613 add_response_info (sock, "End: %lu\n", (unsigned long) end_tm);
1614 add_response_info (sock, "Step: %lu\n", step);
1615 add_response_info (sock, "DSCount: %lu\n", ds_cnt);
/* SSTRCAT: bounded append of `str` to the array `buffer`, with the current
 * fill level kept in `buffer_fill`.  The copy length is clamped to the space
 * left and the result is kept NUL-terminated.  `buffer` must be a true array
 * because the macro relies on sizeof (buffer). */
1617 #define SSTRCAT(buffer,str,buffer_fill) do { \
1618 size_t str_len = strlen (str); \
1619 if ((buffer_fill + str_len) > sizeof (buffer)) \
1620 str_len = sizeof (buffer) - buffer_fill; \
1621 if (str_len > 0) { \
1622 strncpy (buffer + buffer_fill, str, str_len); \
1623 buffer_fill += str_len; \
1624 assert (buffer_fill <= sizeof (buffer)); \
1625 if (buffer_fill == sizeof (buffer)) \
1626 buffer[buffer_fill - 1] = 0; \
1628 buffer[buffer_fill] = 0; \
1632 { /* Add list of DS names */
1634 size_t linebuf_fill;
1636 memset (linebuf, 0, sizeof (linebuf));
/* Each name is appended after a single space and released with
 * rrd_freemem() right away; the name vector is released after the loop. */
1638 for (i = 0; i < ds_cnt; i++)
1641 SSTRCAT (linebuf, " ", linebuf_fill);
1642 SSTRCAT (linebuf, ds_namv[i], linebuf_fill);
1643 rrd_freemem(ds_namv[i]);
1645 rrd_freemem(ds_namv);
1646 add_response_info (sock, "DSName: %s\n", linebuf);
1649 /* Add the actual data */
/* `t` is reused as the row timestamp; the first row is start_tm + step and
 * the last one is end_tm (inclusive). */
1652 for (t = start_tm + step; t <= end_tm; t += step)
1655 size_t linebuf_fill;
1658 memset (linebuf, 0, sizeof (linebuf));
1660 for (i = 0; i < ds_cnt; i++)
/* each value is formatted as " %0.10e" and appended to the row */
1662 snprintf (tmp, sizeof (tmp), " %0.10e", *data_ptr);
1663 tmp[sizeof (tmp) - 1] = 0;
1664 SSTRCAT (linebuf, tmp, linebuf_fill);
1669 add_response_info (sock, "%10lu:%s\n", (unsigned long) t, linebuf);
1673 return (send_response (sock, RESP_OK, "Success\n"));
1675 } /* }}} int handle_request_fetch */
/* WROTE handler; per the command table it is only valid in journal context
 * (CMD_CONTEXT_JOURNAL). */
1677 /* we came across a "WROTE" entry during journal replay.
1678 * throw away any values that we have accumulated for this file
1680 static int handle_request_wrote (HANDLER_PROTO) /* {{{ */
/* the whole remaining request buffer is taken as the file name */
1683 const char *file = buffer;
/* the cache lookup and every change to the item happen under cache_lock */
1685 pthread_mutex_lock(&cache_lock);
1687 ci = g_tree_lookup(cache_tree, file);
/* NOTE(review): early unlock -- presumably the "no cache entry" exit path;
 * confirm. */
1690 pthread_mutex_unlock(&cache_lock);
/* drop the queued values, reset the item via wipe_ci_values() and take it
 * off the queue */
1695 rrd_free_ptrs((void ***) &ci->values, &ci->values_num);
1697 wipe_ci_values(ci, now);
1698 remove_from_queue(ci);
1700 pthread_mutex_unlock(&cache_lock);
1702 } /* }}} int handle_request_wrote */
/*
 * INFO handler.  Resolves the file name, checks access, calls rrd_info_r()
 * and emits one response info line per element of the returned list, then
 * frees the list with rrd_info_free().
 */
1704 static int handle_request_info (HANDLER_PROTO) /* {{{ */
1706 char *file, file_tmp[PATH_MAX];
1710 /* obtain filename */
1711 status = buffer_get_field(&buffer, &buffer_size, &file);
1713 return syntax_error(sock,cmd);
1714 /* get full pathname */
1715 get_abs_path(&file, file_tmp);
1716 if (!check_file_access(file, sock)) {
1717 return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1721 info = rrd_info_r(file);
1723 return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
/* Every line has the form "<key> <type> <value>", where <type> is the
 * numeric data->type.
 * NOTE(review): the INFO help text in list_of_commands describes the format
 * as "<keyname> = <value>", which is not what is written here. */
1725 for (rrd_info_t *data = info; data != NULL; data = data->next) {
1726 switch (data->type) {
/* floating point value: NaN is written as the literal text "NaN" */
1728 if (isnan(data->value.u_val))
1729 add_response_info(sock,"%s %d NaN\n",data->key, data->type);
1731 add_response_info(sock,"%s %d %0.10e\n", data->key, data->type, data->value.u_val);
1734 add_response_info(sock,"%s %d %lu\n", data->key, data->type, data->value.u_cnt);
1737 add_response_info(sock,"%s %d %d\n", data->key, data->type, data->value.u_int);
1740 add_response_info(sock,"%s %d %s\n", data->key, data->type, data->value.u_str);
/* for a blob only its size is reported, not its contents */
1743 add_response_info(sock,"%s %d %lu\n", data->key, data->type, data->value.u_blo.size);
1748 rrd_info_free(info);
1750 return send_response(sock, RESP_OK, "Info for %s follows\n",file);
1751 } /* }}} static int handle_request_info */
/*
 * FIRST handler.  Takes a file name and an RRA index, calls rrd_first_r()
 * and returns the resulting time as the response text.
 */
1753 static int handle_request_first (HANDLER_PROTO) /* {{{ */
1755 char *i, *file, file_tmp[PATH_MAX];
1760 /* obtain filename */
1761 status = buffer_get_field(&buffer, &buffer_size, &file);
1763 return syntax_error(sock,cmd);
1764 /* get full pathname */
1765 get_abs_path(&file, file_tmp);
1766 if (!check_file_access(file, sock)) {
1767 return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
/* second field: the RRA index, read as text into `i` */
1770 status = buffer_get_field(&buffer, &buffer_size, &i);
1772 return syntax_error(sock,cmd);
1775 return send_response(sock, RESP_ERR, "Invalid index specified (%d)\n", idx);
1780 t = rrd_first_r(file,idx);
1782 return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
/* NOTE(review): "%lu" expects unsigned long but the argument is cast to
 * (unsigned); the cast should be (unsigned long) to match the conversion. */
1784 return send_response(sock, RESP_OK, "%lu\n",(unsigned)t);
1785 } /* }}} static int handle_request_first */
/*
 * LAST handler.  Opens the RRD read-only to read its last update time and
 * step, then looks the file up in the cache under cache_lock, and returns a
 * timestamp to the client.
 */
1788 static int handle_request_last (HANDLER_PROTO) /* {{{ */
1790 char *file, file_tmp[PATH_MAX];
1792 time_t t, from_file, step;
1793 rrd_file_t * rrd_file;
1797 /* obtain filename */
1798 status = buffer_get_field(&buffer, &buffer_size, &file);
1800 return syntax_error(sock,cmd);
1801 /* get full pathname */
1802 get_abs_path(&file, file_tmp);
1803 if (!check_file_access(file, sock)) {
1804 return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1808 rrd_file = rrd_open(file,&rrd,RRD_READONLY);
1810 return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
/* copy what is needed out of the header, then close the file again */
1812 from_file = rrd.live_head->last_up;
1813 step = rrd.stat_head->pdp_step;
1814 rrd_close(rrd_file);
/* NOTE(review): the cached item's last_update_stamp is consulted here, while
 * the LAST help text in list_of_commands says the reported time is that of
 * the RRD file itself and excludes data still held by rrdcached -- confirm
 * which behaviour is intended. */
1815 pthread_mutex_lock(&cache_lock);
1816 ci = g_tree_lookup(cache_tree, file);
1818 t = ci->last_update_stamp;
1821 pthread_mutex_unlock(&cache_lock);
1825 return send_response(sock, RESP_ERR, "Error: rrdcached: Invalid timestamp returned\n");
/* NOTE(review): "%lu" expects unsigned long but the argument is cast to
 * (unsigned); the cast should be (unsigned long) to match the conversion. */
1827 return send_response(sock, RESP_OK, "%lu\n",(unsigned)t);
1828 } /* }}} static int handle_request_last */
/*
 * CREATE handler.  Parses the file name, the options -b/-s/-O and the DS:
 * and RRA: definitions from the request, validates step and start time and
 * calls rrd_create_r2().
 */
1830 static int handle_request_create (HANDLER_PROTO) /* {{{ */
1832 char *file, file_tmp[PATH_MAX];
/* defaults: 300 second step, start time ten seconds before now, overwrite
 * policy taken from the daemon-wide opt_no_overwrite */
1837 unsigned long step = 300;
1838 time_t last_up = time(NULL)-10;
1839 int no_overwrite = opt_no_overwrite;
1842 /* obtain filename */
1843 status = buffer_get_field(&buffer, &buffer_size, &file);
1845 return syntax_error(sock,cmd);
1846 /* get full pathname */
1847 get_abs_path(&file, file_tmp);
1848 if (!check_file_access(file, sock)) {
1849 return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1851 RRDD_LOG(LOG_INFO, "rrdcreate request for %s",file);
/* Options are recognised by prefix only (strncmp over the first 2 chars),
 * so any token that merely starts with "-b", "-s" or "-O" matches. */
1853 while ((status = buffer_get_field(&buffer, &buffer_size, &tok)) == 0 && tok) {
1854 if( ! strncmp(tok,"-b",2) ) {
1855 status = buffer_get_field(&buffer, &buffer_size, &tok );
1856 if (status != 0) return syntax_error(sock,cmd);
/* NOTE(review): atol() cannot report malformed input; bad values are only
 * caught indirectly by the range check on last_up further down. */
1857 last_up = (time_t) atol(tok);
1860 if( ! strncmp(tok,"-s",2) ) {
1861 status = buffer_get_field(&buffer, &buffer_size, &tok );
1862 if (status != 0) return syntax_error(sock,cmd);
1866 if( ! strncmp(tok,"-O",2) ) {
/* DS and RRA definitions are collected unchanged into av[].
 * NOTE(review): no bound check on `ac` is visible here -- confirm that av[]
 * cannot be overrun by a long request. */
1870 if( ! strncmp(tok,"DS:",3) ) { av[ac++]=tok; continue; }
1871 if( ! strncmp(tok,"RRA:",4) ) { av[ac++]=tok; continue; }
/* anything else is a syntax error */
1872 return syntax_error(sock,cmd);
1875 return send_response(sock, RESP_ERR, "The step size cannot be less than 1 second.\n");
/* 3600 * 24 * 365 * 10 seconds: ten 365-day years after the epoch */
1877 if (last_up < 3600 * 24 * 365 * 10) {
1878 return send_response(sock, RESP_ERR, "The first entry must be after 1980.\n");
1882 status = rrd_create_r2(file,step,last_up,no_overwrite,ac,(const char **)av);
1885 return send_response(sock, RESP_OK, "RRD created OK\n");
1887 return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1888 } /* }}} static int handle_request_create */
1890 /* start "BATCH" processing */
1891 static int batch_start (HANDLER_PROTO) /* {{{ */
/* a non-zero sock->batch_start means the socket is already inside a BATCH */
1894 if (sock->batch_start)
1895 return send_response(sock, RESP_ERR, "Already in BATCH\n");
1897 status = send_response(sock, RESP_OK,
1898 "Go ahead. End with dot '.' on its own line.\n");
/* record when the batch began and reset the per-batch command counter */
1899 sock->batch_start = time(NULL);
1900 sock->batch_cmd = 0;
1903 } /* }}} static int batch_start */
1905 /* finish "BATCH" processing and return results to the client */
1906 static int batch_done (HANDLER_PROTO) /* {{{ */
/* must only be reached while a batch is active */
1908 assert(sock->batch_start);
/* clear the batch state so the socket is back in normal client mode */
1909 sock->batch_start = 0;
1910 sock->batch_cmd = 0;
1911 return send_response(sock, RESP_OK, "errors\n");
1912 } /* }}} static int batch_done */
/* QUIT handler; its entry in list_of_commands describes the command as
 * "Disconnect from rrdcached." */
1914 static int handle_request_quit (HANDLER_PROTO) /* {{{ */
1917 } /* }}} static int handle_request_quit */
/*
 * Table of all commands understood by the daemon.  The code in this file
 * reads the members `cmd` (name, matched case-insensitively), `handler`,
 * `context` (a mask of CMD_CONTEXT_* bits), `syntax` and `help` of each
 * entry.
 *
 * The position of an entry matters: its array index is used as a bit
 * position in listen_socket_t.permissions (a uint32_t), so reordering
 * entries changes the meaning of stored permission bits.
 */
1919 static command_t list_of_commands[] = { /* {{{ */
1922 handle_request_update,
1924 "UPDATE <filename> <values> [<values> ...]\n"
1926 "Adds the given file to the internal cache if it is not yet known and\n"
1927 "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1930 "Each <values> has the following form:\n"
1931 " <values> = <time>:<value>[:<value>[...]]\n"
1932 "See the rrdupdate(1) manpage for details.\n"
1936 handle_request_wrote,
1937 CMD_CONTEXT_JOURNAL,
1943 handle_request_flush,
1944 CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1945 "FLUSH <filename>\n"
1947 "Adds the given filename to the head of the update queue and returns\n"
1948 "after it has been dequeued.\n"
1952 handle_request_flushall,
1956 "Triggers writing of all pending updates. Returns immediately.\n"
1960 handle_request_pending,
1962 "PENDING <filename>\n"
1964 "Shows any 'pending' updates for a file, in order.\n"
1965 "The updates shown have not yet been written to the underlying RRD file.\n"
1969 handle_request_forget,
1971 "FORGET <filename>\n"
1973 "Removes the file completely from the cache.\n"
1974 "Any pending updates for the file will be lost.\n"
1978 handle_request_queue,
1982 "Shows all files in the output queue.\n"
1983 "The output is zero or more lines in the following format:\n"
1984 "(where <num_vals> is the number of values to be written)\n"
1986 "<num_vals> <filename>\n"
1990 handle_request_stats,
1994 "Returns some performance counters, see the rrdcached(1) manpage for\n"
1995 "a description of the values.\n"
1999 handle_request_help,
2001 "HELP [<command>]\n",
2002 NULL, /* special! */
2010 "The 'BATCH' command permits the client to initiate a bulk load\n"
2011 " of commands to rrdcached.\n"
2016 " server: 0 Go ahead. End with dot '.' on its own line.\n"
2017 " client: command #1\n"
2018 " client: command #2\n"
2019 " client: ... and so on\n"
2021 " server: 2 errors\n"
2022 " server: 7 message for command #7\n"
2023 " server: 9 message for command #9\n"
2025 "For more information, consult the rrdcached(1) documentation.\n"
2028 ".", /* BATCH terminator */
2036 handle_request_fetch,
2038 "FETCH <file> <CF> [<start> [<end>]]\n"
2040 "The 'FETCH' can be used by the client to retrieve values from an RRD file.\n"
2044 handle_request_info,
2046 "INFO <filename>\n",
2047 "The INFO command retrieves information about a specified RRD file.\n"
2048 "This is returned in standard rrdinfo format, a sequence of lines\n"
2049 "with the format <keyname> = <value>\n"
2050 "Note that this is the data as of the last update of the RRD file itself,\n"
2051 "not the last time data was received via rrdcached, so there may be pending\n"
2052 "updates in the queue. If this bothers you, then first run a FLUSH.\n"
2056 handle_request_first,
2058 "FIRST <filename> <rra index>\n",
2059 "The FIRST command retrieves the first data time for a specified RRA in\n"
2064 handle_request_last,
2066 "LAST <filename>\n",
2067 "The LAST command retrieves the last update time for a specified RRD file.\n"
2068 "Note that this is the time of the last update of the RRD file itself, not\n"
2069 "the last time data was received via rrdcached, so there may be pending\n"
2070 "updates in the queue. If this bothers you, then first run a FLUSH.\n"
2074 handle_request_create,
2075 CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
2076 "CREATE <filename> [-b start] [-s step] [-O] <DS definitions> <RRA definitions>\n",
2077 "The CREATE command will create an RRD file, overwriting any existing file\n"
2078 "unless the -O option is given or rrdcached was started with the -O option.\n"
2079 "The start parameter needs to be in seconds since 1/1/70 (AT-style syntax is\n"
2080 "not acceptable) and the step is in seconds (default is 300).\n"
2081 "The DS and RRA definitions are as for the 'rrdtool create' command.\n"
2085 handle_request_quit,
2086 CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
2089 "Disconnect from rrdcached.\n"
2091 }; /* }}} command_t list_of_commands[] */
/* number of entries in list_of_commands, computed at compile time */
2092 static size_t list_of_commands_len = sizeof (list_of_commands)
2093 / sizeof (list_of_commands[0]);
/*
 * Look up a command by name (case-insensitive) in list_of_commands and
 * return a pointer to the matching table entry.
 * NOTE(review): this is the same linear search as find_command_index();
 * one could be expressed through the other.
 */
2095 static command_t *find_command(char *cmd)
2099 for (i = 0; i < list_of_commands_len; i++)
2100 if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
2101 return (&list_of_commands[i]);
2105 /* We currently use the index in the `list_of_commands' array as a bit position
2106 * in `listen_socket_t.permissions'. This member should NEVER be accessed from
2107 * outside these functions so that switching to a more elegant storage method
2108 * is easily possible. */
/* Returns the array index of the entry whose name matches `cmd`
 * (case-insensitive comparison). */
2109 static ssize_t find_command_index (const char *cmd) /* {{{ */
2113 for (i = 0; i < list_of_commands_len; i++)
2114 if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
2115 return ((ssize_t) i);
2117 } /* }}} ssize_t find_command_index */
/*
 * Decide whether command `cmd` may be used on `sock`, based on the bit for
 * that command in sock->permissions.
 */
2119 static int socket_permission_check (listen_socket_t *sock, /* {{{ */
/* journal replay is tested first, before any lookup */
2124 if (JOURNAL_REPLAY(sock))
/* "QUIT" and "HELP" (any case) and the batch terminator "." are tested
 * before the permission bit is consulted -- presumably they are always
 * permitted; confirm. */
2130 if ((strcasecmp ("QUIT", cmd) == 0)
2131 || (strcasecmp ("HELP", cmd) == 0))
2133 else if (strcmp (".", cmd) == 0)
2136 i = find_command_index (cmd);
/* NOTE(review): `1 << i` shifts a signed int while permissions is a
 * uint32_t; UINT32_C(1) << i would stay defined for i == 31. */
2141 if ((sock->permissions & (1 << i)) != 0)
2144 } /* }}} int socket_permission_check */
/* Grant `sock` permission for one command by setting the bit at that
 * command's index in list_of_commands. */
2146 static int socket_permission_add (listen_socket_t *sock, /* {{{ */
2151 i = find_command_index (cmd);
/* NOTE(review): signed `1 << i` on a uint32_t mask; see the same pattern in
 * socket_permission_check(). */
2156 sock->permissions |= (1 << i);
2158 } /* }}} int socket_permission_add */
/* Revoke every command permission on `sock`. */
2160 static void socket_permission_clear (listen_socket_t *sock) /* {{{ */
2162 sock->permissions = 0;
2163 } /* }}} socket_permission_clear */
/* Give `dest` exactly the same permission mask as `src`. */
2165 static void socket_permission_copy (listen_socket_t *dest, /* {{{ */
2166 listen_socket_t *src)
2168 dest->permissions = src->permissions;
2169 } /* }}} socket_permission_copy */
/* Grant `sock` permission for every command in list_of_commands by setting
 * one bit per table index. */
2171 static void socket_permission_set_all (listen_socket_t *sock) /* {{{ */
2175 sock->permissions = 0;
2176 for (i = 0; i < list_of_commands_len; i++)
/* NOTE(review): signed `1 << i`; undefined once the table reaches 32
 * entries. */
2177 sock->permissions |= (1 << i);
2178 } /* }}} void socket_permission_set_all */
2180 /* check whether commands are received in the expected context */
/* Returns non-zero when cmd->context contains the bit for the current
 * situation: journal replay, inside a BATCH, or a plain client request. */
2181 static int command_check_context(listen_socket_t *sock, command_t *cmd)
2183 if (JOURNAL_REPLAY(sock))
2184 return (cmd->context & CMD_CONTEXT_JOURNAL);
2185 else if (sock->batch_start)
2186 return (cmd->context & CMD_CONTEXT_BATCH);
2188 return (cmd->context & CMD_CONTEXT_CLIENT);
/*
 * HELP handler.  With a known command name as argument it prints that
 * command's usage and help text; otherwise it prints an overview made of
 * the syntax line of every command that has one.
 */
2194 static int handle_request_help (HANDLER_PROTO) /* {{{ */
2199 command_t *help = NULL;
2201 status = buffer_get_field (&buffer, &buffer_size, &cmd_str);
2203 help = find_command(cmd_str);
/* detailed help only when the entry has a syntax line or a help text */
2205 if (help && (help->syntax || help->help))
2209 snprintf(tmp, sizeof(tmp)-1, "Help for %s\n", help->cmd);
2213 add_response_info(sock, "Usage: %s\n", help->syntax);
2216 add_response_info(sock, "%s\n", help->help);
2222 resp_txt = "Command overview\n";
2224 for (i = 0; i < list_of_commands_len; i++)
/* entries without a syntax line are tested for here before printing */
2226 if (list_of_commands[i].syntax == NULL)
2228 add_response_info (sock, "%s", list_of_commands[i].syntax);
/* NOTE(review): resp_txt is passed as the format string itself; passing it
 * through "%s" would rule out stray conversion specifiers. */
2232 return send_response(sock, RESP_OK, resp_txt);
2233 } /* }}} int handle_request_help */
/*
 * Central dispatcher: splits the command word off the request buffer, looks
 * it up in list_of_commands, checks permission and context, and finally
 * calls the command's handler with the rest of the buffer.
 */
2235 static int handle_request (DISPATCH_PROTO) /* {{{ */
2237 char *buffer_ptr = buffer;
2238 char *cmd_str = NULL;
2239 command_t *cmd = NULL;
/* callers must pass a NUL-terminated buffer whose size includes the NUL */
2242 assert (buffer[buffer_size - 1] == '\0');
2244 status = buffer_get_field (&buffer_ptr, &buffer_size, &cmd_str);
2247 RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
/* sock may be NULL here; journal_replay() calls this function that way */
2251 if (sock != NULL && sock->batch_start)
2254 cmd = find_command(cmd_str);
2256 return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str);
2258 if (!socket_permission_check (sock, cmd->cmd))
2259 return send_response(sock, RESP_ERR, "Permission denied.\n");
2261 if (!command_check_context(sock, cmd))
2262 return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str);
/* the handler receives the buffer position and size left after the command
 * word was consumed */
2264 return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size);
2265 } /* }}} int handle_request */
/* Release the list of file names held by journal set `js` via
 * rrd_free_ptrs(). */
2267 static void journal_set_free (journal_set *js) /* {{{ */
2272 rrd_free_ptrs((void ***) &js->files, &js->files_num);
2275 } /* }}} journal_set_free */
/* Delete from disk every journal file listed in `js`.  The list itself is
 * left untouched; the result of unlink() is not checked. */
2277 static void journal_set_remove (journal_set *js) /* {{{ */
2282 for (uint i=0; i < js->files_num; i++)
2284 RRDD_LOG(LOG_DEBUG, "removing old journal %s", js->files[i]);
2285 unlink(js->files[i]);
2287 } /* }}} journal_set_remove */
2289 /* close current journal file handle.
2290 * MUST hold journal_lock before calling */
2291 static void journal_close(void) /* {{{ */
/* nothing to close when no journal file is open */
2293 if (journal_fh != NULL)
/* a failing fclose() is logged only */
2295 if (fclose(journal_fh) != 0)
2296 RRDD_LOG(LOG_ERR, "cannot close journal: %s", rrd_strerror(errno));
2301 } /* }}} journal_close */
/*
 * Open a fresh journal file in journal_dir, make it the current journal
 * stream and record its name in the current journal set.  If that fails,
 * the failure is logged as "JOURNALING DISABLED" and
 * config_flush_at_shutdown is switched on.
 */
2303 /* MUST hold journal_lock before calling */
2304 static void journal_new_file(void) /* {{{ */
2308 char new_file[PATH_MAX + 1];
2310 assert(journal_dir != NULL);
2311 assert(journal_cur != NULL);
/* file name: <journal_dir>/<JOURNAL_BASE>.<seconds>.<microseconds>, both
 * numbers zero-padded to a fixed width */
2315 gettimeofday(&now, NULL);
2316 /* this format assures that the files sort in strcmp() order */
2317 snprintf(new_file, PATH_MAX, "%s/%s.%010d.%06d",
2318 journal_dir, JOURNAL_BASE, (int)now.tv_sec, (int)now.tv_usec);
/* created append-only; mode bits are rw for the owner, r for group/others */
2320 new_fd = open(new_file, O_WRONLY|O_CREAT|O_APPEND,
2321 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
2325 journal_fh = fdopen(new_fd, "a");
2326 if (journal_fh == NULL)
/* start the size counter at the current stream position */
2329 journal_size = ftell(journal_fh);
2330 RRDD_LOG(LOG_DEBUG, "started new journal %s", new_file);
2332 /* record the file in the journal set */
/* NOTE(review): the result of rrd_add_strdup() is ignored here, whereas
 * journal_init() checks it. */
2333 rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, new_file);
2339 "JOURNALING DISABLED: Error while trying to create %s : %s",
2340 new_file, rrd_strerror(errno));
2342 "JOURNALING DISABLED: All values will be flushed at shutdown");
/* without a journal, pending values have to be written out at shutdown */
2345 config_flush_at_shutdown = 1;
2347 } /* }}} journal_new_file */
/*
 * Rotate the journal sets: the current set becomes the old one, a new empty
 * set is allocated as current, and the files of the previously old set are
 * deleted and the set freed.
 */
2349 /* MUST NOT hold journal_lock before calling this */
2350 static void journal_rotate(void) /* {{{ */
2352 journal_set *old_js = NULL;
2354 if (journal_dir == NULL)
2357 RRDD_LOG(LOG_DEBUG, "rotating journals");
/* the rotation counter is protected by stats_lock, not journal_lock */
2359 pthread_mutex_lock(&stats_lock);
2360 ++stats_journal_rotate;
2361 pthread_mutex_unlock(&stats_lock);
2363 pthread_mutex_lock(&journal_lock);
2367 /* rotate the journal sets */
2368 old_js = journal_old;
2369 journal_old = journal_cur;
2370 journal_cur = calloc(1, sizeof(journal_set));
2372 if (journal_cur != NULL)
/* NOTE(review): the message says malloc although calloc() is used, and it
 * carries a trailing "\n" that other RRDD_LOG calls do not have. */
2375 RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2377 pthread_mutex_unlock(&journal_lock);
/* the superseded set is removed from disk and freed after journal_lock has
 * been released */
2379 journal_set_remove(old_js);
2380 journal_set_free (old_js);
2382 } /* }}} static void journal_rotate */
/*
 * Shutdown-time journal cleanup.  When config_flush_at_shutdown is set the
 * journal files of both sets are deleted; otherwise they are kept and a log
 * message says they will be used at the next startup.  Both journal sets
 * are freed in either case.
 */
2384 /* MUST hold journal_lock when calling */
2385 static void journal_done(void) /* {{{ */
2387 if (journal_cur == NULL)
2392 if (config_flush_at_shutdown)
2394 RRDD_LOG(LOG_INFO, "removing journals");
2395 journal_set_remove(journal_old);
2396 journal_set_remove(journal_cur);
2400 RRDD_LOG(LOG_INFO, "expedited shutdown; "
2401 "journals will be used at next startup");
2404 journal_set_free(journal_cur);
2405 journal_set_free(journal_old);
2408 } /* }}} static void journal_done */
/*
 * Append one "<cmd> <args>" line to the current journal file and account
 * for the bytes written, both in journal_size and in the statistics.
 */
2410 static int journal_write(char *cmd, char *args) /* {{{ */
/* NOTE(review): journal_fh is tested before journal_lock is taken. */
2414 if (journal_fh == NULL)
2417 pthread_mutex_lock(&journal_lock);
2418 chars = fprintf(journal_fh, "%s %s\n", cmd, args);
/* NOTE(review): fprintf() returns a negative value on error; no check is
 * visible before the result is added to journal_size. */
2419 journal_size += chars;
/* size limit for a single journal file */
2421 if (journal_size > JOURNAL_MAX)
2424 pthread_mutex_unlock(&journal_lock);
/* statistics are updated under their own lock */
2428 pthread_mutex_lock(&stats_lock);
2429 stats_journal_bytes += chars;
2430 pthread_mutex_unlock(&stats_lock);
2434 } /* }}} static int journal_write */
/*
 * Replay one journal file: after a set of sanity checks on the file, every
 * line is read and passed to handle_request() with a NULL socket.
 * Returns 1 if at least one entry was counted, 0 otherwise (including
 * file == NULL).
 */
2436 static int journal_replay (const char *file) /* {{{ */
2442 char entry[CMD_MAX];
2445 if (file == NULL) return 0;
2448 char *reason = "unknown error";
2450 struct stat statbuf;
/* Sanity checks: the file must be stat-able, a regular file, owned by
 * daemon_uid, and have neither S_IWGRP nor S_IWOTH set. */
2452 memset(&statbuf, 0, sizeof(statbuf));
2453 if (stat(file, &statbuf) != 0)
2455 reason = "stat error";
2458 else if (!S_ISREG(statbuf.st_mode))
2460 reason = "not a regular file";
2463 if (statbuf.st_uid != daemon_uid)
2465 reason = "not owned by daemon user";
/* NOTE(review): the mask tests group/other write bits, but the reason text
 * below says "user/group". */
2468 if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
2470 reason = "must not be user/group writable";
2476 RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
2477 file, rrd_strerror(status), reason);
2482 fh = fopen(file, "r");
/* a journal that does not exist (ENOENT) is not logged as an error */
2485 if (errno != ENOENT)
2486 RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
2487 file, rrd_strerror(errno));
2491 RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
2500 if (fgets(entry, sizeof(entry), fh) == NULL)
2502 entry_len = strlen(entry);
2504 /* check \n termination in case journal writing crashed mid-line */
2507 else if (entry[entry_len - 1] != '\n')
2509 RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
/* Replace the newline by NUL; entry_len (which counted the newline) is then
 * the buffer size including the terminator, as handle_request() asserts. */
2514 entry[entry_len - 1] = '\0';
2516 if (handle_request(NULL, now, entry, entry_len) == 0)
2524 RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
2525 entry_cnt, fail_cnt);
2527 return entry_cnt > 0 ? 1 : 0;
2528 } /* }}} static int journal_replay */
/*
 * qsort(3) comparator for an array of journal file names (char *).
 *
 * v1 and v2 point at two elements of that array, i.e. each one is a pointer
 * to a string pointer.  The strings are ordered with strcmp(); because the
 * journal file names embed zero-padded timestamps, this yields chronological
 * order.
 *
 * Returns a value less than, equal to or greater than zero, as strcmp().
 */
static int journal_sort(const void *v1, const void *v2)
{
  /* keep the const qualification of the arguments instead of casting it
   * away: neither the array slots nor the strings are modified here */
  const char *const *jn1 = v1;
  const char *const *jn2 = v2;

  return strcmp(*jn1, *jn2);
}
/*
 * Journal start-up.  Does nothing without a journal_dir.  Otherwise, under
 * journal_lock: allocates the current journal set, collects every directory
 * entry of journal_dir whose name starts with JOURNAL_BASE, sorts the names
 * with journal_sort() and replays each file through journal_replay().  If
 * anything was replayed and config_flush_at_shutdown is set, a flush is
 * started via flush_old_values(-1).
 */
2538 static void journal_init(void) /* {{{ */
2540 int had_journal = 0;
2542 struct dirent *dent;
2543 char path[PATH_MAX+1];
2545 if (journal_dir == NULL) return;
2547 pthread_mutex_lock(&journal_lock);
2549 journal_cur = calloc(1, sizeof(journal_set));
2550 if (journal_cur == NULL)
/* NOTE(review): this message names journal_rotate and malloc although it is
 * issued from journal_init after a calloc() -- looks copied. */
2552 RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2556 RRDD_LOG(LOG_INFO, "checking for journal files");
/* Two renames follow: "<JOURNAL_BASE>.old" becomes "<JOURNAL_BASE>.0000"
 * and "<JOURNAL_BASE>" becomes "<JOURNAL_BASE>.0001".  The results of
 * rename() are not checked. */
2558 /* Handle old journal files during transition. This gives them the
2559 * correct sort order. TODO: remove after first release
2562 char old_path[PATH_MAX+1];
2563 snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".old" );
2564 snprintf(path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0000");
2565 rename(old_path, path);
2567 snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE );
2568 snprintf(path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0001");
2569 rename(old_path, path);
2572 dir = opendir(journal_dir);
2574 RRDD_LOG(LOG_CRIT, "journal_init: opendir(%s) failed\n", journal_dir);
2577 while ((dent = readdir(dir)) != NULL)
2579 /* looks like a journal file? */
/* strncmp() is non-zero when the name does not begin with JOURNAL_BASE */
2580 if (strncmp(dent->d_name, JOURNAL_BASE, strlen(JOURNAL_BASE)))
2583 snprintf(path, PATH_MAX, "%s/%s", journal_dir, dent->d_name);
2585 if (!rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, path))
2587 RRDD_LOG(LOG_CRIT, "journal_init: cannot add journal file %s!",
/* sort the collected names with strcmp() before replaying them in order */
2594 qsort(journal_cur->files, journal_cur->files_num,
2595 sizeof(journal_cur->files[0]), journal_sort);
/* journal_replay() returns 1 for a file with at least one entry, so
 * had_journal counts the files that actually contained something */
2597 for (uint i=0; i < journal_cur->files_num; i++)
2598 had_journal += journal_replay(journal_cur->files[i]);
2602 /* it must have been a crash. start a flush */
2603 if (had_journal && config_flush_at_shutdown)
2604 flush_old_values(-1);
2606 pthread_mutex_unlock(&journal_lock);
2608 RRDD_LOG(LOG_INFO, "journal processing complete");
2610 } /* }}} static void journal_init */
/* Release the read and write buffers owned by `sock` and reset both
 * pointers to NULL. */
2612 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
2614 assert(sock != NULL);
2616 free(sock->rbuf); sock->rbuf = NULL;
2617 free(sock->wbuf); sock->wbuf = NULL;
2619 } /* }}} void free_listen_socket */
/* Tear down a client connection; the socket's buffers are released through
 * free_listen_socket(). */
2621 static void close_connection(listen_socket_t *sock) /* {{{ */
2629 free_listen_socket(sock);
2631 } /* }}} void close_connection */
/*
 * Entry point of a per-connection thread.  `args` is the listen_socket_t of
 * the accepted client.  The function allocates the read buffer, optionally
 * applies the libwrap host check, then polls the descriptor and passes every
 * complete command returned by next_cmd() to handle_request() for as long
 * as `state == RUNNING`.  On exit the connection is closed and
 * connection_threads_num is decremented; connection_threads_done is
 * broadcast once the count is zero or below.
 */
2633 static void *connection_thread_main (void *args) /* {{{ */
2635 listen_socket_t *sock;
2638 sock = (listen_socket_t *) args;
2641 /* init read buffers */
2642 sock->next_read = sock->next_cmd = 0;
2643 sock->rbuf = malloc(RBUF_SIZE);
2644 if (sock->rbuf == NULL)
2646 RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
2647 close_connection(sock);
/* the thread counter (and the libwrap check, when compiled in) is handled
 * under connection_threads_lock */
2651 pthread_mutex_lock (&connection_threads_lock);
2653 /* LIBWRAP does not support multiple threads! By putting this code
2654 inside pthread_mutex_lock we do not have to worry about request_info
2655 getting overwritten by another thread.
2657 struct request_info req;
2658 request_init(&req, RQ_DAEMON, "rrdcache\0", RQ_FILE, fd, NULL );
2660 if(!hosts_access(&req)) {
2661 RRDD_LOG(LOG_INFO, "refused connection from %s", eval_client(&req));
2662 pthread_mutex_unlock (&connection_threads_lock);
2663 close_connection(sock);
2666 #endif /* HAVE_LIBWRAP */
2667 connection_threads_num++;
2668 pthread_mutex_unlock (&connection_threads_lock);
2670 while (state == RUNNING)
2677 struct pollfd pollfd;
2681 pollfd.events = POLLIN | POLLPRI;
/* 500 ms timeout, so `state` is re-examined regularly even when idle */
2684 status = poll (&pollfd, 1, /* timeout = */ 500);
2685 if (state != RUNNING)
2687 else if (status == 0) /* timeout */
2689 else if (status < 0) /* error */
2692 if (status != EINTR)
2693 RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2697 if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2699 else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2701 RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2702 "poll(2) returned something unexpected: %#04hx",
/* append new bytes after what is already buffered, never past RBUF_SIZE */
2707 rbytes = read(fd, sock->rbuf + sock->next_read,
2708 RBUF_SIZE - sock->next_read);
2711 RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2714 else if (rbytes == 0)
2717 sock->next_read += rbytes;
/* inside a BATCH, `now` is pinned to the time the batch was started */
2719 if (sock->batch_start)
2720 now = sock->batch_start;
/* process every complete command currently in the buffer; cmd_len+1 is
 * passed as size -- presumably to include the terminating NUL that
 * handle_request() asserts on */
2724 while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2726 status = handle_request (sock, now, cmd, cmd_len+1);
2733 close_connection(sock);
2735 /* Remove this thread from the connection threads list */
2736 pthread_mutex_lock (&connection_threads_lock);
2737 connection_threads_num--;
/* wake whoever waits on connection_threads_done (listen_thread_main does) */
2738 if (connection_threads_num <= 0)
2739 pthread_cond_broadcast(&connection_threads_done);
2740 pthread_mutex_unlock (&connection_threads_lock);
2743 } /* }}} void *connection_thread_main */
/*
 * Create a UNIX domain listening socket for the address in `sock` and add
 * it to the global listen_fds array.  An optional "unix:" prefix of the
 * address is skipped, the parent directory of the socket path is created,
 * and group ownership / file mode are adjusted when requested in `sock`.
 * Errors are reported on stderr.
 */
2745 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2748 struct sockaddr_un sa;
2749 listen_socket_t *temp;
2752 char *path_copy, *dir;
2755 if (strncmp(path, "unix:", strlen("unix:")) == 0)
2756 path += strlen("unix:");
2758 /* dirname may modify its argument */
2759 path_copy = strdup(path);
2760 if (path_copy == NULL)
2762 fprintf(stderr, "rrdcached: strdup(): %s\n",
2763 rrd_strerror(errno));
/* make sure the directory that will hold the socket exists (mode 0777
 * passed to rrd_mkdir_p) */
2767 dir = dirname(path_copy);
2768 if (rrd_mkdir_p(dir, 0777) != 0)
2770 fprintf(stderr, "Failed to create socket directory '%s': %s\n",
2771 dir, rrd_strerror(errno));
/* grow listen_fds by one slot and start the new slot as a copy of `sock` */
2777 temp = (listen_socket_t *) rrd_realloc (listen_fds,
2778 sizeof (listen_fds[0]) * (listen_fds_num + 1));
2781 fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2785 memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2787 fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2790 fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2791 rrd_strerror(errno));
/* sa is zeroed first, so copying at most sizeof(sun_path)-1 bytes leaves
 * sun_path NUL-terminated */
2795 memset (&sa, 0, sizeof (sa));
2796 sa.sun_family = AF_UNIX;
2797 strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2799 /* if we've gotten this far, we own the pid file. any daemon started
2800 * with the same args must not be alive. therefore, ensure that we can
2801 * create the socket...
2805 status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2808 fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2809 path, rrd_strerror(errno));
2814 /* tweak the sockets group ownership */
/* (gid_t)-1 means "no group requested".  Otherwise the socket is chown'ed
 * to (current uid, requested group) and set to rwx for the owner and rw for
 * the group.
 * NOTE(review): these two messages use strerror() while the rest of the
 * function uses rrd_strerror(). */
2815 if (sock->socket_group != (gid_t)-1)
2817 if ( (chown(path, getuid(), sock->socket_group) != 0) ||
2818 (chmod(path, (S_IRUSR|S_IWUSR|S_IXUSR | S_IRGRP|S_IWGRP)) != 0) )
2820 fprintf(stderr, "rrdcached: failed to set socket group permissions (%s)\n", strerror(errno));
/* (mode_t)-1 means "no explicit mode requested"; an explicit mode is applied
 * after the group handling above */
2824 if (sock->socket_permissions != (mode_t)-1)
2826 if (chmod(path, sock->socket_permissions) != 0)
2827 fprintf(stderr, "rrdcached: failed to set socket file permissions (%o): %s\n",
2828 (unsigned int)sock->socket_permissions, strerror(errno));
2831 status = listen (fd, /* backlog = */ 10);
2834 fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2835 path, rrd_strerror(errno));
/* Fill in the new slot.  The stored address is the path without the "unix:"
 * prefix; close_listen_sockets() later unlinks that path. */
2841 listen_fds[listen_fds_num].fd = fd;
2842 listen_fds[listen_fds_num].family = PF_UNIX;
2843 strncpy(listen_fds[listen_fds_num].addr, path,
2844 sizeof (listen_fds[listen_fds_num].addr) - 1);
2848 } /* }}} int open_listen_socket_unix */
/*
 * Create listening TCP sockets for the network address in `sock`.  The
 * address is split into host and optional port (with "[...]" brackets for
 * IPv6 literals), resolved with getaddrinfo(), and a socket is opened,
 * bound and put into listening state for every result; each one is added
 * to the global listen_fds array.  Errors are reported on stderr.
 */
2850 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2852 struct addrinfo ai_hints;
2853 struct addrinfo *ai_res;
2854 struct addrinfo *ai_ptr;
2855 char addr_copy[NI_MAXHOST];
/* work on a private, explicitly terminated copy of the address */
2860 strncpy (addr_copy, sock->addr, sizeof(addr_copy)-1);
2861 addr_copy[sizeof (addr_copy) - 1] = 0;
/* resolver hints: any address family, stream sockets; AI_ADDRCONFIG is
 * added where the platform defines it */
2864 memset (&ai_hints, 0, sizeof (ai_hints));
2865 ai_hints.ai_flags = 0;
2866 #ifdef AI_ADDRCONFIG
2867 ai_hints.ai_flags |= AI_ADDRCONFIG;
2869 ai_hints.ai_family = AF_UNSPEC;
2870 ai_hints.ai_socktype = SOCK_STREAM;
2873 if (*addr == '[') /* IPv6+port format */
2875 /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2878 port = strchr (addr, ']');
2881 fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
2889 else if (*port == 0)
2893 fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
2896 } /* if (*addr == '[') */
/* Non-bracketed form: the last ':' is looked up as the host/port separator.
 * NOTE(review): rindex() is a legacy BSD function; strrchr() is the ISO C
 * equivalent. */
2899 port = rindex(addr, ':');
/* without an explicit port, RRDCACHED_DEFAULT_PORT is used */
2907 status = getaddrinfo (addr,
2908 port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2909 &ai_hints, &ai_res);
2912 fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
2913 addr, gai_strerror (status));
/* one listening socket per resolved address */
2917 for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2920 listen_socket_t *temp;
/* grow listen_fds by one slot and start the new slot as a copy of `sock` */
2923 temp = (listen_socket_t *) rrd_realloc (listen_fds,
2924 sizeof (listen_fds[0]) * (listen_fds_num + 1));
2928 "rrdcached: open_listen_socket_network: realloc failed.\n");
2932 memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2934 fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2937 fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
2938 rrd_strerror(errno));
/* NOTE(review): the result of setsockopt() is not checked. */
2942 setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2944 status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2947 fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2948 sock->addr, rrd_strerror(errno));
2953 status = listen (fd, /* backlog = */ 10);
/* NOTE(review): this format string ends in "\n." -- the period comes after
 * the newline, unlike the bind() message above. */
2956 fprintf (stderr, "rrdcached: listen(%s) failed: %s\n.",
2957 sock->addr, rrd_strerror(errno));
2959 freeaddrinfo(ai_res);
2963 listen_fds[listen_fds_num].fd = fd;
2964 listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2966 } /* for (ai_ptr) */
2968 freeaddrinfo(ai_res);
2970 } /* }}} static int open_listen_socket_network */
/*
 * Open a listening socket for `sock`.  Addresses that start with "unix:" or
 * with '/' are handled as UNIX domain sockets, everything else as a network
 * address.  Returns whatever the selected helper returns.
 */
2972 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2974 assert(sock != NULL);
/* NOTE(review): addr is an array member of struct listen_socket_s, so this
 * comparison with NULL can never fail. */
2975 assert(sock->addr != NULL);
2977 if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2978 || sock->addr[0] == '/')
2979 return (open_listen_socket_unix(sock));
2981 return (open_listen_socket_network(sock));
2982 } /* }}} int open_listen_socket */
/* Close every descriptor in listen_fds; for UNIX domain sockets the socket
 * file stored in `addr` is removed from the file system as well. */
2984 static int close_listen_sockets (void) /* {{{ */
2988 for (i = 0; i < listen_fds_num; i++)
2990 close (listen_fds[i].fd);
2992 if (listen_fds[i].family == PF_UNIX)
2993 unlink(listen_fds[i].addr);
3001 } /* }}} int close_listen_sockets */
3003 static void *listen_thread_main (void UNUSED(*args)) /* {{{ */
3005 struct pollfd *pollfds;
3010 if (listen_fds_num < 1)
3012 RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
3016 pollfds_num = listen_fds_num;
3017 pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
3018 if (pollfds == NULL)
3020 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
3023 memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
3025 RRDD_LOG(LOG_INFO, "listening for connections");
3027 while (state == RUNNING)
3029 for (i = 0; i < pollfds_num; i++)
3031 pollfds[i].fd = listen_fds[i].fd;
3032 pollfds[i].events = POLLIN | POLLPRI;
3033 pollfds[i].revents = 0;
3036 status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
3037 if (state != RUNNING)
3039 else if (status == 0) /* timeout */
3041 else if (status < 0) /* error */
3044 if (status != EINTR)
3046 RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
3051 for (i = 0; i < pollfds_num; i++)
3053 listen_socket_t *client_sock;
3054 struct sockaddr_storage client_sa;
3055 socklen_t client_sa_size;
3057 pthread_attr_t attr;
3059 if (pollfds[i].revents == 0)
3062 if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
3064 RRDD_LOG (LOG_ERR, "listen_thread_main: "
3065 "poll(2) returned something unexpected for listen FD #%i.",
3070 client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
3071 if (client_sock == NULL)
3073 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
3076 memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
3078 client_sa_size = sizeof (client_sa);
3079 client_sock->fd = accept (pollfds[i].fd,
3080 (struct sockaddr *) &client_sa, &client_sa_size);
3081 if (client_sock->fd < 0)
3083 RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
3088 pthread_attr_init (&attr);
3089 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
3091 status = pthread_create (&tid, &attr, connection_thread_main,
3095 RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
3096 close_connection(client_sock);
3099 } /* for (pollfds_num) */
3100 } /* while (state == RUNNING) */
3102 RRDD_LOG(LOG_INFO, "starting shutdown");
3104 close_listen_sockets ();
3106 pthread_mutex_lock (&connection_threads_lock);
3107 while (connection_threads_num > 0)
3108 pthread_cond_wait(&connection_threads_done, &connection_threads_lock);
3109 pthread_mutex_unlock (&connection_threads_lock);
3114 } /* }}} void *listen_thread_main */
3116 static int daemonize (void) /* {{{ */
3121 daemon_uid = geteuid();
3123 pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
3125 pid_fd = check_pidfile();
3129 /* open all the listen sockets */
3130 if (config_listen_address_list_len > 0)
3132 for (size_t i = 0; i < config_listen_address_list_len; i++)
3133 open_listen_socket (config_listen_address_list[i]);
3135 rrd_free_ptrs((void ***) &config_listen_address_list,
3136 &config_listen_address_list_len);
3140 strncpy(default_socket.addr, RRDCACHED_DEFAULT_ADDRESS,
3141 sizeof(default_socket.addr) - 1);
3142 default_socket.addr[sizeof(default_socket.addr) - 1] = '\0';
3144 if (default_socket.permissions == 0)
3145 socket_permission_set_all (&default_socket);
3147 open_listen_socket (&default_socket);
3150 if (listen_fds_num < 1)
3152 fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
3156 if (!stay_foreground)
3163 fprintf (stderr, "daemonize: fork(2) failed.\n");
3169 /* Become session leader */
3172 /* Open the first three file descriptors to /dev/null */
3177 open ("/dev/null", O_RDWR);
3178 if (dup(0) == -1 || dup(0) == -1){
3179 RRDD_LOG (LOG_ERR, "faild to run dup.\n");
3181 } /* if (!stay_foreground) */
3183 /* Change into the /tmp directory. */
3184 base_dir = (config_base_dir != NULL)
3188 if (chdir (base_dir) != 0)
3190 fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
3194 install_signal_handlers();
3196 openlog ("rrdcached", LOG_PID, LOG_DAEMON);
3197 RRDD_LOG(LOG_INFO, "starting up");
3199 cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
3200 (GDestroyNotify) free_cache_item);
3201 if (cache_tree == NULL)
3203 RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
3207 return write_pidfile (pid_fd);
3212 } /* }}} int daemonize */
3214 static int cleanup (void) /* {{{ */
3216 pthread_cond_broadcast (&flush_cond);
3217 pthread_join (flush_thread, NULL);
3219 pthread_cond_broadcast (&queue_cond);
3220 for (int i = 0; i < config_queue_threads; i++)
3221 pthread_join (queue_threads[i], NULL);
3223 if (config_flush_at_shutdown)
3225 assert(cache_queue_head == NULL);
3226 RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
3229 free(queue_threads);
3230 free(config_base_dir);
3232 pthread_mutex_lock(&cache_lock);
3233 g_tree_destroy(cache_tree);
3235 pthread_mutex_lock(&journal_lock);
3238 RRDD_LOG(LOG_INFO, "goodbye");
3242 free(config_pid_file);
3245 } /* }}} int cleanup */
/*
 * Parse the rrdcached command line with getopt() and store the results in
 * the daemon's configuration variables (default_socket,
 * config_listen_address_list, config_flush_interval, config_write_interval,
 * config_write_jitter, config_queue_threads, config_write_base_only,
 * config_base_dir, config_pid_file, config_flush_at_shutdown, journal_dir,
 * config_alloc_chunk, opt_no_overwrite).
 *
 * argc, argv: the program's argument vector, handed to getopt() as is.
 *
 * Problems are reported on stderr.  After the option loop, warnings are
 * printed for questionable combinations of values, and flush-at-shutdown is
 * switched on when no journal directory was given.
 *
 * NOTE(review): the case labels and return statements are not part of this
 * excerpt; option letters named in the comments below are taken from the
 * usage text and the error messages.  Presumably 0 is returned on success
 * and a non-zero value otherwise -- confirm.
 */
3247 static int read_options (int argc, char **argv) /* {{{ */
/* Start from an empty permission set.  (gid_t)-1 and (mode_t)-1 presumably
 * mean "not configured" -- confirm against the code that consumes them. */
3252 socket_permission_clear (&default_socket);
3254 default_socket.socket_group = (gid_t)-1;
3255 default_socket.socket_permissions = (mode_t)-1;
3257 while ((option = getopt(argc, argv, "Ogl:s:m:P:f:w:z:t:Bb:p:Fj:a:h?")) != -1)
3262 opt_no_overwrite = 1;
/* Listen address (-l): build a new listen_socket_t from optarg. */
3271 listen_socket_t *new;
3273 new = malloc(sizeof(listen_socket_t));
3276 fprintf(stderr, "read_options: malloc failed.\n");
3279 memset(new, 0, sizeof(listen_socket_t));
/* The structure was zeroed above and at most sizeof(addr)-1 bytes are
 * copied, so addr always ends up NUL-terminated. */
3281 strncpy(new->addr, optarg, sizeof(new->addr)-1);
3283 /* Add permissions to the socket {{{ */
3284 if (default_socket.permissions != 0)
3286 socket_permission_copy (new, &default_socket);
3288 else /* if (default_socket.permissions == 0) */
3290 /* Add permission for ALL commands to the socket. */
3291 socket_permission_set_all (new);
3293 /* }}} Done adding permissions. */
/* The new socket takes over the group and file mode configured so far. */
3295 new->socket_group = default_socket.socket_group;
3296 new->socket_permissions = default_socket.socket_permissions;
3298 if (!rrd_add_ptr((void ***)&config_listen_address_list,
3299 &config_listen_address_list_len, new))
3301 fprintf(stderr, "read_options: rrd_add_ptr failed.\n");
3307 /* set socket group permissions */
/* Try the argument as a numeric group id first, then as a group name.
 * NOTE(review): no reset of errno before strtoul() is visible here; if
 * there is none, a stale EINVAL would send a numeric id to the name
 * lookup -- confirm. */
3313 group_gid = strtoul(optarg, NULL, 10);
3314 if (errno != EINVAL && group_gid>0)
3316 /* we were passed a number */
3317 grp = getgrgid(group_gid);
3321 grp = getgrnam(optarg);
3326 default_socket.socket_group = grp->gr_gid;
3330 /* no idea what the user wanted... */
3331 fprintf (stderr, "read_options: couldn't map \"%s\" to a group, Sorry\n", optarg);
3337 /* set socket file permissions */
3341 char *endptr = NULL;
/* The mode is parsed as octal (base 8); empty input, trailing characters
 * and values outside 0..07777 are rejected. */
3343 tmp = strtol (optarg, &endptr, 8);
3344 if ((endptr == optarg) || (! endptr) || (*endptr != '\0')
3345 || (tmp > 07777) || (tmp < 0)) {
3346 fprintf (stderr, "read_options: Invalid file mode \"%s\".\n",
3351 default_socket.socket_permissions = (mode_t)tmp;
/* Permission list (-P): a new list replaces the previous one instead of
 * extending it. */
3362 socket_permission_clear (&default_socket);
/* strtok_r() modifies its input, so a private copy is tokenised; entries
 * are separated by commas and/or spaces.
 * NOTE(review): no check of the strdup() result is visible here. */
3364 optcopy = strdup (optarg);
3367 while ((ptr = strtok_r (dummy, ", ", &saveptr)) != NULL)
3370 status = socket_permission_add (&default_socket, ptr);
3373 fprintf (stderr, "read_options: Adding permission \"%s\" to "
3374 "socket failed. Most likely, this permission doesn't "
3375 "exist. Check your command line.\n", ptr);
/* Flush interval (-f). */
3388 temp = atoi (optarg);
3390 config_flush_interval = temp;
3393 fprintf (stderr, "Invalid flush interval: %s\n", optarg);
/* Write interval (-w). */
3403 temp = atoi (optarg);
3405 config_write_interval = temp;
3408 fprintf (stderr, "Invalid write interval: %s\n", optarg);
/* Write jitter (-z). */
3418 temp = atoi(optarg);
3420 config_write_jitter = temp;
3423 fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
/* Number of queue threads (-t). */
3433 threads = atoi(optarg);
3435 config_queue_threads = threads;
3438 fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
3445 config_write_base_only = 1;
/* Base directory (-b): a later occurrence replaces an earlier one. */
3451 char base_realpath[PATH_MAX];
3453 if (config_base_dir != NULL)
3454 free (config_base_dir);
3455 config_base_dir = strdup (optarg);
3456 if (config_base_dir == NULL)
3458 fprintf (stderr, "read_options: strdup failed.\n");
3462 if (rrd_mkdir_p (config_base_dir, 0777) != 0)
3464 fprintf (stderr, "Failed to create base directory '%s': %s\n",
3465 config_base_dir, rrd_strerror (errno));
3469 /* make sure that the base directory is not resolved via
3470 * symbolic links. this makes some performance-enhancing
3471 * assumptions possible (we don't have to resolve paths
3472 * that start with a "/")
3474 if (realpath(config_base_dir, base_realpath) == NULL)
3476 fprintf (stderr, "Failed to canonicalize the base directory '%s': "
3477 "%s\n", config_base_dir, rrd_strerror(errno));
// Strip trailing slashes from the configured directory.
3481 len = strlen (config_base_dir);
3482 while ((len > 0) && (config_base_dir[len - 1] == '/'))
3484 config_base_dir[len - 1] = 0;
3490 fprintf (stderr, "Invalid base directory: %s\n", optarg);
3494 _config_base_dir_len = len;
// Same trimming for the canonical path, so both can be compared.
3496 len = strlen (base_realpath);
3497 while ((len > 0) && (base_realpath[len - 1] == '/'))
3499 base_realpath[len - 1] = '\0';
// The configured path has to be identical to its canonical form.
3503 if (strncmp(config_base_dir,
3504 base_realpath, sizeof(base_realpath)) != 0)
3507 "Base directory (-b) resolved via file system links!\n"
3508 "Please consult rrdcached '-b' documentation!\n"
3509 "Consider specifying the real directory (%s)\n",
3518 if (config_pid_file != NULL)
3519 free (config_pid_file);
3520 config_pid_file = strdup (optarg);
3521 if (config_pid_file == NULL)
3523 fprintf (stderr, "read_options: strdup failed.\n");
3530 config_flush_at_shutdown = 1;
// Journal directory (-j): resolved to its canonical path first.
// NOTE(review): realpath runs before rrd_mkdir_p here; check whether the
// mkdir step can still create anything once realpath has succeeded.
3535 char journal_dir_actual[PATH_MAX];
3536 journal_dir = realpath((const char *)optarg, journal_dir_actual);
3539 // if we were able to properly resolve the path, lets have a copy
3540 // for use outside this block.
3541 journal_dir = strdup(journal_dir);
3542 status = rrd_mkdir_p(journal_dir, 0777);
3545 fprintf(stderr, "Failed to create journal directory '%s': %s\n",
3546 journal_dir, rrd_strerror(errno));
3549 if (access(journal_dir, R_OK|W_OK|X_OK) != 0)
3551 fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
3552 errno ? rrd_strerror(errno) : "");
3556 fprintf(stderr, "Unable to resolve journal path (%s,%s)\n", optarg,
3557 errno ? rrd_strerror(errno) : "");
3565 int temp = atoi(optarg);
3567 config_alloc_chunk = temp;
3570 fprintf(stderr, "Invalid allocation size: %s\n", optarg);
3578 printf ("RRDCacheD %s\n"
3579 "Copyright (C) 2008,2009 Florian octo Forster and Kevin Brintnall\n"
3581 "Usage: rrdcached [options]\n"
3583 "Valid options are:\n"
3584 " -l <address> Socket address to listen to.\n"
3585 " Default: "RRDCACHED_DEFAULT_ADDRESS"\n"
3586 " -P <perms> Sets the permissions to assign to all following "
3588 " -w <seconds> Interval in which to write data.\n"
3589 " -z <delay> Delay writes up to <delay> seconds to spread load\n"
3590 " -t <threads> Number of write threads.\n"
3591 " -f <seconds> Interval in which to flush dead data.\n"
3592 " -p <file> Location of the PID-file.\n"
3593 " -b <dir> Base directory to change to.\n"
3594 " -B Restrict file access to paths within -b <dir>\n"
3595 " -g Do not fork and run in the foreground.\n"
3596 " -j <dir> Directory in which to create the journal files.\n"
3597 " -F Always flush all updates at shutdown\n"
3598 " -s <id|name> Group owner of all following UNIX sockets\n"
3599 " (the socket will also have read/write permissions "
3601 " -m <mode> File permissions (octal) of all following UNIX "
3603 " -a <size> Memory allocation chunk size. Default is 1.\n"
3604 " -O Do not allow CREATE commands to overwrite existing\n"
3605 " files, even if asked to.\n"
3607 "For more information and a detailed description of all options "
3609 "to the rrdcached(1) manual page.\n",
3616 } /* switch (option) */
3617 } /* while (getopt) */
3619 /* advise the user when values are not sane */
3620 if (config_flush_interval < 2 * config_write_interval)
3621 fprintf(stderr, "WARNING: flush interval (-f) should be at least"
3622 " 2x write interval (-w) !\n");
3623 if (config_write_jitter > config_write_interval)
3624 fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
3625 " write interval (-w) !\n");
3627 if (config_write_base_only && config_base_dir == NULL)
3628 fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
3629 " Consult the rrdcached documentation\n");
/* No journal directory configured: force a flush at shutdown (presumably
 * because queued updates could not be recovered otherwise -- confirm). */
3631 if (journal_dir == NULL)
3632 config_flush_at_shutdown = 1;
3635 } /* }}} int read_options */
3637 int main (int argc, char **argv)
3641 status = read_options (argc, argv);
3649 status = daemonize ();
3652 fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
3658 /* start the queue threads */
3659 queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
3660 if (queue_threads == NULL)
3662 RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
3666 for (int i = 0; i < config_queue_threads; i++)
3668 memset (&queue_threads[i], 0, sizeof (*queue_threads));
3669 status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
3672 RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
3678 /* start the flush thread */
3679 memset(&flush_thread, 0, sizeof(flush_thread));
3680 status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
3683 RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
3688 listen_thread_main (NULL);
3695 * vim: set sw=2 sts=2 ts=8 et fdm=marker :