2 * RRDTool - src/rrd_daemon.c
3 * Copyright (C) 2008-2010 Florian octo Forster
4 * Copyright (C) 2008,2009 Kevin Brintnall
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; only version 2 of the License is applicable.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
20 * Florian octo Forster <octo at verplant.org>
21 * kevin brintnall <kbrint@rufus.net>
26 * First tell the compiler to stick to the C99 and POSIX standards as close as
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
46 #define _XOPEN_SOURCE 500
63 * Now for some includes..
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
70 #include "../rrd_config.h"
75 #include "rrd_client.h"
87 #include <sys/socket.h>
95 #include <sys/types.h>
107 #include <sys/time.h>
112 #include <glib-2.0/glib.h>
115 #define RRDD_LOG(severity, ...) \
117 if (stay_foreground) { \
118 fprintf(stderr, __VA_ARGS__); \
119 fprintf(stderr, "\n"); } \
120 syslog ((severity), __VA_ARGS__); \
126 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
128 struct listen_socket_s
131 char addr[PATH_MAX + 1];
134 /* state for BATCH processing */
146 uint32_t permissions;
149 mode_t socket_permissions;
151 typedef struct listen_socket_s listen_socket_t;
154 typedef struct command_s command_t;
155 /* note: guard against "unused" warnings in the handlers */
156 #define DISPATCH_PROTO listen_socket_t UNUSED(*sock),\
158 char UNUSED(*buffer),\
159 size_t UNUSED(buffer_size)
161 #define HANDLER_PROTO command_t UNUSED(*cmd),\
166 int (*handler)(HANDLER_PROTO);
168 char context; /* where we expect to see it */
169 #define CMD_CONTEXT_CLIENT (1<<0)
170 #define CMD_CONTEXT_BATCH (1<<1)
171 #define CMD_CONTEXT_JOURNAL (1<<2)
172 #define CMD_CONTEXT_ANY (0x7f)
179 typedef struct cache_item_s cache_item_t;
184 size_t values_num; /* number of valid pointers */
185 size_t values_alloc; /* number of allocated pointers */
186 time_t last_flush_time;
187 time_t last_update_stamp;
188 #define CI_FLAGS_IN_TREE (1<<0)
189 #define CI_FLAGS_IN_QUEUE (1<<1)
191 pthread_cond_t flushed;
196 struct callback_flush_data_s
203 typedef struct callback_flush_data_s callback_flush_data_t;
210 typedef enum queue_side_e queue_side_t;
212 /* describe a set of journal files */
218 /* max length of socket command or response */
220 #define RBUF_SIZE (CMD_MAX*2)
225 static int stay_foreground = 0;
226 static uid_t daemon_uid;
228 static listen_socket_t *listen_fds = NULL;
229 static size_t listen_fds_num = 0;
231 static listen_socket_t default_socket;
234 RUNNING, /* normal operation */
235 FLUSHING, /* flushing remaining values */
236 SHUTDOWN /* shutting down */
239 static pthread_t *queue_threads;
240 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
241 static int config_queue_threads = 4;
243 static pthread_t flush_thread;
244 static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;
246 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
247 static pthread_cond_t connection_threads_done = PTHREAD_COND_INITIALIZER;
248 static int connection_threads_num = 0;
251 static GTree *cache_tree = NULL;
252 static cache_item_t *cache_queue_head = NULL;
253 static cache_item_t *cache_queue_tail = NULL;
254 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
256 static int config_write_interval = 300;
257 static int config_write_jitter = 0;
258 static int config_flush_interval = 3600;
259 static int config_flush_at_shutdown = 0;
260 static char *config_pid_file = NULL;
261 static char *config_base_dir = NULL;
262 static size_t _config_base_dir_len = 0;
263 static int config_write_base_only = 0;
264 static size_t config_alloc_chunk = 1;
266 static listen_socket_t **config_listen_address_list = NULL;
267 static size_t config_listen_address_list_len = 0;
269 static uint64_t stats_queue_length = 0;
270 static uint64_t stats_updates_received = 0;
271 static uint64_t stats_flush_received = 0;
272 static uint64_t stats_updates_written = 0;
273 static uint64_t stats_data_sets_written = 0;
274 static uint64_t stats_journal_bytes = 0;
275 static uint64_t stats_journal_rotate = 0;
276 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
278 static int opt_no_overwrite = 0; /* default for the daemon */
280 /* Journaled updates */
281 #define JOURNAL_REPLAY(s) ((s) == NULL)
282 #define JOURNAL_BASE "rrd.journal"
283 static journal_set *journal_cur = NULL;
284 static journal_set *journal_old = NULL;
285 static char *journal_dir = NULL;
286 static FILE *journal_fh = NULL; /* current journal file handle */
287 static long journal_size = 0; /* current journal size */
288 #define JOURNAL_MAX (1 * 1024 * 1024 * 1024)
289 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
290 static int journal_write(char *cmd, char *args);
291 static void journal_done(void);
292 static void journal_rotate(void);
294 /* prototypes for forward references */
295 static int handle_request_help (HANDLER_PROTO);
300 static void sig_common (const char *sig) /* {{{ */
302 RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
304 pthread_cond_broadcast(&flush_cond);
305 pthread_cond_broadcast(&queue_cond);
306 } /* }}} void sig_common */
308 static void sig_int_handler (int UNUSED(s)) /* {{{ */
311 } /* }}} void sig_int_handler */
313 static void sig_term_handler (int UNUSED(s)) /* {{{ */
316 } /* }}} void sig_term_handler */
318 static void sig_usr1_handler (int UNUSED(s)) /* {{{ */
320 config_flush_at_shutdown = 1;
322 } /* }}} void sig_usr1_handler */
324 static void sig_usr2_handler (int UNUSED(s)) /* {{{ */
326 config_flush_at_shutdown = 0;
328 } /* }}} void sig_usr2_handler */
330 static void install_signal_handlers(void) /* {{{ */
332 /* These structures are static, because `sigaction' behaves weird if the are
334 static struct sigaction sa_int;
335 static struct sigaction sa_term;
336 static struct sigaction sa_pipe;
337 static struct sigaction sa_usr1;
338 static struct sigaction sa_usr2;
340 /* Install signal handlers */
341 memset (&sa_int, 0, sizeof (sa_int));
342 sa_int.sa_handler = sig_int_handler;
343 sigaction (SIGINT, &sa_int, NULL);
345 memset (&sa_term, 0, sizeof (sa_term));
346 sa_term.sa_handler = sig_term_handler;
347 sigaction (SIGTERM, &sa_term, NULL);
349 memset (&sa_pipe, 0, sizeof (sa_pipe));
350 sa_pipe.sa_handler = SIG_IGN;
351 sigaction (SIGPIPE, &sa_pipe, NULL);
353 memset (&sa_pipe, 0, sizeof (sa_usr1));
354 sa_usr1.sa_handler = sig_usr1_handler;
355 sigaction (SIGUSR1, &sa_usr1, NULL);
357 memset (&sa_usr2, 0, sizeof (sa_usr2));
358 sa_usr2.sa_handler = sig_usr2_handler;
359 sigaction (SIGUSR2, &sa_usr2, NULL);
361 } /* }}} void install_signal_handlers */
363 static int open_pidfile(char *action, int oflag) /* {{{ */
367 char *file_copy, *dir;
369 file = (config_pid_file != NULL)
371 : LOCALSTATEDIR "/run/rrdcached.pid";
373 /* dirname may modify its argument */
374 file_copy = strdup(file);
375 if (file_copy == NULL)
377 fprintf(stderr, "rrdcached: strdup(): %s\n",
378 rrd_strerror(errno));
382 dir = dirname(file_copy);
383 if (rrd_mkdir_p(dir, 0777) != 0)
385 fprintf(stderr, "Failed to create pidfile directory '%s': %s\n",
386 dir, rrd_strerror(errno));
392 fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
394 fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
395 action, file, rrd_strerror(errno));
398 } /* }}} static int open_pidfile */
400 /* check existing pid file to see whether a daemon is running */
401 static int check_pidfile(void)
407 pid_fd = open_pidfile("open", O_RDWR);
411 if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0)
418 /* another running process that we can signal COULD be
419 * a competing rrdcached */
420 if (pid != getpid() && kill(pid, 0) == 0)
423 "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
428 lseek(pid_fd, 0, SEEK_SET);
429 if (ftruncate(pid_fd, 0) == -1)
432 "FATAL: Faild to truncate stale PID file. (pid %d)\n", pid);
438 "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
439 "rrdcached: starting normally.\n", pid);
442 } /* }}} static int check_pidfile */
444 static int write_pidfile (int fd) /* {{{ */
451 fh = fdopen (fd, "w");
454 RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
459 fprintf (fh, "%i\n", (int) pid);
463 } /* }}} int write_pidfile */
465 static int remove_pidfile (void) /* {{{ */
470 file = (config_pid_file != NULL)
472 : LOCALSTATEDIR "/run/rrdcached.pid";
474 status = unlink (file);
478 } /* }}} int remove_pidfile */
480 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
484 eol = memchr(sock->rbuf + sock->next_cmd, '\n',
485 sock->next_read - sock->next_cmd);
489 /* no commands left, move remainder back to front of rbuf */
490 memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
491 sock->next_read - sock->next_cmd);
492 sock->next_read -= sock->next_cmd;
499 char *cmd = sock->rbuf + sock->next_cmd;
502 sock->next_cmd = eol - sock->rbuf + 1;
504 if (eol > sock->rbuf && *(eol-1) == '\r')
505 *(--eol) = '\0'; /* handle "\r\n" EOL */
514 } /* }}} char *next_cmd */
516 /* add the characters directly to the write buffer */
517 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
521 assert(sock != NULL);
523 new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
526 RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
530 strncpy(new_buf + sock->wbuf_len, str, len + 1);
532 sock->wbuf = new_buf;
533 sock->wbuf_len += len;
536 } /* }}} static int add_to_wbuf */
538 /* add the text to the "extra" info that's sent after the status line */
539 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
542 char buffer[CMD_MAX];
545 if (JOURNAL_REPLAY(sock)) return 0;
546 if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
549 #ifdef HAVE_VSNPRINTF
550 len = vsnprintf(buffer, sizeof(buffer), fmt, argp);
552 len = vsprintf(buffer, fmt, argp);
557 RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
561 return add_to_wbuf(sock, buffer, len);
562 } /* }}} static int add_response_info */
564 static int count_lines(char *str) /* {{{ */
570 while ((str = strchr(str, '\n')) != NULL)
578 } /* }}} static int count_lines */
580 /* send the response back to the user.
581 * returns 0 on success, -1 on error
582 * write buffer is always zeroed after this call */
583 static int send_response (listen_socket_t *sock, response_code rc,
584 char *fmt, ...) /* {{{ */
587 char buffer[CMD_MAX];
592 if (JOURNAL_REPLAY(sock)) return rc;
594 if (sock->batch_start)
597 return rc; /* no response on success during BATCH */
598 lines = sock->batch_cmd;
600 else if (rc == RESP_OK)
601 lines = count_lines(sock->wbuf);
605 rclen = sprintf(buffer, "%d ", lines);
607 #ifdef HAVE_VSNPRINTF
608 len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
610 len = vsprintf(buffer+rclen, fmt, argp);
618 /* append the result to the wbuf, don't write to the user */
619 if (sock->batch_start)
620 return add_to_wbuf(sock, buffer, len);
622 /* first write must be complete */
623 if (len != write(sock->fd, buffer, len))
625 RRDD_LOG(LOG_INFO, "send_response: could not write status message");
629 if (sock->wbuf != NULL && rc == RESP_OK)
632 while (wrote < sock->wbuf_len)
634 ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
637 RRDD_LOG(LOG_INFO, "send_response: could not write results");
644 free(sock->wbuf); sock->wbuf = NULL;
650 static void wipe_ci_values(cache_item_t *ci, time_t when)
654 ci->values_alloc = 0;
656 ci->last_flush_time = when;
657 if (config_write_jitter > 0)
658 ci->last_flush_time += (rrd_random() % config_write_jitter);
662 * remove a "cache_item_t" item from the queue.
663 * must hold 'cache_lock' when calling this
665 static void remove_from_queue(cache_item_t *ci) /* {{{ */
667 if (ci == NULL) return;
668 if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
670 if (ci->prev == NULL)
671 cache_queue_head = ci->next; /* reset head */
673 ci->prev->next = ci->next;
675 if (ci->next == NULL)
676 cache_queue_tail = ci->prev; /* reset the tail */
678 ci->next->prev = ci->prev;
680 ci->next = ci->prev = NULL;
681 ci->flags &= ~CI_FLAGS_IN_QUEUE;
683 pthread_mutex_lock (&stats_lock);
684 assert (stats_queue_length > 0);
685 stats_queue_length--;
686 pthread_mutex_unlock (&stats_lock);
688 } /* }}} static void remove_from_queue */
690 /* free the resources associated with the cache_item_t
691 * must hold cache_lock when calling this function
693 static void *free_cache_item(cache_item_t *ci) /* {{{ */
695 if (ci == NULL) return NULL;
697 remove_from_queue(ci);
699 for (size_t i=0; i < ci->values_num; i++)
705 /* in case anyone is waiting */
706 pthread_cond_broadcast(&ci->flushed);
707 pthread_cond_destroy(&ci->flushed);
712 } /* }}} static void *free_cache_item */
715 * enqueue_cache_item:
716 * `cache_lock' must be acquired before calling this function!
718 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
724 if (ci->values_num == 0)
729 if (cache_queue_head == ci)
732 /* remove if further down in queue */
733 remove_from_queue(ci);
736 ci->next = cache_queue_head;
737 if (ci->next != NULL)
739 cache_queue_head = ci;
741 if (cache_queue_tail == NULL)
742 cache_queue_tail = cache_queue_head;
744 else /* (side == TAIL) */
746 /* We don't move values back in the list.. */
747 if (ci->flags & CI_FLAGS_IN_QUEUE)
750 assert (ci->next == NULL);
751 assert (ci->prev == NULL);
753 ci->prev = cache_queue_tail;
755 if (cache_queue_tail == NULL)
756 cache_queue_head = ci;
758 cache_queue_tail->next = ci;
760 cache_queue_tail = ci;
763 ci->flags |= CI_FLAGS_IN_QUEUE;
765 pthread_cond_signal(&queue_cond);
766 pthread_mutex_lock (&stats_lock);
767 stats_queue_length++;
768 pthread_mutex_unlock (&stats_lock);
771 } /* }}} int enqueue_cache_item */
774 * tree_callback_flush:
775 * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
776 * while this is in progress.
778 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
782 callback_flush_data_t *cfd;
784 ci = (cache_item_t *) value;
785 cfd = (callback_flush_data_t *) data;
787 if (ci->flags & CI_FLAGS_IN_QUEUE)
790 if (ci->values_num > 0
791 && (ci->last_flush_time <= cfd->abs_timeout || state != RUNNING))
793 enqueue_cache_item (ci, TAIL);
795 else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
796 && (ci->values_num <= 0))
798 assert ((char *) key == ci->file);
799 if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key))
801 RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed.");
807 } /* }}} gboolean tree_callback_flush */
809 static int flush_old_values (int max_age)
811 callback_flush_data_t cfd;
814 memset (&cfd, 0, sizeof (cfd));
815 /* Pass the current time as user data so that we don't need to call
816 * `time' for each node. */
817 cfd.now = time (NULL);
822 cfd.abs_timeout = cfd.now - max_age;
824 cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
826 /* `tree_callback_flush' will return the keys of all values that haven't
827 * been touched in the last `config_flush_interval' seconds in `cfd'.
828 * The char*'s in this array point to the same memory as ci->file, so we
829 * don't need to free them separately. */
830 g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
832 for (k = 0; k < cfd.keys_num; k++)
834 gboolean status = g_tree_remove(cache_tree, cfd.keys[k]);
835 /* should never fail, since we have held the cache_lock
837 assert(status == TRUE);
840 if (cfd.keys != NULL)
847 } /* int flush_old_values */
849 static void *flush_thread_main (void UNUSED(*args)) /* {{{ */
852 struct timespec next_flush;
855 gettimeofday (&now, NULL);
856 next_flush.tv_sec = now.tv_sec + config_flush_interval;
857 next_flush.tv_nsec = 1000 * now.tv_usec;
859 pthread_mutex_lock(&cache_lock);
861 while (state == RUNNING)
863 gettimeofday (&now, NULL);
864 if ((now.tv_sec > next_flush.tv_sec)
865 || ((now.tv_sec == next_flush.tv_sec)
866 && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
868 RRDD_LOG(LOG_DEBUG, "flushing old values");
870 /* Determine the time of the next cache flush. */
871 next_flush.tv_sec = now.tv_sec + config_flush_interval;
873 /* Flush all values that haven't been written in the last
874 * `config_write_interval' seconds. */
875 flush_old_values (config_write_interval);
877 /* unlock the cache while we rotate so we don't block incoming
878 * updates if the fsync() blocks on disk I/O */
879 pthread_mutex_unlock(&cache_lock);
881 pthread_mutex_lock(&cache_lock);
884 status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
885 if (status != 0 && status != ETIMEDOUT)
887 RRDD_LOG (LOG_ERR, "flush_thread_main: "
888 "pthread_cond_timedwait returned %i.", status);
892 if (config_flush_at_shutdown)
893 flush_old_values (-1); /* flush everything */
897 pthread_mutex_unlock(&cache_lock);
900 } /* void *flush_thread_main */
902 static void *queue_thread_main (void UNUSED(*args)) /* {{{ */
904 pthread_mutex_lock (&cache_lock);
906 while (state != SHUTDOWN
907 || (cache_queue_head != NULL && config_flush_at_shutdown))
915 /* Now, check if there's something to store away. If not, wait until
916 * something comes in. */
917 if (cache_queue_head == NULL)
919 status = pthread_cond_wait (&queue_cond, &cache_lock);
920 if ((status != 0) && (status != ETIMEDOUT))
922 RRDD_LOG (LOG_ERR, "queue_thread_main: "
923 "pthread_cond_wait returned %i.", status);
927 /* Check if a value has arrived. This may be NULL if we timed out or there
928 * was an interrupt such as a signal. */
929 if (cache_queue_head == NULL)
932 ci = cache_queue_head;
934 /* copy the relevant parts */
935 file = strdup (ci->file);
938 RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
942 assert(ci->values != NULL);
943 assert(ci->values_num > 0);
946 values_num = ci->values_num;
948 wipe_ci_values(ci, time(NULL));
949 remove_from_queue(ci);
951 pthread_mutex_unlock (&cache_lock);
954 status = rrd_update_r (file, NULL, (int) values_num, (void *) values);
957 RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
958 "rrd_update_r (%s) failed with status %i. (%s)",
959 file, status, rrd_get_error());
962 journal_write("wrote", file);
964 /* Search again in the tree. It's possible someone issued a "FORGET"
965 * while we were writing the update values. */
966 pthread_mutex_lock(&cache_lock);
967 ci = (cache_item_t *) g_tree_lookup(cache_tree, file);
969 pthread_cond_broadcast(&ci->flushed);
970 pthread_mutex_unlock(&cache_lock);
974 pthread_mutex_lock (&stats_lock);
975 stats_updates_written++;
976 stats_data_sets_written += values_num;
977 pthread_mutex_unlock (&stats_lock);
980 rrd_free_ptrs((void ***) &values, &values_num);
983 pthread_mutex_lock (&cache_lock);
985 pthread_mutex_unlock (&cache_lock);
988 } /* }}} void *queue_thread_main */
990 static int buffer_get_field (char **buffer_ret, /* {{{ */
991 size_t *buffer_size_ret, char **field_ret)
1000 buffer = *buffer_ret;
1002 buffer_size = *buffer_size_ret;
1003 field = *buffer_ret;
1006 if (buffer_size <= 0)
1009 /* This is ensured by `handle_request'. */
1010 assert (buffer[buffer_size - 1] == '\0');
1013 while (buffer_pos < buffer_size)
1015 /* Check for end-of-field or end-of-buffer */
1016 if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
1018 field[field_size] = 0;
1024 /* Handle escaped characters. */
1025 else if (buffer[buffer_pos] == '\\')
1027 if (buffer_pos >= (buffer_size - 1))
1030 field[field_size] = buffer[buffer_pos];
1034 /* Normal operation */
1037 field[field_size] = buffer[buffer_pos];
1041 } /* while (buffer_pos < buffer_size) */
1046 *buffer_ret = buffer + buffer_pos;
1047 *buffer_size_ret = buffer_size - buffer_pos;
1051 } /* }}} int buffer_get_field */
1053 /* if we're restricting writes to the base directory,
1054 * check whether the file falls within the dir
1055 * returns 1 if OK, otherwise 0
1057 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
1059 assert(file != NULL);
1061 if (!config_write_base_only
1062 || JOURNAL_REPLAY(sock)
1063 || config_base_dir == NULL)
1066 if (strstr(file, "../") != NULL) goto err;
1068 /* relative paths without "../" are ok */
1069 if (*file != '/') return 1;
1071 /* file must be of the format base + "/" + <1+ char filename> */
1072 if (strlen(file) < _config_base_dir_len + 2) goto err;
1073 if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
1074 if (*(file + _config_base_dir_len) != '/') goto err;
1079 if (sock != NULL && sock->fd >= 0)
1080 send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1083 } /* }}} static int check_file_access */
1085 /* when using a base dir, convert relative paths to absolute paths.
1086 * if necessary, modifies the "filename" pointer to point
1087 * to the new path created in "tmp". "tmp" is provided
1088 * by the caller and sizeof(tmp) must be >= PATH_MAX.
1090 * this allows us to optimize for the expected case (absolute path)
1093 static void get_abs_path(char **filename, char *tmp)
1095 assert(tmp != NULL);
1096 assert(filename != NULL && *filename != NULL);
1098 if (config_base_dir == NULL || **filename == '/')
1101 snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1103 } /* }}} static int get_abs_path */
1105 static int flush_file (const char *filename) /* {{{ */
1109 pthread_mutex_lock (&cache_lock);
1111 ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1114 pthread_mutex_unlock (&cache_lock);
1118 if (ci->values_num > 0)
1120 /* Enqueue at head */
1121 enqueue_cache_item (ci, HEAD);
1122 pthread_cond_wait(&ci->flushed, &cache_lock);
1125 /* DO NOT DO ANYTHING WITH ci HERE!! The entry
1126 * may have been purged during our cond_wait() */
1128 pthread_mutex_unlock(&cache_lock);
1131 } /* }}} int flush_file */
1133 static int syntax_error(listen_socket_t *sock, command_t *cmd) /* {{{ */
1135 char *err = "Syntax error.\n";
1137 if (cmd && cmd->syntax)
1140 return send_response(sock, RESP_ERR, "Usage: %s", err);
1141 } /* }}} static int syntax_error() */
1143 static int handle_request_stats (HANDLER_PROTO) /* {{{ */
1145 uint64_t copy_queue_length;
1146 uint64_t copy_updates_received;
1147 uint64_t copy_flush_received;
1148 uint64_t copy_updates_written;
1149 uint64_t copy_data_sets_written;
1150 uint64_t copy_journal_bytes;
1151 uint64_t copy_journal_rotate;
1153 uint64_t tree_nodes_number;
1154 uint64_t tree_depth;
1156 pthread_mutex_lock (&stats_lock);
1157 copy_queue_length = stats_queue_length;
1158 copy_updates_received = stats_updates_received;
1159 copy_flush_received = stats_flush_received;
1160 copy_updates_written = stats_updates_written;
1161 copy_data_sets_written = stats_data_sets_written;
1162 copy_journal_bytes = stats_journal_bytes;
1163 copy_journal_rotate = stats_journal_rotate;
1164 pthread_mutex_unlock (&stats_lock);
1166 pthread_mutex_lock (&cache_lock);
1167 tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1168 tree_depth = (uint64_t) g_tree_height (cache_tree);
1169 pthread_mutex_unlock (&cache_lock);
1171 add_response_info(sock,
1172 "QueueLength: %"PRIu64"\n", copy_queue_length);
1173 add_response_info(sock,
1174 "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1175 add_response_info(sock,
1176 "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1177 add_response_info(sock,
1178 "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1179 add_response_info(sock,
1180 "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1181 add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1182 add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1183 add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1184 add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1186 send_response(sock, RESP_OK, "Statistics follow\n");
1189 } /* }}} int handle_request_stats */
1191 static int handle_request_flush (HANDLER_PROTO) /* {{{ */
1193 char *file, file_tmp[PATH_MAX];
1196 status = buffer_get_field (&buffer, &buffer_size, &file);
1199 return syntax_error(sock,cmd);
1203 pthread_mutex_lock(&stats_lock);
1204 stats_flush_received++;
1205 pthread_mutex_unlock(&stats_lock);
1207 get_abs_path(&file, file_tmp);
1208 if (!check_file_access(file, sock)) return 0;
1210 status = flush_file (file);
1212 return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1213 else if (status == ENOENT)
1215 /* no file in our tree; see whether it exists at all */
1216 struct stat statbuf;
1218 memset(&statbuf, 0, sizeof(statbuf));
1219 if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1220 return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1222 return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1224 else if (status < 0)
1225 return send_response(sock, RESP_ERR, "Internal error.\n");
1227 return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1232 } /* }}} int handle_request_flush */
1234 static int handle_request_flushall(HANDLER_PROTO) /* {{{ */
1236 RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1238 pthread_mutex_lock(&cache_lock);
1239 flush_old_values(-1);
1240 pthread_mutex_unlock(&cache_lock);
1242 return send_response(sock, RESP_OK, "Started flush.\n");
1243 } /* }}} static int handle_request_flushall */
1245 static int handle_request_pending(HANDLER_PROTO) /* {{{ */
1248 char *file, file_tmp[PATH_MAX];
1251 status = buffer_get_field(&buffer, &buffer_size, &file);
1253 return syntax_error(sock,cmd);
1255 get_abs_path(&file, file_tmp);
1257 pthread_mutex_lock(&cache_lock);
1258 ci = g_tree_lookup(cache_tree, file);
1261 pthread_mutex_unlock(&cache_lock);
1262 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1265 for (size_t i=0; i < ci->values_num; i++)
1266 add_response_info(sock, "%s\n", ci->values[i]);
1268 pthread_mutex_unlock(&cache_lock);
1269 return send_response(sock, RESP_OK, "updates pending\n");
1270 } /* }}} static int handle_request_pending */
1272 static int handle_request_forget(HANDLER_PROTO) /* {{{ */
1276 char *file, file_tmp[PATH_MAX];
1278 status = buffer_get_field(&buffer, &buffer_size, &file);
1280 return syntax_error(sock,cmd);
1282 get_abs_path(&file, file_tmp);
1283 if (!check_file_access(file, sock)) return 0;
1285 pthread_mutex_lock(&cache_lock);
1286 found = g_tree_remove(cache_tree, file);
1287 pthread_mutex_unlock(&cache_lock);
1291 if (!JOURNAL_REPLAY(sock))
1292 journal_write("forget", file);
1294 return send_response(sock, RESP_OK, "Gone!\n");
1297 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1301 } /* }}} static int handle_request_forget */
1303 static int handle_request_queue (HANDLER_PROTO) /* {{{ */
1307 pthread_mutex_lock(&cache_lock);
1309 ci = cache_queue_head;
1312 add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1316 pthread_mutex_unlock(&cache_lock);
1318 return send_response(sock, RESP_OK, "in queue.\n");
1319 } /* }}} int handle_request_queue */
1321 static int handle_request_update (HANDLER_PROTO) /* {{{ */
1323 char *file, file_tmp[PATH_MAX];
1326 char orig_buf[CMD_MAX];
1330 /* save it for the journal later */
1331 if (!JOURNAL_REPLAY(sock))
1332 strncpy(orig_buf, buffer, buffer_size);
1334 status = buffer_get_field (&buffer, &buffer_size, &file);
1336 return syntax_error(sock,cmd);
1338 pthread_mutex_lock(&stats_lock);
1339 stats_updates_received++;
1340 pthread_mutex_unlock(&stats_lock);
1342 get_abs_path(&file, file_tmp);
1343 if (!check_file_access(file, sock)) return 0;
1345 pthread_mutex_lock (&cache_lock);
1346 ci = g_tree_lookup (cache_tree, file);
1348 if (ci == NULL) /* {{{ */
1350 struct stat statbuf;
1353 /* don't hold the lock while we setup; stat(2) might block */
1354 pthread_mutex_unlock(&cache_lock);
1356 memset (&statbuf, 0, sizeof (statbuf));
1357 status = stat (file, &statbuf);
1360 RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1363 if (status == ENOENT)
1364 return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1366 return send_response(sock, RESP_ERR,
1367 "stat failed with error %i.\n", status);
1369 if (!S_ISREG (statbuf.st_mode))
1370 return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1372 if (access(file, R_OK|W_OK) != 0)
1373 return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1374 file, rrd_strerror(errno));
1376 ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1379 RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1381 return send_response(sock, RESP_ERR, "malloc failed.\n");
1383 memset (ci, 0, sizeof (cache_item_t));
1385 ci->file = strdup (file);
1386 if (ci->file == NULL)
1389 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1391 return send_response(sock, RESP_ERR, "strdup failed.\n");
1394 wipe_ci_values(ci, now);
1395 ci->flags = CI_FLAGS_IN_TREE;
1396 pthread_cond_init(&ci->flushed, NULL);
1398 pthread_mutex_lock(&cache_lock);
1400 /* another UPDATE might have added this entry in the meantime */
1401 tmp = g_tree_lookup (cache_tree, file);
1403 g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1406 free_cache_item (ci);
1410 /* state may have changed while we were unlocked */
1411 if (state == SHUTDOWN)
1414 assert (ci != NULL);
1416 /* don't re-write updates in replay mode */
1417 if (!JOURNAL_REPLAY(sock))
1418 journal_write("update", orig_buf);
1420 while (buffer_size > 0)
1426 status = buffer_get_field (&buffer, &buffer_size, &value);
1429 RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1433 /* make sure update time is always moving forward */
1434 stamp = strtol(value, &eostamp, 10);
1435 if (eostamp == value || eostamp == NULL || *eostamp != ':')
1437 pthread_mutex_unlock(&cache_lock);
1438 return send_response(sock, RESP_ERR,
1439 "Cannot find timestamp in '%s'!\n", value);
1441 else if (stamp <= ci->last_update_stamp)
1443 pthread_mutex_unlock(&cache_lock);
1444 return send_response(sock, RESP_ERR,
1445 "illegal attempt to update using time %ld when last"
1446 " update time is %ld (minimum one second step)\n",
1447 stamp, ci->last_update_stamp);
1450 ci->last_update_stamp = stamp;
1452 if (!rrd_add_strdup_chunk(&ci->values, &ci->values_num, value,
1453 &ci->values_alloc, config_alloc_chunk))
1455 RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed.");
1462 if (((now - ci->last_flush_time) >= config_write_interval)
1463 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1464 && (ci->values_num > 0))
1466 enqueue_cache_item (ci, TAIL);
1469 pthread_mutex_unlock (&cache_lock);
1472 return send_response(sock, RESP_ERR, "No values updated.\n");
1474 return send_response(sock, RESP_OK,
1475 "errors, enqueued %i value(s).\n", values_num);
1480 } /* }}} int handle_request_update */
/* FETCH <file> <CF> [<start> [<end>]]
 *
 * Reads the file name, the consolidation function and the optional start
 * and end times from the request buffer, flushes the file with
 * flush_file(), runs rrd_fetch_r() and reports the result through
 * add_response_info(): header lines (FlushVersion, Start, End, Step,
 * DSCount, DSName) followed by one "<timestamp>:<values>" line per step.
 *
 * Returns the result of send_response() / syntax_error(), or 0 when
 * check_file_access() rejects the file.
 */
1482 static int handle_request_fetch (HANDLER_PROTO) /* {{{ */
1484 char *file, file_tmp[PATH_MAX];
1493 unsigned long ds_cnt;
1500 rrd_value_t *data_ptr;
1507 /* Read the arguments */
1510 status = buffer_get_field (&buffer, &buffer_size, &file);
1514 status = buffer_get_field (&buffer, &buffer_size, &cf);
1518 status = buffer_get_field (&buffer, &buffer_size, &start_str);
1526 status = buffer_get_field (&buffer, &buffer_size, &end_str);
1536 return (syntax_error(sock,cmd));
1538 get_abs_path(&file, file_tmp);
1539 if (!check_file_access(file, sock)) return 0;
/* Pending updates are flushed before reading. ENOENT from flush_file() is
 * tolerated; any other non-zero status aborts the request. */
1541 status = flush_file (file);
1542 if ((status != 0) && (status != ENOENT))
1543 return (send_response (sock, RESP_ERR,
1544 "flush_file (%s) failed with status %i.\n", file, status));
1546 t = time (NULL); /* "now" */
1548 /* Parse start time */
1549 if (start_str != NULL)
/* Base 0 lets strtol() accept decimal, octal and hexadecimal input. */
1556 value = strtol (start_str, &endptr, /* base = */ 0);
1557 if ((endptr == start_str) || (errno != 0))
1558 return (send_response(sock, RESP_ERR,
1559 "Cannot parse start time `%s': Only simple integers are allowed.\n",
/* The parsed value is used either as an absolute time or as an offset from
 * "now". NOTE(review): presumably selected by the sign of value -- confirm. */
1563 start_tm = (time_t) value;
1565 start_tm = (time_t) (t + value);
/* NOTE(review): presumably the default when no start time was given
 * (86400 seconds, i.e. one day, before "now") -- confirm. */
1569 start_tm = t - 86400;
1572 /* Parse end time */
1573 if (end_str != NULL)
1580 value = strtol (end_str, &endptr, /* base = */ 0);
1581 if ((endptr == end_str) || (errno != 0))
1582 return (send_response(sock, RESP_ERR,
1583 "Cannot parse end time `%s': Only simple integers are allowed.\n",
1587 end_tm = (time_t) value;
1589 end_tm = (time_t) (t + value);
/* rrd_fetch_r() receives start_tm, end_tm and step by address; the values
 * reported below are read back from those variables after the call. */
1601 status = rrd_fetch_r (file, cf, &start_tm, &end_tm, &step,
1602 &ds_cnt, &ds_namv, &data);
1604 return (send_response(sock, RESP_ERR,
1605 "rrd_fetch_r failed: %s\n", rrd_get_error ()));
/* NOTE(review): "%lu" is paired with the int literal 1 on the next line;
 * the argument should be an unsigned long (e.g. 1UL). */
1607 add_response_info (sock, "FlushVersion: %lu\n", 1);
1608 add_response_info (sock, "Start: %lu\n", (unsigned long) start_tm);
1609 add_response_info (sock, "End: %lu\n", (unsigned long) end_tm);
1610 add_response_info (sock, "Step: %lu\n", step);
1611 add_response_info (sock, "DSCount: %lu\n", ds_cnt);
/* SSTRCAT appends str to a fixed-size array, truncating to fit and keeping
 * the array NUL-terminated; buffer_fill tracks the number of bytes used. */
1613 #define SSTRCAT(buffer,str,buffer_fill) do { \
1614 size_t str_len = strlen (str); \
1615 if ((buffer_fill + str_len) > sizeof (buffer)) \
1616 str_len = sizeof (buffer) - buffer_fill; \
1617 if (str_len > 0) { \
1618 strncpy (buffer + buffer_fill, str, str_len); \
1619 buffer_fill += str_len; \
1620 assert (buffer_fill <= sizeof (buffer)); \
1621 if (buffer_fill == sizeof (buffer)) \
1622 buffer[buffer_fill - 1] = 0; \
1624 buffer[buffer_fill] = 0; \
1628 { /* Add list of DS names */
1630 size_t linebuf_fill;
1632 memset (linebuf, 0, sizeof (linebuf));
1634 for (i = 0; i < ds_cnt; i++)
1637 SSTRCAT (linebuf, " ", linebuf_fill);
1638 SSTRCAT (linebuf, ds_namv[i], linebuf_fill);
/* Each name is released as soon as it has been copied into linebuf. */
1639 rrd_freemem(ds_namv[i]);
1641 rrd_freemem(ds_namv);
1642 add_response_info (sock, "DSName: %s\n", linebuf);
1645 /* Add the actual data */
/* One response line per step; the first row is stamped start_tm + step. */
1648 for (t = start_tm + step; t <= end_tm; t += step)
1651 size_t linebuf_fill;
1654 memset (linebuf, 0, sizeof (linebuf));
1656 for (i = 0; i < ds_cnt; i++)
1658 snprintf (tmp, sizeof (tmp), " %0.10e", *data_ptr);
1659 tmp[sizeof (tmp) - 1] = 0;
1660 SSTRCAT (linebuf, tmp, linebuf_fill);
1665 add_response_info (sock, "%10lu:%s\n", (unsigned long) t, linebuf);
1669 return (send_response (sock, RESP_OK, "Success\n"));
1671 } /* }}} int handle_request_fetch */
/* Handler for the WROTE journal entry. The whole remaining buffer is
 * taken as the file name. With cache_lock held, the cache item is looked up
 * in cache_tree, its accumulated values are freed, the item is reset with
 * wipe_ci_values() and it is taken off the queue with remove_from_queue().
 */
1673 /* we came across a "WROTE" entry during journal replay.
1674 * throw away any values that we have accumulated for this file
1676 static int handle_request_wrote (HANDLER_PROTO) /* {{{ */
1679 const char *file = buffer;
1681 pthread_mutex_lock(&cache_lock);
1683 ci = g_tree_lookup(cache_tree, file);
/* NOTE(review): this unlock presumably belongs to the "file not cached"
 * early-return path -- confirm against the full source. */
1686 pthread_mutex_unlock(&cache_lock);
1691 rrd_free_ptrs((void ***) &ci->values, &ci->values_num);
1693 wipe_ci_values(ci, now);
1694 remove_from_queue(ci);
1696 pthread_mutex_unlock(&cache_lock);
1698 } /* }}} int handle_request_wrote */
/* INFO <filename>
 *
 * Resolves the file name, checks access, calls rrd_info_r() and emits
 * "<key> <type> <value>" response lines whose value format depends on
 * data->type (NaN is spelled out for floating point values; for blobs only
 * the size is reported). Finishes with an "Info for <file> follows" status.
 */
1700 static int handle_request_info (HANDLER_PROTO) /* {{{ */
1702 char *file, file_tmp[PATH_MAX];
1706 /* obtain filename */
1707 status = buffer_get_field(&buffer, &buffer_size, &file);
1709 return syntax_error(sock,cmd);
1710 /* get full pathname */
1711 get_abs_path(&file, file_tmp);
1712 if (!check_file_access(file, sock)) {
1713 return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1717 data = rrd_info_r(file);
1719 return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
/* The numeric value of data->type is sent as the second field of every
 * line, regardless of which branch formats the value. */
1722 switch (data->type) {
1724 if (isnan(data->value.u_val))
1725 add_response_info(sock,"%s %d NaN\n",data->key, data->type);
1727 add_response_info(sock,"%s %d %0.10e\n", data->key, data->type, data->value.u_val);
1730 add_response_info(sock,"%s %d %lu\n", data->key, data->type, data->value.u_cnt);
1733 add_response_info(sock,"%s %d %d\n", data->key, data->type, data->value.u_int);
1736 add_response_info(sock,"%s %d %s\n", data->key, data->type, data->value.u_str);
/* NOTE(review): "%lu" assumes u_blo.size is an unsigned long -- confirm
 * against the rrd_info_t declaration. */
1739 add_response_info(sock,"%s %d %lu\n", data->key, data->type, data->value.u_blo.size);
1744 return send_response(sock, RESP_OK, "Info for %s follows\n",file);
1745 } /* }}} static int handle_request_info */
1747 static int handle_request_first (HANDLER_PROTO) /* {{{ */
1749 char *i, *file, file_tmp[PATH_MAX];
1754 /* obtain filename */
1755 status = buffer_get_field(&buffer, &buffer_size, &file);
1757 return syntax_error(sock,cmd);
1758 /* get full pathname */
1759 get_abs_path(&file, file_tmp);
1760 if (!check_file_access(file, sock)) {
1761 return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1764 status = buffer_get_field(&buffer, &buffer_size, &i);
1766 return syntax_error(sock,cmd);
1769 return send_response(sock, RESP_ERR, "Invalid index specified (%d)\n", idx);
1774 t = rrd_first_r(file,idx);
1776 return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1778 return send_response(sock, RESP_OK, "%lu\n",(unsigned)t);
1779 } /* }}} static int handle_request_first */
/* LAST <filename>
 *
 * Opens the RRD read-only to obtain its last update time and pdp_step, then
 * looks the file up in cache_tree under cache_lock; the cached
 * last_update_stamp is used when an entry exists. Replies with the
 * resulting timestamp or with an error message.
 *
 * NOTE(review): the final reply pairs "%lu" with an (unsigned) cast; the
 * argument should be cast to unsigned long to match the conversion.
 */
1782 static int handle_request_last (HANDLER_PROTO) /* {{{ */
1784 char *file, file_tmp[PATH_MAX];
1786 time_t t, from_file, step;
1787 rrd_file_t * rrd_file;
1791 /* obtain filename */
1792 status = buffer_get_field(&buffer, &buffer_size, &file);
1794 return syntax_error(sock,cmd);
1795 /* get full pathname */
1796 get_abs_path(&file, file_tmp);
1797 if (!check_file_access(file, sock)) {
1798 return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1802 rrd_file = rrd_open(file,&rrd,RRD_READONLY);
1804 return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
/* Copy what is needed out of the header before the file is closed. */
1806 from_file = rrd.live_head->last_up;
1807 step = rrd.stat_head->pdp_step;
1808 rrd_close(rrd_file);
/* The cache lookup and the read of last_update_stamp happen under cache_lock. */
1809 pthread_mutex_lock(&cache_lock);
1810 ci = g_tree_lookup(cache_tree, file);
1812 t = ci->last_update_stamp;
1815 pthread_mutex_unlock(&cache_lock);
1819 return send_response(sock, RESP_ERR, "Error: rrdcached: Invalid timestamp returned\n");
1821 return send_response(sock, RESP_OK, "%lu\n",(unsigned)t);
1822 } /* }}} static int handle_request_last */
/* CREATE <filename> [-b start] [-s step] [-O] <DS definitions> <RRA definitions>
 *
 * Resolves the file name, then walks the remaining tokens: "-b" takes the
 * next token as the start time (parsed with atol), "-s" and "-O" are
 * recognised as options, and tokens beginning with "DS:" or "RRA:" are
 * collected in av[] and handed to rrd_create_r2(). Any other token is a
 * syntax error. Defaults: step 300, last_up = now - 10 seconds,
 * no_overwrite = opt_no_overwrite.
 *
 * NOTE(review): options are matched with strncmp(tok, "-x", 2), so any
 * token that merely starts with "-b", "-s" or "-O" is treated as that option.
 * NOTE(review): the access-denied reply says "Cannot read" although this
 * command creates a file.
 * NOTE(review): no bound on ac is visible where tokens are stored in av[]
 * -- confirm against the declaration of av.
 */
1824 static int handle_request_create (HANDLER_PROTO) /* {{{ */
1826 char *file, file_tmp[PATH_MAX];
1831 unsigned long step = 300;
1832 time_t last_up = time(NULL)-10;
1833 int no_overwrite = opt_no_overwrite;
1836 /* obtain filename */
1837 status = buffer_get_field(&buffer, &buffer_size, &file);
1839 return syntax_error(sock,cmd);
1840 /* get full pathname */
1841 get_abs_path(&file, file_tmp);
1842 if (!check_file_access(file, sock)) {
1843 return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1845 RRDD_LOG(LOG_INFO, "rrdcreate request for %s",file);
/* Consume tokens until buffer_get_field() fails or yields a NULL token. */
1847 while ((status = buffer_get_field(&buffer, &buffer_size, &tok)) == 0 && tok) {
1848 if( ! strncmp(tok,"-b",2) ) {
1849 status = buffer_get_field(&buffer, &buffer_size, &tok );
1850 if (status != 0) return syntax_error(sock,cmd);
1851 last_up = (time_t) atol(tok);
1854 if( ! strncmp(tok,"-s",2) ) {
1855 status = buffer_get_field(&buffer, &buffer_size, &tok );
1856 if (status != 0) return syntax_error(sock,cmd);
1860 if( ! strncmp(tok,"-O",2) ) {
/* DS and RRA definitions are passed through untouched as argv entries. */
1864 if( ! strncmp(tok,"DS:",3) ) { av[ac++]=tok; continue; }
1865 if( ! strncmp(tok,"RRA:",4) ) { av[ac++]=tok; continue; }
1866 return syntax_error(sock,cmd);
1869 return send_response(sock, RESP_ERR, "The step size cannot be less than 1 second.\n");
/* 3600 * 24 * 365 * 10 seconds is roughly ten years after the epoch. */
1871 if (last_up < 3600 * 24 * 365 * 10) {
1872 return send_response(sock, RESP_ERR, "The first entry must be after 1980.\n");
1876 status = rrd_create_r2(file,step,last_up,no_overwrite,ac,(const char **)av);
1879 return send_response(sock, RESP_OK, "RRD created OK\n");
1881 return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1882 } /* }}} static int handle_request_create */
/* BATCH: refuses to nest (replies "Already in BATCH" when
 * sock->batch_start is already set). Otherwise sends the go-ahead reply,
 * records the current time in sock->batch_start and resets the per-batch
 * command counter sock->batch_cmd to 0. */
1884 /* start "BATCH" processing */
1885 static int batch_start (HANDLER_PROTO) /* {{{ */
1888 if (sock->batch_start)
1889 return send_response(sock, RESP_ERR, "Already in BATCH\n");
/* The reply is sent before batch_start is set, i.e. while the socket is
 * still in non-batch mode. */
1891 status = send_response(sock, RESP_OK,
1892 "Go ahead. End with dot '.' on its own line.\n");
1893 sock->batch_start = time(NULL);
1894 sock->batch_cmd = 0;
1897 } /* }}} static int batch_start */
1899 /* finish "BATCH" processing and return results to the client */
1900 static int batch_done (HANDLER_PROTO) /* {{{ */
1902 assert(sock->batch_start);
1903 sock->batch_start = 0;
1904 sock->batch_cmd = 0;
1905 return send_response(sock, RESP_OK, "errors\n");
1906 } /* }}} static int batch_done */
/* Handler registered for the QUIT command ("Disconnect from rrdcached").
 * NOTE(review): the body is not visible here; presumably it returns a
 * non-zero status so that the connection loop ends -- confirm. */
1908 static int handle_request_quit (HANDLER_PROTO) /* {{{ */
1911 } /* }}} static int handle_request_quit */
/* Table of the commands understood by the daemon. The visible entries pair
 * a handler function with a context mask (CMD_CONTEXT_*), a syntax string
 * and a help text. The position of an entry in this array is also used as a
 * bit position in listen_socket_t.permissions (see find_command_index), so
 * reordering entries changes the meaning of stored permission bits.
 * list_of_commands_len holds the number of entries. */
1913 static command_t list_of_commands[] = { /* {{{ */
1916 handle_request_update,
1918 "UPDATE <filename> <values> [<values> ...]\n"
1920 "Adds the given file to the internal cache if it is not yet known and\n"
1921 "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1924 "Each <values> has the following form:\n"
1925 " <values> = <time>:<value>[:<value>[...]]\n"
1926 "See the rrdupdate(1) manpage for details.\n"
/* WROTE is accepted in the journal context only. */
1930 handle_request_wrote,
1931 CMD_CONTEXT_JOURNAL,
1937 handle_request_flush,
1938 CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1939 "FLUSH <filename>\n"
1941 "Adds the given filename to the head of the update queue and returns\n"
1942 "after it has been dequeued.\n"
1946 handle_request_flushall,
1950 "Triggers writing of all pending updates. Returns immediately.\n"
1954 handle_request_pending,
1956 "PENDING <filename>\n"
1958 "Shows any 'pending' updates for a file, in order.\n"
1959 "The updates shown have not yet been written to the underlying RRD file.\n"
1963 handle_request_forget,
1965 "FORGET <filename>\n"
1967 "Removes the file completely from the cache.\n"
1968 "Any pending updates for the file will be lost.\n"
1972 handle_request_queue,
1976 "Shows all files in the output queue.\n"
1977 "The output is zero or more lines in the following format:\n"
1978 "(where <num_vals> is the number of values to be written)\n"
1980 "<num_vals> <filename>\n"
1984 handle_request_stats,
1988 "Returns some performance counters, see the rrdcached(1) manpage for\n"
1989 "a description of the values.\n"
/* HELP carries a syntax string but a NULL help text. */
1993 handle_request_help,
1995 "HELP [<command>]\n",
1996 NULL, /* special! */
2004 "The 'BATCH' command permits the client to initiate a bulk load\n"
2005 " of commands to rrdcached.\n"
2010 " server: 0 Go ahead. End with dot '.' on its own line.\n"
2011 " client: command #1\n"
2012 " client: command #2\n"
2013 " client: ... and so on\n"
2015 " server: 2 errors\n"
2016 " server: 7 message for command #7\n"
2017 " server: 9 message for command #9\n"
2019 "For more information, consult the rrdcached(1) documentation.\n"
2022 ".", /* BATCH terminator */
2030 handle_request_fetch,
2032 "FETCH <file> <CF> [<start> [<end>]]\n"
2034 "The 'FETCH' can be used by the client to retrieve values from an RRD file.\n"
2038 handle_request_info,
2040 "INFO <filename>\n",
2041 "The INFO command retrieves information about a specified RRD file.\n"
2042 "This is returned in standard rrdinfo format, a sequence of lines\n"
2043 "with the format <keyname> = <value>\n"
2044 "Note that this is the data as of the last update of the RRD file itself,\n"
2045 "not the last time data was received via rrdcached, so there may be pending\n"
2046 "updates in the queue. If this bothers you, then first run a FLUSH.\n"
2050 handle_request_first,
2052 "FIRST <filename> <rra index>\n",
2053 "The FIRST command retrieves the first data time for a specified RRA in\n"
2058 handle_request_last,
2060 "LAST <filename>\n",
2061 "The LAST command retrieves the last update time for a specified RRD file.\n"
2062 "Note that this is the time of the last update of the RRD file itself, not\n"
2063 "the last time data was received via rrdcached, so there may be pending\n"
2064 "updates in the queue. If this bothers you, then first run a FLUSH.\n"
2068 handle_request_create,
2069 CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
2070 "CREATE <filename> [-b start] [-s step] [-O] <DS definitions> <RRA definitions>\n",
2071 "The CREATE command will create an RRD file, overwriting any existing file\n"
2072 "unless the -O option is given or rrdcached was started with the -O option.\n"
2073 "The start parameter needs to be in seconds since 1/1/70 (AT-style syntax is\n"
2074 "not acceptable) and the step is in seconds (default is 300).\n"
2075 "The DS and RRA definitions are as for the 'rrdtool create' command.\n"
2079 handle_request_quit,
2080 CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
2083 "Disconnect from rrdcached.\n"
2085 }; /* }}} command_t list_of_commands[] */
/* Number of entries in list_of_commands, computed at compile time. */
2086 static size_t list_of_commands_len = sizeof (list_of_commands)
2087 / sizeof (list_of_commands[0]);
2089 static command_t *find_command(char *cmd)
2093 for (i = 0; i < list_of_commands_len; i++)
2094 if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
2095 return (&list_of_commands[i]);
/* Case-insensitive lookup of a command name in list_of_commands; on a match
 * the array index is returned as ssize_t.
 * NOTE(review): the return statement for "not found" is not visible here;
 * presumably a negative value -- confirm. */
2099 /* We currently use the index in the `list_of_commands' array as a bit position
2100 * in `listen_socket_t.permissions'. This member should NEVER be accessed from
2101 * outside these functions so that switching to a more elegant storage method
2102 * is easily possible. */
2103 static ssize_t find_command_index (const char *cmd) /* {{{ */
2107 for (i = 0; i < list_of_commands_len; i++)
2108 if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
2109 return ((ssize_t) i);
2111 } /* }}} ssize_t find_command_index */
/* Decide whether `cmd' may be used on `sock'. Journal replay, QUIT and
 * HELP get their own early branches; "." (the BATCH terminator) has a
 * special case too. Every other command is looked up with
 * find_command_index() and its bit is tested in sock->permissions.
 *
 * NOTE(review): `1 << i' shifts a signed int; with permissions declared as
 * uint32_t an unsigned constant ((uint32_t) 1 << i) avoids undefined
 * behaviour for i == 31. */
2113 static int socket_permission_check (listen_socket_t *sock, /* {{{ */
2118 if (JOURNAL_REPLAY(sock))
2124 if ((strcasecmp ("QUIT", cmd) == 0)
2125 || (strcasecmp ("HELP", cmd) == 0))
2127 else if (strcmp (".", cmd) == 0)
2130 i = find_command_index (cmd);
2135 if ((sock->permissions & (1 << i)) != 0)
2138 } /* }}} int socket_permission_check */
/* Grant `cmd' on `sock': the command's index in list_of_commands selects
 * the bit that is set in sock->permissions.
 * NOTE(review): same signed-shift concern as in socket_permission_check. */
2140 static int socket_permission_add (listen_socket_t *sock, /* {{{ */
2145 i = find_command_index (cmd);
2150 sock->permissions |= (1 << i);
2152 } /* }}} int socket_permission_add */
/* Revoke every command permission on `sock' by zeroing the bit mask. */
2154 static void socket_permission_clear (listen_socket_t *sock) /* {{{ */
2156 sock->permissions = 0;
2157 } /* }}} socket_permission_clear */
/* Overwrite the permission bit mask of `dest' with the one from `src'. */
2159 static void socket_permission_copy (listen_socket_t *dest, /* {{{ */
2160 listen_socket_t *src)
2162 dest->permissions = src->permissions;
2163 } /* }}} socket_permission_copy */
2165 /* check whether commands are received in the expected context */
2166 static int command_check_context(listen_socket_t *sock, command_t *cmd)
2168 if (JOURNAL_REPLAY(sock))
2169 return (cmd->context & CMD_CONTEXT_JOURNAL);
2170 else if (sock->batch_start)
2171 return (cmd->context & CMD_CONTEXT_BATCH);
2173 return (cmd->context & CMD_CONTEXT_CLIENT);
/* HELP [<command>]
 *
 * When the optional argument names a known command that has a syntax or
 * help text, its usage and help are added to the response. The other
 * visible path produces a "Command overview" consisting of the syntax line
 * of every table entry that has one.
 *
 * NOTE(review): resp_txt is handed to send_response() as the format string.
 * One visible assignment is a literal; whether every assignment is free of
 * '%' cannot be confirmed here -- passing "%s", resp_txt would be safer.
 */
2179 static int handle_request_help (HANDLER_PROTO) /* {{{ */
2184 command_t *help = NULL;
2186 status = buffer_get_field (&buffer, &buffer_size, &cmd_str);
2188 help = find_command(cmd_str);
2190 if (help && (help->syntax || help->help))
2194 snprintf(tmp, sizeof(tmp)-1, "Help for %s\n", help->cmd);
2198 add_response_info(sock, "Usage: %s\n", help->syntax);
2201 add_response_info(sock, "%s\n", help->help);
2207 resp_txt = "Command overview\n";
2209 for (i = 0; i < list_of_commands_len; i++)
/* Entries without a syntax string are tested for explicitly below. */
2211 if (list_of_commands[i].syntax == NULL)
2213 add_response_info (sock, "%s", list_of_commands[i].syntax);
2217 return send_response(sock, RESP_OK, resp_txt);
2218 } /* }}} int handle_request_help */
/* Dispatch one request line. The buffer must be NUL-terminated (asserted).
 * The first field is the command word; it is looked up with find_command(),
 * checked with socket_permission_check() and command_check_context(), and
 * finally the command's handler is invoked with the remainder of the
 * buffer. Unknown, forbidden or out-of-context commands are answered with
 * an error response. */
2220 static int handle_request (DISPATCH_PROTO) /* {{{ */
2222 char *buffer_ptr = buffer;
2223 char *cmd_str = NULL;
2224 command_t *cmd = NULL;
2227 assert (buffer[buffer_size - 1] == '\0');
2229 status = buffer_get_field (&buffer_ptr, &buffer_size, &cmd_str);
2232 RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
/* sock is tested for NULL here before sock->batch_start is read. */
2236 if (sock != NULL && sock->batch_start)
2239 cmd = find_command(cmd_str);
2241 return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str);
2243 if (!socket_permission_check (sock, cmd->cmd))
2244 return send_response(sock, RESP_ERR, "Permission denied.\n");
2246 if (!command_check_context(sock, cmd))
2247 return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str);
2249 return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size);
2250 } /* }}} int handle_request */
/* Release the list of file names held by a journal set via
 * rrd_free_ptrs(). This does not touch the files on disk. */
2252 static void journal_set_free (journal_set *js) /* {{{ */
2257 rrd_free_ptrs((void ***) &js->files, &js->files_num);
2260 } /* }}} journal_set_free */
/* Unlink every file recorded in the journal set, logging each name at
 * debug level. The return value of unlink() is not checked. */
2262 static void journal_set_remove (journal_set *js) /* {{{ */
2267 for (uint i=0; i < js->files_num; i++)
2269 RRDD_LOG(LOG_DEBUG, "removing old journal %s", js->files[i]);
2270 unlink(js->files[i]);
2272 } /* }}} journal_set_remove */
/* Closes journal_fh if it is open and logs an error when fclose() fails. */
2274 /* close current journal file handle.
2275 * MUST hold journal_lock before calling */
2276 static void journal_close(void) /* {{{ */
2278 if (journal_fh != NULL)
2280 if (fclose(journal_fh) != 0)
2281 RRDD_LOG(LOG_ERR, "cannot close journal: %s", rrd_strerror(errno));
2286 } /* }}} journal_close */
/* Start a new journal file named
 * "<journal_dir>/<JOURNAL_BASE>.<seconds>.<microseconds>", open it for
 * appending and record its name in journal_cur. The failure handling
 * visible below logs that journaling is disabled and sets
 * config_flush_at_shutdown to 1. */
2288 /* MUST hold journal_lock before calling */
2289 static void journal_new_file(void) /* {{{ */
2293 char new_file[PATH_MAX + 1];
2295 assert(journal_dir != NULL);
2296 assert(journal_cur != NULL);
2300 gettimeofday(&now, NULL);
2301 /* this format assures that the files sort in strcmp() order */
2302 snprintf(new_file, PATH_MAX, "%s/%s.%010d.%06d",
2303 journal_dir, JOURNAL_BASE, (int)now.tv_sec, (int)now.tv_usec);
/* Created with mode 0644: owner read/write, group and others read-only. */
2305 new_fd = open(new_file, O_WRONLY|O_CREAT|O_APPEND,
2306 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
2310 journal_fh = fdopen(new_fd, "a");
2311 if (journal_fh == NULL)
/* The size counter starts from the stream position reported by ftell(). */
2314 journal_size = ftell(journal_fh);
2315 RRDD_LOG(LOG_DEBUG, "started new journal %s", new_file);
2317 /* record the file in the journal set */
2318 rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, new_file);
2324 "JOURNALING DISABLED: Error while trying to create %s : %s",
2325 new_file, rrd_strerror(errno));
2327 "JOURNALING DISABLED: All values will be flushed at shutdown");
2330 config_flush_at_shutdown = 1;
2332 } /* }}} journal_new_file */
/* Rotate the journal sets. Does nothing visible when journal_dir is NULL.
 * Otherwise stats_journal_rotate is incremented under stats_lock; then,
 * under journal_lock, journal_cur becomes journal_old and a fresh,
 * zero-initialised set is allocated for journal_cur. After journal_lock
 * has been released, the files of the previous old set are unlinked and
 * the set is freed.
 * NOTE(review): the failure message mentions malloc() although the
 * allocation is done with calloc(). */
2334 /* MUST NOT hold journal_lock before calling this */
2335 static void journal_rotate(void) /* {{{ */
2337 journal_set *old_js = NULL;
2339 if (journal_dir == NULL)
2342 RRDD_LOG(LOG_DEBUG, "rotating journals");
2344 pthread_mutex_lock(&stats_lock);
2345 ++stats_journal_rotate;
2346 pthread_mutex_unlock(&stats_lock);
2348 pthread_mutex_lock(&journal_lock);
2352 /* rotate the journal sets */
2353 old_js = journal_old;
2354 journal_old = journal_cur;
2355 journal_cur = calloc(1, sizeof(journal_set));
2357 if (journal_cur != NULL)
2360 RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2362 pthread_mutex_unlock(&journal_lock);
/* File removal happens outside journal_lock. */
2364 journal_set_remove(old_js);
2365 journal_set_free (old_js);
2367 } /* }}} static void journal_rotate */
/* Shutdown handling for the journals. When config_flush_at_shutdown is
 * set, the files of both the old and the current set are removed;
 * the other visible path only logs that the journals are kept for the
 * next start. In either case both sets are freed afterwards. */
2369 /* MUST hold journal_lock when calling */
2370 static void journal_done(void) /* {{{ */
2372 if (journal_cur == NULL)
2377 if (config_flush_at_shutdown)
2379 RRDD_LOG(LOG_INFO, "removing journals");
2380 journal_set_remove(journal_old);
2381 journal_set_remove(journal_cur);
2385 RRDD_LOG(LOG_INFO, "expedited shutdown; "
2386 "journals will be used at next startup");
2389 journal_set_free(journal_cur);
2390 journal_set_free(journal_old);
2393 } /* }}} static void journal_done */
/* Append "<cmd> <args>\n" to the journal. The value returned by fprintf()
 * is added to journal_size (under journal_lock) and to stats_journal_bytes
 * (under stats_lock). journal_size is compared against JOURNAL_MAX after
 * each write.
 * NOTE(review): journal_fh is tested for NULL before journal_lock is
 * taken, and the fprintf() result is added without a visible check for a
 * negative (error) return -- confirm against the full source. */
2395 static int journal_write(char *cmd, char *args) /* {{{ */
2399 if (journal_fh == NULL)
2402 pthread_mutex_lock(&journal_lock);
2403 chars = fprintf(journal_fh, "%s %s\n", cmd, args);
2404 journal_size += chars;
2406 if (journal_size > JOURNAL_MAX)
2409 pthread_mutex_unlock(&journal_lock);
2413 pthread_mutex_lock(&stats_lock);
2414 stats_journal_bytes += chars;
2415 pthread_mutex_unlock(&stats_lock);
2419 } /* }}} static int journal_write */
/* Replay one journal file. Returns 0 immediately for a NULL file name.
 * The file is first vetted with stat(): it has to be a regular file owned
 * by daemon_uid and must not carry the S_IWGRP or S_IWOTH mode bits.
 * Each line read with fgets() has to end in '\n'; the newline is replaced
 * by a NUL and the entry is passed to handle_request() with a NULL socket.
 * Returns 1 when at least one entry was counted, 0 otherwise.
 *
 * NOTE(review): the reason string "must not be user/group writable" does
 * not match the bits that are tested (group and other write). */
2421 static int journal_replay (const char *file) /* {{{ */
2427 char entry[CMD_MAX];
2430 if (file == NULL) return 0;
2433 char *reason = "unknown error";
2435 struct stat statbuf;
2437 memset(&statbuf, 0, sizeof(statbuf));
2438 if (stat(file, &statbuf) != 0)
2440 reason = "stat error";
2443 else if (!S_ISREG(statbuf.st_mode))
2445 reason = "not a regular file";
2448 if (statbuf.st_uid != daemon_uid)
2450 reason = "not owned by daemon user";
2453 if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
2455 reason = "must not be user/group writable";
2461 RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
2462 file, rrd_strerror(status), reason);
2467 fh = fopen(file, "r");
/* A missing file (ENOENT) is not logged; other open errors are. */
2470 if (errno != ENOENT)
2471 RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
2472 file, rrd_strerror(errno));
2476 RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
2485 if (fgets(entry, sizeof(entry), fh) == NULL)
2487 entry_len = strlen(entry);
2489 /* check \n termination in case journal writing crashed mid-line */
2492 else if (entry[entry_len - 1] != '\n')
2494 RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
2499 entry[entry_len - 1] = '\0';
/* entry_len still counts the byte that held '\n', so it now covers the
 * terminating NUL that handle_request() asserts on. */
2501 if (handle_request(NULL, now, entry, entry_len) == 0)
2509 RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
2510 entry_cnt, fail_cnt);
2512 return entry_cnt > 0 ? 1 : 0;
2513 } /* }}} static int journal_replay */
/* qsort(3) comparison function for an array of C strings (journal file
 * names). Each argument points at an array element, i.e. at a `char *';
 * the elements are ordered by strcmp().
 *
 * The arguments are accessed through pointers to const so that the const
 * qualifier required by the qsort() callback signature is not cast away. */
static int journal_sort(const void *v1, const void *v2)
{
  const char *const *jn1 = v1;
  const char *const *jn2 = v2;

  return strcmp(*jn1, *jn2);
}

/* Start-up journal handling. Does nothing when journal_dir is NULL.
 * With journal_lock held it allocates journal_cur, renames journals that
 * use the old naming scheme (".old" becomes ".0000", the plain base name
 * becomes ".0001"), collects every directory entry whose name starts with
 * JOURNAL_BASE, sorts the names with journal_sort() and replays them in
 * that order through journal_replay(). When something was replayed and
 * config_flush_at_shutdown is set, flush_old_values(-1) is called.
 *
 * NOTE(review): the allocation-failure message below names
 * "journal_rotate" although it is logged from journal_init.
 * NOTE(review): the early-exit paths after the lock has been taken
 * (allocation failure, opendir failure) should be checked for a matching
 * unlock -- not verifiable from the lines visible here. */
2523 static void journal_init(void) /* {{{ */
2525 int had_journal = 0;
2527 struct dirent *dent;
2528 char path[PATH_MAX+1];
2530 if (journal_dir == NULL) return;
2532 pthread_mutex_lock(&journal_lock);
2534 journal_cur = calloc(1, sizeof(journal_set));
2535 if (journal_cur == NULL)
2537 RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2541 RRDD_LOG(LOG_INFO, "checking for journal files");
2543 /* Handle old journal files during transition. This gives them the
2544 * correct sort order. TODO: remove after first release
2547 char old_path[PATH_MAX+1];
2548 snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".old" );
2549 snprintf(path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0000");
2550 rename(old_path, path);
2552 snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE );
2553 snprintf(path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0001");
2554 rename(old_path, path);
2557 dir = opendir(journal_dir);
2559 RRDD_LOG(LOG_CRIT, "journal_init: opendir(%s) failed\n", journal_dir);
2562 while ((dent = readdir(dir)) != NULL)
2564 /* looks like a journal file? */
2565 if (strncmp(dent->d_name, JOURNAL_BASE, strlen(JOURNAL_BASE)))
2568 snprintf(path, PATH_MAX, "%s/%s", journal_dir, dent->d_name);
2570 if (!rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, path))
2572 RRDD_LOG(LOG_CRIT, "journal_init: cannot add journal file %s!",
/* Sorting by name yields chronological order because the file names embed
 * zero-padded timestamps. */
2579 qsort(journal_cur->files, journal_cur->files_num,
2580 sizeof(journal_cur->files[0]), journal_sort);
/* had_journal accumulates the 0/1 results of journal_replay(). */
2582 for (uint i=0; i < journal_cur->files_num; i++)
2583 had_journal += journal_replay(journal_cur->files[i]);
2587 /* it must have been a crash. start a flush */
2588 if (had_journal && config_flush_at_shutdown)
2589 flush_old_values(-1);
2591 pthread_mutex_unlock(&journal_lock);
2593 RRDD_LOG(LOG_INFO, "journal processing complete");
2595 } /* }}} static void journal_init */
/* Release the read and write buffers of a socket object and reset both
 * pointers to NULL. `sock' must not be NULL (asserted). */
2597 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
2599 assert(sock != NULL);
2601 free(sock->rbuf); sock->rbuf = NULL;
2602 free(sock->wbuf); sock->wbuf = NULL;
2604 } /* }}} void free_listen_socket */
/* Tear down a client connection. The part visible here hands the socket
 * object to free_listen_socket().
 * NOTE(review): closing of the file descriptor is presumably done in the
 * lines not shown -- confirm. */
2606 static void close_connection(listen_socket_t *sock) /* {{{ */
2614 free_listen_socket(sock);
2616 } /* }}} void close_connection */
/* Thread entry point serving one client connection; `args' is the
 * listen_socket_t of the connection.
 *
 * Allocates the read buffer, registers itself in connection_threads_num
 * and then loops while state == RUNNING: poll() with a 500 ms timeout,
 * read() into the buffer, and run handle_request() for every complete
 * command returned by next_cmd(). On exit the connection is closed, the
 * thread count is decremented and, once it reaches zero,
 * connection_threads_done is broadcast. */
2618 static void *connection_thread_main (void *args) /* {{{ */
2620 listen_socket_t *sock;
2623 sock = (listen_socket_t *) args;
2626 /* init read buffers */
2627 sock->next_read = sock->next_cmd = 0;
2628 sock->rbuf = malloc(RBUF_SIZE);
2629 if (sock->rbuf == NULL)
2631 RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
2632 close_connection(sock);
2636 pthread_mutex_lock (&connection_threads_lock);
2637 connection_threads_num++;
2638 pthread_mutex_unlock (&connection_threads_lock);
2640 while (state == RUNNING)
2647 struct pollfd pollfd;
2651 pollfd.events = POLLIN | POLLPRI;
/* The short timeout lets the loop notice a change of `state' promptly. */
2654 status = poll (&pollfd, 1, /* timeout = */ 500);
2655 if (state != RUNNING)
2657 else if (status == 0) /* timeout */
2659 else if (status < 0) /* error */
/* NOTE(review): poll() reports errors through errno; `status' is presumably
 * reloaded from errno in a line not shown before this comparison. */
2662 if (status != EINTR)
2663 RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2667 if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2669 else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2671 RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2672 "poll(2) returned something unexpected: %#04hx",
/* New data is appended behind what is already buffered. */
2677 rbytes = read(fd, sock->rbuf + sock->next_read,
2678 RBUF_SIZE - sock->next_read);
2681 RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2684 else if (rbytes == 0)
2687 sock->next_read += rbytes;
/* While a BATCH is active, the time the batch started is used as "now". */
2689 if (sock->batch_start)
2690 now = sock->batch_start;
2694 while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
/* cmd_len + 1 includes the terminating NUL expected by handle_request(). */
2696 status = handle_request (sock, now, cmd, cmd_len+1);
2703 close_connection(sock);
2705 /* Remove this thread from the connection threads list */
2706 pthread_mutex_lock (&connection_threads_lock);
2707 connection_threads_num--;
2708 if (connection_threads_num <= 0)
2709 pthread_cond_broadcast(&connection_threads_done);
2710 pthread_mutex_unlock (&connection_threads_lock);
2713 } /* }}} void *connection_thread_main */
/* Open a UNIX domain listen socket described by `sock'.
 *
 * An optional "unix:" prefix is stripped from the path, the directory of
 * the socket is created with rrd_mkdir_p(), listen_fds is grown by one
 * element and initialised from `sock', and a stream socket is bound to the
 * path. When configured, group ownership and file permissions of the socket
 * file are adjusted. After listen() succeeds, the descriptor, family and
 * path are stored in the new listen_fds element.
 * Errors are reported on stderr. */
2715 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2718 struct sockaddr_un sa;
2719 listen_socket_t *temp;
2722 char *path_copy, *dir;
2725 if (strncmp(path, "unix:", strlen("unix:")) == 0)
2726 path += strlen("unix:");
2728 /* dirname may modify its argument */
2729 path_copy = strdup(path);
2730 if (path_copy == NULL)
2732 fprintf(stderr, "rrdcached: strdup(): %s\n",
2733 rrd_strerror(errno));
2737 dir = dirname(path_copy);
2738 if (rrd_mkdir_p(dir, 0777) != 0)
2740 fprintf(stderr, "Failed to create socket directory '%s': %s\n",
2741 dir, rrd_strerror(errno));
/* The realloc result goes into `temp' first, not straight into listen_fds. */
2747 temp = (listen_socket_t *) rrd_realloc (listen_fds,
2748 sizeof (listen_fds[0]) * (listen_fds_num + 1));
2751 fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
/* The new slot starts out as a copy of the caller's template. */
2755 memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2757 fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2760 fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2761 rrd_strerror(errno));
2765 memset (&sa, 0, sizeof (sa));
2766 sa.sun_family = AF_UNIX;
2767 strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2769 /* if we've gotten this far, we own the pid file. any daemon started
2770 * with the same args must not be alive. therefore, ensure that we can
2771 * create the socket...
2775 status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2778 fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2779 path, rrd_strerror(errno));
2784 /* tweak the sockets group ownership */
2785 if (sock->socket_group != (gid_t)-1)
2787 if ( (chown(path, getuid(), sock->socket_group) != 0) ||
2788 (chmod(path, (S_IRUSR|S_IWUSR|S_IXUSR | S_IRGRP|S_IWGRP)) != 0) )
2790 fprintf(stderr, "rrdcached: failed to set socket group permissions (%s)\n", strerror(errno));
/* An explicitly configured mode is applied after the group adjustment. */
2794 if (sock->socket_permissions != (mode_t)-1)
2796 if (chmod(path, sock->socket_permissions) != 0)
2797 fprintf(stderr, "rrdcached: failed to set socket file permissions (%o): %s\n",
2798 (unsigned int)sock->socket_permissions, strerror(errno));
2801 status = listen (fd, /* backlog = */ 10);
2804 fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2805 path, rrd_strerror(errno));
2811 listen_fds[listen_fds_num].fd = fd;
2812 listen_fds[listen_fds_num].family = PF_UNIX;
/* The stored address is the path without the "unix:" prefix. */
2813 strncpy(listen_fds[listen_fds_num].addr, path,
2814 sizeof (listen_fds[listen_fds_num].addr) - 1);
2818 } /* }}} int open_listen_socket_unix */
/* Open network listen sockets for the address in `sock'.
 *
 * The address is copied into a local buffer and split into host and port;
 * the bracketed form "[<ipv6>]:<port>" is recognised, otherwise the last
 * ':' separates the port. RRDCACHED_DEFAULT_PORT is used when no port is
 * given. For every result of getaddrinfo() listen_fds is grown by one
 * element, a socket is created with SO_REUSEADDR, bound and put into
 * listening state, and its descriptor and family are stored.
 *
 * NOTE(review): the listen() error message ends in "\n." -- the full stop
 * is printed after the newline, unlike the bind() message ("%s.\n"). */
2820 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2822 struct addrinfo ai_hints;
2823 struct addrinfo *ai_res;
2824 struct addrinfo *ai_ptr;
2825 char addr_copy[NI_MAXHOST];
/* Bounded copy with explicit termination; addr_copy is the working copy. */
2830 strncpy (addr_copy, sock->addr, sizeof(addr_copy)-1);
2831 addr_copy[sizeof (addr_copy) - 1] = 0;
2834 memset (&ai_hints, 0, sizeof (ai_hints));
2835 ai_hints.ai_flags = 0;
2836 #ifdef AI_ADDRCONFIG
2837 ai_hints.ai_flags |= AI_ADDRCONFIG;
2839 ai_hints.ai_family = AF_UNSPEC;
2840 ai_hints.ai_socktype = SOCK_STREAM;
2843 if (*addr == '[') /* IPv6+port format */
2845 /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2848 port = strchr (addr, ']');
2851 fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
2859 else if (*port == 0)
2863 fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
2866 } /* if (*addr == '[') */
/* Not bracketed: the last ':' in the string is looked up. */
2869 port = rindex(addr, ':');
2877 status = getaddrinfo (addr,
2878 port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2879 &ai_hints, &ai_res);
2882 fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
2883 addr, gai_strerror (status));
2887 for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2890 listen_socket_t *temp;
2893 temp = (listen_socket_t *) rrd_realloc (listen_fds,
2894 sizeof (listen_fds[0]) * (listen_fds_num + 1));
2898 "rrdcached: open_listen_socket_network: realloc failed.\n");
2902 memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2904 fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2907 fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
2908 rrd_strerror(errno));
/* The setsockopt() result is not checked. */
2912 setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2914 status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2917 fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2918 sock->addr, rrd_strerror(errno));
2923 status = listen (fd, /* backlog = */ 10);
2926 fprintf (stderr, "rrdcached: listen(%s) failed: %s\n.",
2927 sock->addr, rrd_strerror(errno));
2929 freeaddrinfo(ai_res);
2933 listen_fds[listen_fds_num].fd = fd;
2934 listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2936 } /* for (ai_ptr) */
2938 freeaddrinfo(ai_res);
2940 } /* }}} static int open_listen_socket_network */
/* Open a listen socket of the appropriate kind: addresses that start with
 * "unix:" or with '/' go to open_listen_socket_unix(); everything else goes
 * to open_listen_socket_network(). Returns the result of the chosen helper.
 * NOTE(review): listen_socket_s.addr is declared as a char array, so the
 * second assert can never fail. */
2942 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2944 assert(sock != NULL);
2945 assert(sock->addr != NULL);
2947 if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2948 || sock->addr[0] == '/')
2949 return (open_listen_socket_unix(sock));
2951 return (open_listen_socket_network(sock));
2952 } /* }}} int open_listen_socket */
/* Close every descriptor in listen_fds; for UNIX domain sockets the socket
 * file named by the stored address is unlinked as well. */
2954 static int close_listen_sockets (void) /* {{{ */
2958 for (i = 0; i < listen_fds_num; i++)
2960 close (listen_fds[i].fd);
2962 if (listen_fds[i].family == PF_UNIX)
2963 unlink(listen_fds[i].addr);
2971 } /* }}} int close_listen_sockets */
/* Thread entry point of the listener.
 *
 * Requires at least one entry in listen_fds. While state == RUNNING it
 * polls all listen descriptors with a one second timeout; for each
 * readable descriptor a listen_socket_t is allocated as a copy of the
 * listening entry, the connection is accepted into it and a detached
 * thread running connection_thread_main() is started. When the loop ends,
 * the listen sockets are closed and the function waits on
 * connection_threads_done until connection_threads_num has dropped to zero. */
2973 static void *listen_thread_main (void UNUSED(*args)) /* {{{ */
2975 struct pollfd *pollfds;
2980 if (listen_fds_num < 1)
2982 RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2986 pollfds_num = listen_fds_num;
2987 pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2988 if (pollfds == NULL)
2990 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2993 memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2995 RRDD_LOG(LOG_INFO, "listening for connections");
2997 while (state == RUNNING)
/* The poll set is rebuilt from listen_fds on every iteration. */
2999 for (i = 0; i < pollfds_num; i++)
3001 pollfds[i].fd = listen_fds[i].fd;
3002 pollfds[i].events = POLLIN | POLLPRI;
3003 pollfds[i].revents = 0;
3006 status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
3007 if (state != RUNNING)
3009 else if (status == 0) /* timeout */
3011 else if (status < 0) /* error */
/* NOTE(review): poll() reports errors through errno; `status' is presumably
 * reloaded from errno in a line not shown before this comparison. */
3014 if (status != EINTR)
3016 RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
3021 for (i = 0; i < pollfds_num; i++)
3023 listen_socket_t *client_sock;
3024 struct sockaddr_storage client_sa;
3025 socklen_t client_sa_size;
3027 pthread_attr_t attr;
3029 if (pollfds[i].revents == 0)
3032 if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
3034 RRDD_LOG (LOG_ERR, "listen_thread_main: "
3035 "poll(2) returned something unexpected for listen FD #%i.",
3040 client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
3041 if (client_sock == NULL)
3043 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
/* The client object inherits all settings of the listening socket; only
 * the descriptor is replaced by the accepted one below. */
3046 memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
3048 client_sa_size = sizeof (client_sa);
3049 client_sock->fd = accept (pollfds[i].fd,
3050 (struct sockaddr *) &client_sa, &client_sa_size);
3051 if (client_sock->fd < 0)
3053 RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
/* Connection threads are created detached, so they are never joined. */
3058 pthread_attr_init (&attr);
3059 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
3061 status = pthread_create (&tid, &attr, connection_thread_main,
3065 RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
3066 close_connection(client_sock);
3069 } /* for (pollfds_num) */
3070 } /* while (state == RUNNING) */
3072 RRDD_LOG(LOG_INFO, "starting shutdown");
3074 close_listen_sockets ();
/* Wait until every connection thread has signalled its exit. */
3076 pthread_mutex_lock (&connection_threads_lock);
3077 while (connection_threads_num > 0)
3078 pthread_cond_wait(&connection_threads_done, &connection_threads_lock);
3079 pthread_mutex_unlock (&connection_threads_lock);
3084 } /* }}} void *listen_thread_main */
/*
 * Prepare the process for service: record the effective uid, set up the PID
 * file descriptor, open the listen sockets, optionally detach from the
 * terminal, change into the base directory, install signal handlers, open
 * syslog and create the cache tree. The visible success path returns the
 * result of write_pidfile().
 */
3086 static int daemonize (void) /* {{{ */
3091 daemon_uid = geteuid();
3093 pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
3095 pid_fd = check_pidfile();
3099 /* open all the listen sockets */
3100 if (config_listen_address_list_len > 0)
3102 for (size_t i = 0; i < config_listen_address_list_len; i++)
3103 open_listen_socket (config_listen_address_list[i]);
/* The configured address list is handed to rrd_free_ptrs() once every
 * entry has been passed to open_listen_socket(). */
3105 rrd_free_ptrs((void ***) &config_listen_address_list,
3106 &config_listen_address_list_len);
/* Bounded copy of RRDCACHED_DEFAULT_ADDRESS into default_socket.addr with
 * explicit NUL termination, followed by opening that socket. */
3110 strncpy(default_socket.addr, RRDCACHED_DEFAULT_ADDRESS,
3111 sizeof(default_socket.addr) - 1);
3112 default_socket.addr[sizeof(default_socket.addr) - 1] = '\0';
3113 open_listen_socket (&default_socket);
/* Individual open failures are not checked above; only the total number of
 * sockets that ended up in listen_fds decides whether startup continues. */
3116 if (listen_fds_num < 1)
3118 fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
3122 if (!stay_foreground)
3129 fprintf (stderr, "daemonize: fork(2) failed.\n");
3135 /* Become session leader */
3138 /* Open the first three file descriptors to /dev/null */
/* NOTE(review): presumably open() yields descriptor 0 here and the two
 * dup(0) calls then fill descriptors 1 and 2 - confirm that the standard
 * descriptors are closed just before this point. */
3143 open ("/dev/null", O_RDWR);
3144 if (dup(0) == -1 || dup(0) == -1){
3145 RRDD_LOG (LOG_ERR, "faild to run dup.\n");
3147 } /* if (!stay_foreground) */
3149 /* Change into the /tmp directory. */
3150 base_dir = (config_base_dir != NULL)
3154 if (chdir (base_dir) != 0)
3156 fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
3160 install_signal_handlers();
3162 openlog ("rrdcached", LOG_PID, LOG_DAEMON);
3163 RRDD_LOG(LOG_INFO, "starting up");
/* Keys are ordered with strcmp and get no destroy callback; values are
 * released through free_cache_item when removed from the tree. */
3165 cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
3166 (GDestroyNotify) free_cache_item);
3167 if (cache_tree == NULL)
3169 RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
3173 return write_pidfile (pid_fd);
3178 } /* }}} int daemonize */
/*
 * Shutdown path: wake and join the flush thread and every queue thread, then
 * release the thread array, the base directory string, the cache tree and
 * the PID file name.
 */
3180 static int cleanup (void) /* {{{ */
/* Wake anything waiting on flush_cond, then wait for the flush thread. */
3182 pthread_cond_broadcast (&flush_cond);
3183 pthread_join (flush_thread, NULL);
/* Same for the queue threads: wake all waiters, then join each thread. */
3185 pthread_cond_broadcast (&queue_cond);
3186 for (int i = 0; i < config_queue_threads; i++)
3187 pthread_join (queue_threads[i], NULL);
/* With flush-at-shutdown enabled the queue is expected to be empty once the
 * worker threads have exited. */
3189 if (config_flush_at_shutdown)
3191 assert(cache_queue_head == NULL);
3192 RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
3195 free(queue_threads);
3196 free(config_base_dir);
/* The cache lock is taken before the tree is destroyed. */
3198 pthread_mutex_lock(&cache_lock);
3199 g_tree_destroy(cache_tree);
3201 pthread_mutex_lock(&journal_lock);
3204 RRDD_LOG(LOG_INFO, "goodbye");
3208 free(config_pid_file);
3211 } /* }}} int cleanup */
/*
 * Parse the command line with getopt(3) and store the results in the
 * file-level configuration variables (listen addresses, socket defaults,
 * intervals, thread count, base directory, PID file, journal directory,
 * allocation chunk size). After the option loop a few sanity warnings are
 * printed to stderr.
 */
3213 static int read_options (int argc, char **argv) /* {{{ */
/* default_socket is the template that supplies permissions, group and mode
 * for every listen address added later; start it out empty. Presumably the
 * -1 values mean "not set" - confirm against the code that applies them. */
3218 socket_permission_clear (&default_socket);
3220 default_socket.socket_group = (gid_t)-1;
3221 default_socket.socket_permissions = (mode_t)-1;
3223 while ((option = getopt(argc, argv, "Ogl:s:m:P:f:w:z:t:Bb:p:Fj:a:h?")) != -1)
3228 opt_no_overwrite = 1;
3237 listen_socket_t *new;
/* Each listen address gets its own zero-initialised listen_socket_t. */
3239 new = malloc(sizeof(listen_socket_t));
3242 fprintf(stderr, "read_options: malloc failed.\n");
3245 memset(new, 0, sizeof(listen_socket_t));
/* At most sizeof(addr) - 1 bytes are copied; the memset above guarantees
 * that the result is NUL-terminated. */
3247 strncpy(new->addr, optarg, sizeof(new->addr)-1);
3249 /* Add permissions to the socket {{{ */
3250 if (default_socket.permissions != 0)
3252 socket_permission_copy (new, &default_socket);
3254 else /* if (default_socket.permissions == 0) */
3256 /* Add permission for ALL commands to the socket. */
3258 for (i = 0; i < list_of_commands_len; i++)
3260 status = socket_permission_add (new, list_of_commands[i].cmd);
3263 fprintf (stderr, "read_options: Adding permission \"%s\" to "
3264 "socket failed. This should never happen, ever! Sorry.\n",
3265 list_of_commands[i].cmd);
3270 /* }}} Done adding permissions. */
/* Group and mode are taken from the defaults in effect at this point. */
3272 new->socket_group = default_socket.socket_group;
3273 new->socket_permissions = default_socket.socket_permissions;
/* Append the new entry to config_listen_address_list. */
3275 if (!rrd_add_ptr((void ***)&config_listen_address_list,
3276 &config_listen_address_list_len, new))
3278 fprintf(stderr, "read_options: rrd_add_ptr failed.\n");
3284 /* set socket group permissions */
/* NOTE(review): errno is tested below, but no reset before strtoul() is
 * visible here - confirm it cannot hold a stale value. */
3290 group_gid = strtoul(optarg, NULL, 10);
3291 if (errno != EINVAL && group_gid>0)
3293 /* we were passed a number */
3294 grp = getgrgid(group_gid);
3298 grp = getgrnam(optarg);
3303 default_socket.socket_group = grp->gr_gid;
3307 /* no idea what the user wanted... */
3308 fprintf (stderr, "read_options: couldn't map \"%s\" to a group, Sorry\n", optarg);
3314 /* set socket file permissions */
3318 char *endptr = NULL;
/* The mode is parsed as an octal number; it must be consumed completely and
 * lie within 0..07777. */
3320 tmp = strtol (optarg, &endptr, 8);
3321 if ((endptr == optarg) || (! endptr) || (*endptr != '\0')
3322 || (tmp > 07777) || (tmp < 0)) {
3323 fprintf (stderr, "read_options: Invalid file mode \"%s\".\n",
3328 default_socket.socket_permissions = (mode_t)tmp;
/* Start from an empty permission set, then add every token of the argument;
 * tokens are separated by commas or spaces. optarg is duplicated first
 * (presumably the copy is what gets tokenised - confirm). */
3339 socket_permission_clear (&default_socket);
3341 optcopy = strdup (optarg);
3344 while ((ptr = strtok_r (dummy, ", ", &saveptr)) != NULL)
3347 status = socket_permission_add (&default_socket, ptr);
3350 fprintf (stderr, "read_options: Adding permission \"%s\" to "
3351 "socket failed. Most likely, this permission doesn't "
3352 "exist. Check your command line.\n", ptr);
3365 temp = atoi (optarg);
3367 config_flush_interval = temp;
3370 fprintf (stderr, "Invalid flush interval: %s\n", optarg);
3380 temp = atoi (optarg);
3382 config_write_interval = temp;
3385 fprintf (stderr, "Invalid write interval: %s\n", optarg);
3395 temp = atoi(optarg);
3397 config_write_jitter = temp;
3400 fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
3410 threads = atoi(optarg);
3412 config_queue_threads = threads;
3415 fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
3422 config_write_base_only = 1;
3428 char base_realpath[PATH_MAX];
/* A base directory given later on the command line replaces an earlier one. */
3430 if (config_base_dir != NULL)
3431 free (config_base_dir);
3432 config_base_dir = strdup (optarg);
3433 if (config_base_dir == NULL)
3435 fprintf (stderr, "read_options: strdup failed.\n");
3439 if (rrd_mkdir_p (config_base_dir, 0777) != 0)
3441 fprintf (stderr, "Failed to create base directory '%s': %s\n",
3442 config_base_dir, rrd_strerror (errno));
3446 /* make sure that the base directory is not resolved via
3447 * symbolic links. this makes some performance-enhancing
3448 * assumptions possible (we don't have to resolve paths
3449 * that start with a "/")
3451 if (realpath(config_base_dir, base_realpath) == NULL)
3453 fprintf (stderr, "Failed to canonicalize the base directory '%s': "
3454 "%s\n", config_base_dir, rrd_strerror(errno));
3458 len = strlen (config_base_dir);
3459 while ((len > 0) && (config_base_dir[len - 1] == '/'))
3461 config_base_dir[len - 1] = 0;
3467 fprintf (stderr, "Invalid base directory: %s\n", optarg);
3471 _config_base_dir_len = len;
3473 len = strlen (base_realpath);
3474 while ((len > 0) && (base_realpath[len - 1] == '/'))
3476 base_realpath[len - 1] = '\0';
3480 if (strncmp(config_base_dir,
3481 base_realpath, sizeof(base_realpath)) != 0)
3484 "Base directory (-b) resolved via file system links!\n"
3485 "Please consult rrdcached '-b' documentation!\n"
3486 "Consider specifying the real directory (%s)\n",
3495 if (config_pid_file != NULL)
3496 free (config_pid_file);
3497 config_pid_file = strdup (optarg);
3498 if (config_pid_file == NULL)
3500 fprintf (stderr, "read_options: strdup failed.\n");
3507 config_flush_at_shutdown = 1;
3512 char journal_dir_actual[PATH_MAX];
3514 dir = journal_dir = strdup(realpath((const char *)optarg, journal_dir_actual));
3516 status = rrd_mkdir_p(dir, 0777);
3519 fprintf(stderr, "Failed to create journal directory '%s': %s\n",
3520 dir, rrd_strerror(errno));
3524 if (access(dir, R_OK|W_OK|X_OK) != 0)
3526 fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
3527 errno ? rrd_strerror(errno) : "");
3535 int temp = atoi(optarg);
3537 config_alloc_chunk = temp;
3540 fprintf(stderr, "Invalid allocation size: %s\n", optarg);
3548 printf ("RRDCacheD %s\n"
3549 "Copyright (C) 2008,2009 Florian octo Forster and Kevin Brintnall\n"
3551 "Usage: rrdcached [options]\n"
3553 "Valid options are:\n"
3554 " -l <address> Socket address to listen to.\n"
3555 " -P <perms> Sets the permissions to assign to all following "
3557 " -w <seconds> Interval in which to write data.\n"
3558 " -z <delay> Delay writes up to <delay> seconds to spread load\n"
3559 " -t <threads> Number of write threads.\n"
3560 " -f <seconds> Interval in which to flush dead data.\n"
3561 " -p <file> Location of the PID-file.\n"
3562 " -b <dir> Base directory to change to.\n"
3563 " -B Restrict file access to paths within -b <dir>\n"
3564 " -g Do not fork and run in the foreground.\n"
3565 " -j <dir> Directory in which to create the journal files.\n"
3566 " -F Always flush all updates at shutdown\n"
3567 " -s <id|name> Group owner of all following UNIX sockets\n"
3568 " (the socket will also have read/write permissions "
3570 " -m <mode> File permissions (octal) of all following UNIX "
3572 " -a <size> Memory allocation chunk size. Default is 1.\n"
3573 " -O Do not allow CREATE commands to overwrite existing\n"
3574 " files, even if asked to.\n"
3576 "For more information and a detailed description of all options "
3578 "to the rrdcached(1) manual page.\n",
3585 } /* switch (option) */
3586 } /* while (getopt) */
3588 /* advise the user when values are not sane */
3589 if (config_flush_interval < 2 * config_write_interval)
3590 fprintf(stderr, "WARNING: flush interval (-f) should be at least"
3591 " 2x write interval (-w) !\n");
3592 if (config_write_jitter > config_write_interval)
3593 fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
3594 " write interval (-w) !\n");
3596 if (config_write_base_only && config_base_dir == NULL)
3597 fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
3598 " Consult the rrdcached documentation\n");
/* Without a journal directory, flushing at shutdown is always switched on. */
3600 if (journal_dir == NULL)
3601 config_flush_at_shutdown = 1;
3604 } /* }}} int read_options */
/*
 * Program entry point: parse the command line, daemonize, start the queue
 * threads and the flush thread, then run the listen loop in the calling
 * thread.
 */
3606 int main (int argc, char **argv)
3610 status = read_options (argc, argv);
3618 status = daemonize ();
3621 fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
3627 /* start the queue threads */
/* One zero-initialised pthread_t slot per configured queue thread. */
3628 queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
3629 if (queue_threads == NULL)
3631 RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
3635 for (int i = 0; i < config_queue_threads; i++)
3637 memset (&queue_threads[i], 0, sizeof (*queue_threads));
3638 status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
3641 RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
3647 /* start the flush thread */
3648 memset(&flush_thread, 0, sizeof(flush_thread));
3649 status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
3652 RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
/* The listen loop is called directly, so it runs in this thread and control
 * comes back here only after it has returned. */
3657 listen_thread_main (NULL);
3664 * vim: set sw=2 sts=2 ts=8 et fdm=marker :