2 * RRDTool - src/rrd_daemon.c
3 * Copyright (C) 2008 Florian octo Forster
4 * Copyright (C) 2008 Kevin Brintnall
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; only version 2 of the License is applicable.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
20 * Florian octo Forster <octo at verplant.org>
21 * kevin brintnall <kbrint@rufus.net>
26 * First tell the compiler to stick to the C99 and POSIX standards as close as
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
46 #define _XOPEN_SOURCE 500
63 * Now for some includes..
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
70 #include "../rrd_config.h"
75 #include "rrd_client.h"
86 #include <sys/types.h>
90 #include <sys/socket.h>
101 #include <glib-2.0/glib.h>
104 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
107 # define __attribute__(x) /**/
119 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
121 struct listen_socket_s
124 char addr[PATH_MAX + 1];
126 socket_privilege privilege;
128 /* state for BATCH processing */
140 typedef struct listen_socket_s listen_socket_t;
143 typedef struct cache_item_s cache_item_t;
149 time_t last_flush_time;
150 time_t last_update_stamp;
151 #define CI_FLAGS_IN_TREE (1<<0)
152 #define CI_FLAGS_IN_QUEUE (1<<1)
154 pthread_cond_t flushed;
159 struct callback_flush_data_s
166 typedef struct callback_flush_data_s callback_flush_data_t;
173 typedef enum queue_side_e queue_side_t;
175 /* max length of socket command or response */
177 #define RBUF_SIZE (CMD_MAX*2)
182 static int stay_foreground = 0;
183 static uid_t daemon_uid;
185 static listen_socket_t *listen_fds = NULL;
186 static size_t listen_fds_num = 0;
188 static int do_shutdown = 0;
190 static pthread_t queue_thread;
192 static pthread_t *connection_threads = NULL;
193 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
194 static int connection_threads_num = 0;
197 static GTree *cache_tree = NULL;
198 static cache_item_t *cache_queue_head = NULL;
199 static cache_item_t *cache_queue_tail = NULL;
200 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
201 static pthread_cond_t cache_cond = PTHREAD_COND_INITIALIZER;
203 static int config_write_interval = 300;
204 static int config_write_jitter = 0;
205 static int config_flush_interval = 3600;
206 static int config_flush_at_shutdown = 0;
207 static char *config_pid_file = NULL;
208 static char *config_base_dir = NULL;
209 static size_t _config_base_dir_len = 0;
210 static int config_write_base_only = 0;
212 static listen_socket_t **config_listen_address_list = NULL;
213 static int config_listen_address_list_len = 0;
215 static uint64_t stats_queue_length = 0;
216 static uint64_t stats_updates_received = 0;
217 static uint64_t stats_flush_received = 0;
218 static uint64_t stats_updates_written = 0;
219 static uint64_t stats_data_sets_written = 0;
220 static uint64_t stats_journal_bytes = 0;
221 static uint64_t stats_journal_rotate = 0;
222 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
224 /* Journaled updates */
225 static char *journal_cur = NULL;
226 static char *journal_old = NULL;
227 static FILE *journal_fh = NULL;
228 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
229 static int journal_write(char *cmd, char *args);
230 static void journal_done(void);
231 static void journal_rotate(void);
236 static void sig_common (const char *sig) /* {{{ */
238 RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
240 pthread_cond_broadcast(&cache_cond);
241 } /* }}} void sig_common */
243 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
246 } /* }}} void sig_int_handler */
248 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
251 } /* }}} void sig_term_handler */
253 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
255 config_flush_at_shutdown = 1;
257 } /* }}} void sig_usr1_handler */
259 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
261 config_flush_at_shutdown = 0;
263 } /* }}} void sig_usr2_handler */
265 static void install_signal_handlers(void) /* {{{ */
267 /* These structures are static, because `sigaction' behaves weird if the are
269 static struct sigaction sa_int;
270 static struct sigaction sa_term;
271 static struct sigaction sa_pipe;
272 static struct sigaction sa_usr1;
273 static struct sigaction sa_usr2;
275 /* Install signal handlers */
276 memset (&sa_int, 0, sizeof (sa_int));
277 sa_int.sa_handler = sig_int_handler;
278 sigaction (SIGINT, &sa_int, NULL);
280 memset (&sa_term, 0, sizeof (sa_term));
281 sa_term.sa_handler = sig_term_handler;
282 sigaction (SIGTERM, &sa_term, NULL);
284 memset (&sa_pipe, 0, sizeof (sa_pipe));
285 sa_pipe.sa_handler = SIG_IGN;
286 sigaction (SIGPIPE, &sa_pipe, NULL);
288 memset (&sa_pipe, 0, sizeof (sa_usr1));
289 sa_usr1.sa_handler = sig_usr1_handler;
290 sigaction (SIGUSR1, &sa_usr1, NULL);
292 memset (&sa_usr2, 0, sizeof (sa_usr2));
293 sa_usr2.sa_handler = sig_usr2_handler;
294 sigaction (SIGUSR2, &sa_usr2, NULL);
296 } /* }}} void install_signal_handlers */
298 static int open_pidfile(char *action, int oflag) /* {{{ */
303 file = (config_pid_file != NULL)
305 : LOCALSTATEDIR "/run/rrdcached.pid";
307 fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
309 fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
310 action, file, rrd_strerror(errno));
313 } /* }}} static int open_pidfile */
315 /* check existing pid file to see whether a daemon is running */
316 static int check_pidfile(void)
322 pid_fd = open_pidfile("open", O_RDWR);
326 if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0)
333 /* another running process that we can signal COULD be
334 * a competing rrdcached */
335 if (pid != getpid() && kill(pid, 0) == 0)
338 "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
343 lseek(pid_fd, 0, SEEK_SET);
344 ftruncate(pid_fd, 0);
347 "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
348 "rrdcached: starting normally.\n", pid);
351 } /* }}} static int check_pidfile */
353 static int write_pidfile (int fd) /* {{{ */
360 fh = fdopen (fd, "w");
363 RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
368 fprintf (fh, "%i\n", (int) pid);
372 } /* }}} int write_pidfile */
374 static int remove_pidfile (void) /* {{{ */
379 file = (config_pid_file != NULL)
381 : LOCALSTATEDIR "/run/rrdcached.pid";
383 status = unlink (file);
387 } /* }}} int remove_pidfile */
389 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
393 eol = memchr(sock->rbuf + sock->next_cmd, '\n',
394 sock->next_read - sock->next_cmd);
398 /* no commands left, move remainder back to front of rbuf */
399 memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
400 sock->next_read - sock->next_cmd);
401 sock->next_read -= sock->next_cmd;
408 char *cmd = sock->rbuf + sock->next_cmd;
411 sock->next_cmd = eol - sock->rbuf + 1;
413 if (eol > sock->rbuf && *(eol-1) == '\r')
414 *(--eol) = '\0'; /* handle "\r\n" EOL */
425 /* add the characters directly to the write buffer */
426 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
430 assert(sock != NULL);
432 new_buf = realloc(sock->wbuf, sock->wbuf_len + len + 1);
435 RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
439 strncpy(new_buf + sock->wbuf_len, str, len + 1);
441 sock->wbuf = new_buf;
442 sock->wbuf_len += len;
445 } /* }}} static int add_to_wbuf */
447 /* add the text to the "extra" info that's sent after the status line */
448 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
451 char buffer[CMD_MAX];
454 if (sock == NULL) return 0; /* journal replay mode */
455 if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
458 #ifdef HAVE_VSNPRINTF
459 len = vsnprintf(buffer, sizeof(buffer)-1, fmt, argp);
461 len = vsprintf(buffer, fmt, argp);
466 RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
470 return add_to_wbuf(sock, buffer, len);
471 } /* }}} static int add_response_info */
473 static int count_lines(char *str) /* {{{ */
479 while ((str = strchr(str, '\n')) != NULL)
487 } /* }}} static int count_lines */
489 /* send the response back to the user.
490 * returns 0 on success, -1 on error
491 * write buffer is always zeroed after this call */
492 static int send_response (listen_socket_t *sock, response_code rc,
493 char *fmt, ...) /* {{{ */
496 char buffer[CMD_MAX];
501 if (sock == NULL) return rc; /* journal replay mode */
503 if (sock->batch_start)
506 return rc; /* no response on success during BATCH */
507 lines = sock->batch_cmd;
509 else if (rc == RESP_OK)
510 lines = count_lines(sock->wbuf);
514 rclen = sprintf(buffer, "%d ", lines);
516 #ifdef HAVE_VSNPRINTF
517 len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen-1, fmt, argp);
519 len = vsprintf(buffer+rclen, fmt, argp);
527 /* append the result to the wbuf, don't write to the user */
528 if (sock->batch_start)
529 return add_to_wbuf(sock, buffer, len);
531 /* first write must be complete */
532 if (len != write(sock->fd, buffer, len))
534 RRDD_LOG(LOG_INFO, "send_response: could not write status message");
538 if (sock->wbuf != NULL && rc == RESP_OK)
541 while (wrote < sock->wbuf_len)
543 ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
546 RRDD_LOG(LOG_INFO, "send_response: could not write results");
553 free(sock->wbuf); sock->wbuf = NULL;
559 static void wipe_ci_values(cache_item_t *ci, time_t when)
564 ci->last_flush_time = when;
565 if (config_write_jitter > 0)
566 ci->last_flush_time += (random() % config_write_jitter);
570 * remove a "cache_item_t" item from the queue.
571 * must hold 'cache_lock' when calling this
573 static void remove_from_queue(cache_item_t *ci) /* {{{ */
575 if (ci == NULL) return;
576 if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
578 if (ci->prev == NULL)
579 cache_queue_head = ci->next; /* reset head */
581 ci->prev->next = ci->next;
583 if (ci->next == NULL)
584 cache_queue_tail = ci->prev; /* reset the tail */
586 ci->next->prev = ci->prev;
588 ci->next = ci->prev = NULL;
589 ci->flags &= ~CI_FLAGS_IN_QUEUE;
590 } /* }}} static void remove_from_queue */
592 /* remove an entry from the tree and free all its resources.
593 * must hold 'cache lock' while calling this.
594 * returns 0 on success, otherwise errno */
595 static int forget_file(const char *file)
599 ci = g_tree_lookup(cache_tree, file);
603 g_tree_remove (cache_tree, file);
604 remove_from_queue(ci);
606 for (int i=0; i < ci->values_num; i++)
612 /* in case anyone is waiting */
613 pthread_cond_broadcast(&ci->flushed);
618 } /* }}} static int forget_file */
621 * enqueue_cache_item:
622 * `cache_lock' must be acquired before calling this function!
624 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
630 if (ci->values_num == 0)
635 if (cache_queue_head == ci)
638 /* remove if further down in queue */
639 remove_from_queue(ci);
642 ci->next = cache_queue_head;
643 if (ci->next != NULL)
645 cache_queue_head = ci;
647 if (cache_queue_tail == NULL)
648 cache_queue_tail = cache_queue_head;
650 else /* (side == TAIL) */
652 /* We don't move values back in the list.. */
653 if (ci->flags & CI_FLAGS_IN_QUEUE)
656 assert (ci->next == NULL);
657 assert (ci->prev == NULL);
659 ci->prev = cache_queue_tail;
661 if (cache_queue_tail == NULL)
662 cache_queue_head = ci;
664 cache_queue_tail->next = ci;
666 cache_queue_tail = ci;
669 ci->flags |= CI_FLAGS_IN_QUEUE;
671 pthread_cond_broadcast(&cache_cond);
672 pthread_mutex_lock (&stats_lock);
673 stats_queue_length++;
674 pthread_mutex_unlock (&stats_lock);
677 } /* }}} int enqueue_cache_item */
680 * tree_callback_flush:
681 * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held
682 * while this is in progress.
684 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
688 callback_flush_data_t *cfd;
690 ci = (cache_item_t *) value;
691 cfd = (callback_flush_data_t *) data;
693 if (ci->flags & CI_FLAGS_IN_QUEUE)
696 if ((ci->last_flush_time <= cfd->abs_timeout)
697 && (ci->values_num > 0))
699 enqueue_cache_item (ci, TAIL);
701 else if ((do_shutdown != 0)
702 && (ci->values_num > 0))
704 enqueue_cache_item (ci, TAIL);
706 else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
707 && (ci->values_num <= 0))
711 temp = (char **) realloc (cfd->keys,
712 sizeof (char *) * (cfd->keys_num + 1));
715 RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed.");
719 /* Make really sure this points to the _same_ place */
720 assert ((char *) key == ci->file);
721 cfd->keys[cfd->keys_num] = (char *) key;
726 } /* }}} gboolean tree_callback_flush */
728 static int flush_old_values (int max_age)
730 callback_flush_data_t cfd;
733 memset (&cfd, 0, sizeof (cfd));
734 /* Pass the current time as user data so that we don't need to call
735 * `time' for each node. */
736 cfd.now = time (NULL);
741 cfd.abs_timeout = cfd.now - max_age;
743 cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
745 /* `tree_callback_flush' will return the keys of all values that haven't
746 * been touched in the last `config_flush_interval' seconds in `cfd'.
747 * The char*'s in this array point to the same memory as ci->file, so we
748 * don't need to free them separately. */
749 g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
751 for (k = 0; k < cfd.keys_num; k++)
753 /* should never fail, since we have held the cache_lock
755 assert( forget_file(cfd.keys[k]) == 0 );
758 if (cfd.keys != NULL)
765 } /* int flush_old_values */
767 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
770 struct timespec next_flush;
771 int final_flush = 0; /* make sure we only flush once on shutdown */
773 gettimeofday (&now, NULL);
774 next_flush.tv_sec = now.tv_sec + config_flush_interval;
775 next_flush.tv_nsec = 1000 * now.tv_usec;
777 pthread_mutex_lock (&cache_lock);
778 while ((do_shutdown == 0) || (cache_queue_head != NULL))
787 /* First, check if it's time to do the cache flush. */
788 gettimeofday (&now, NULL);
789 if ((now.tv_sec > next_flush.tv_sec)
790 || ((now.tv_sec == next_flush.tv_sec)
791 && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
793 /* Flush all values that haven't been written in the last
794 * `config_write_interval' seconds. */
795 flush_old_values (config_write_interval);
797 /* Determine the time of the next cache flush. */
799 now.tv_sec + next_flush.tv_sec % config_flush_interval;
801 /* unlock the cache while we rotate so we don't block incoming
802 * updates if the fsync() blocks on disk I/O */
803 pthread_mutex_unlock(&cache_lock);
805 pthread_mutex_lock(&cache_lock);
808 /* Now, check if there's something to store away. If not, wait until
809 * something comes in or it's time to do the cache flush. if we are
810 * shutting down, do not wait around. */
811 if (cache_queue_head == NULL && !do_shutdown)
813 status = pthread_cond_timedwait (&cache_cond, &cache_lock, &next_flush);
814 if ((status != 0) && (status != ETIMEDOUT))
816 RRDD_LOG (LOG_ERR, "queue_thread_main: "
817 "pthread_cond_timedwait returned %i.", status);
821 /* We're about to shut down */
822 if (do_shutdown != 0 && !final_flush++)
824 if (config_flush_at_shutdown)
825 flush_old_values (-1); /* flush everything */
830 /* Check if a value has arrived. This may be NULL if we timed out or there
831 * was an interrupt such as a signal. */
832 if (cache_queue_head == NULL)
835 ci = cache_queue_head;
837 /* copy the relevant parts */
838 file = strdup (ci->file);
841 RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
845 assert(ci->values != NULL);
846 assert(ci->values_num > 0);
849 values_num = ci->values_num;
851 wipe_ci_values(ci, time(NULL));
852 remove_from_queue(ci);
854 pthread_mutex_lock (&stats_lock);
855 assert (stats_queue_length > 0);
856 stats_queue_length--;
857 pthread_mutex_unlock (&stats_lock);
859 pthread_mutex_unlock (&cache_lock);
862 status = rrd_update_r (file, NULL, values_num, (void *) values);
865 RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
866 "rrd_update_r (%s) failed with status %i. (%s)",
867 file, status, rrd_get_error());
870 journal_write("wrote", file);
871 pthread_cond_broadcast(&ci->flushed);
873 for (i = 0; i < values_num; i++)
881 pthread_mutex_lock (&stats_lock);
882 stats_updates_written++;
883 stats_data_sets_written += values_num;
884 pthread_mutex_unlock (&stats_lock);
887 pthread_mutex_lock (&cache_lock);
889 /* We're about to shut down */
890 if (do_shutdown != 0 && !final_flush++)
892 if (config_flush_at_shutdown)
893 flush_old_values (-1); /* flush everything */
897 } /* while ((do_shutdown == 0) || (cache_queue_head != NULL)) */
898 pthread_mutex_unlock (&cache_lock);
900 if (config_flush_at_shutdown)
902 assert(cache_queue_head == NULL);
903 RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
909 } /* }}} void *queue_thread_main */
911 static int buffer_get_field (char **buffer_ret, /* {{{ */
912 size_t *buffer_size_ret, char **field_ret)
921 buffer = *buffer_ret;
923 buffer_size = *buffer_size_ret;
927 if (buffer_size <= 0)
930 /* This is ensured by `handle_request'. */
931 assert (buffer[buffer_size - 1] == '\0');
934 while (buffer_pos < buffer_size)
936 /* Check for end-of-field or end-of-buffer */
937 if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
939 field[field_size] = 0;
945 /* Handle escaped characters. */
946 else if (buffer[buffer_pos] == '\\')
948 if (buffer_pos >= (buffer_size - 1))
951 field[field_size] = buffer[buffer_pos];
955 /* Normal operation */
958 field[field_size] = buffer[buffer_pos];
962 } /* while (buffer_pos < buffer_size) */
967 *buffer_ret = buffer + buffer_pos;
968 *buffer_size_ret = buffer_size - buffer_pos;
972 } /* }}} int buffer_get_field */
974 /* if we're restricting writes to the base directory,
975 * check whether the file falls within the dir
976 * returns 1 if OK, otherwise 0
978 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
980 assert(file != NULL);
982 if (!config_write_base_only
983 || sock == NULL /* journal replay */
984 || config_base_dir == NULL)
987 if (strstr(file, "../") != NULL) goto err;
989 /* relative paths without "../" are ok */
990 if (*file != '/') return 1;
992 /* file must be of the format base + "/" + <1+ char filename> */
993 if (strlen(file) < _config_base_dir_len + 2) goto err;
994 if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
995 if (*(file + _config_base_dir_len) != '/') goto err;
1000 if (sock != NULL && sock->fd >= 0)
1001 send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1004 } /* }}} static int check_file_access */
1006 /* when using a base dir, convert relative paths to absolute paths.
1007 * if necessary, modifies the "filename" pointer to point
1008 * to the new path created in "tmp". "tmp" is provided
1009 * by the caller and sizeof(tmp) must be >= PATH_MAX.
1011 * this allows us to optimize for the expected case (absolute path)
1014 static void get_abs_path(char **filename, char *tmp)
1016 assert(tmp != NULL);
1017 assert(filename != NULL && *filename != NULL);
1019 if (config_base_dir == NULL || **filename == '/')
1022 snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1024 } /* }}} static int get_abs_path */
1026 /* returns 1 if we have the required privilege level,
1027 * otherwise issue an error to the user on sock */
1028 static int has_privilege (listen_socket_t *sock, /* {{{ */
1029 socket_privilege priv)
1031 if (sock == NULL) /* journal replay */
1034 if (sock->privilege >= priv)
1037 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1038 } /* }}} static int has_privilege */
1040 static int flush_file (const char *filename) /* {{{ */
1044 pthread_mutex_lock (&cache_lock);
1046 ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1049 pthread_mutex_unlock (&cache_lock);
1053 if (ci->values_num > 0)
1055 /* Enqueue at head */
1056 enqueue_cache_item (ci, HEAD);
1057 pthread_cond_wait(&ci->flushed, &cache_lock);
1060 /* DO NOT DO ANYTHING WITH ci HERE!! The entry
1061 * may have been purged during our cond_wait() */
1063 pthread_mutex_unlock(&cache_lock);
1066 } /* }}} int flush_file */
1068 static int handle_request_help (listen_socket_t *sock, /* {{{ */
1069 char *buffer, size_t buffer_size)
1075 char *help_help[2] =
1077 "Command overview\n"
1079 "HELP [<command>]\n"
1080 "FLUSH <filename>\n"
1082 "PENDING <filename>\n"
1083 "FORGET <filename>\n"
1084 "UPDATE <filename> <values> [<values> ...]\n"
1089 char *help_flush[2] =
1093 "Usage: FLUSH <filename>\n"
1095 "Adds the given filename to the head of the update queue and returns\n"
1096 "after is has been dequeued.\n"
1099 char *help_flushall[2] =
1101 "Help for FLUSHALL\n"
1105 "Triggers writing of all pending updates. Returns immediately.\n"
1108 char *help_pending[2] =
1110 "Help for PENDING\n"
1112 "Usage: PENDING <filename>\n"
1114 "Shows any 'pending' updates for a file, in order.\n"
1115 "The updates shown have not yet been written to the underlying RRD file.\n"
1118 char *help_forget[2] =
1122 "Usage: FORGET <filename>\n"
1124 "Removes the file completely from the cache.\n"
1125 "Any pending updates for the file will be lost.\n"
1128 char *help_update[2] =
1132 "Usage: UPDATE <filename> <values> [<values> ...]\n"
1134 "Adds the given file to the internal cache if it is not yet known and\n"
1135 "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1138 "Each <values> has the following form:\n"
1139 " <values> = <time>:<value>[:<value>[...]]\n"
1140 "See the rrdupdate(1) manpage for details.\n"
1143 char *help_stats[2] =
1149 "Returns some performance counters, see the rrdcached(1) manpage for\n"
1150 "a description of the values.\n"
1153 char *help_batch[2] =
1157 "The 'BATCH' command permits the client to initiate a bulk load\n"
1158 " of commands to rrdcached.\n"
1163 " server: 0 Go ahead. End with dot '.' on its own line.\n"
1164 " client: command #1\n"
1165 " client: command #2\n"
1166 " client: ... and so on\n"
1168 " server: 2 errors\n"
1169 " server: 7 message for command #7\n"
1170 " server: 9 message for command #9\n"
1172 "For more information, consult the rrdcached(1) documentation.\n"
1175 status = buffer_get_field (&buffer, &buffer_size, &command);
1177 help_text = help_help;
1180 if (strcasecmp (command, "update") == 0)
1181 help_text = help_update;
1182 else if (strcasecmp (command, "flush") == 0)
1183 help_text = help_flush;
1184 else if (strcasecmp (command, "flushall") == 0)
1185 help_text = help_flushall;
1186 else if (strcasecmp (command, "pending") == 0)
1187 help_text = help_pending;
1188 else if (strcasecmp (command, "forget") == 0)
1189 help_text = help_forget;
1190 else if (strcasecmp (command, "stats") == 0)
1191 help_text = help_stats;
1192 else if (strcasecmp (command, "batch") == 0)
1193 help_text = help_batch;
1195 help_text = help_help;
1198 add_response_info(sock, help_text[1]);
1199 return send_response(sock, RESP_OK, help_text[0]);
1200 } /* }}} int handle_request_help */
1202 static int handle_request_stats (listen_socket_t *sock) /* {{{ */
1204 uint64_t copy_queue_length;
1205 uint64_t copy_updates_received;
1206 uint64_t copy_flush_received;
1207 uint64_t copy_updates_written;
1208 uint64_t copy_data_sets_written;
1209 uint64_t copy_journal_bytes;
1210 uint64_t copy_journal_rotate;
1212 uint64_t tree_nodes_number;
1213 uint64_t tree_depth;
1215 pthread_mutex_lock (&stats_lock);
1216 copy_queue_length = stats_queue_length;
1217 copy_updates_received = stats_updates_received;
1218 copy_flush_received = stats_flush_received;
1219 copy_updates_written = stats_updates_written;
1220 copy_data_sets_written = stats_data_sets_written;
1221 copy_journal_bytes = stats_journal_bytes;
1222 copy_journal_rotate = stats_journal_rotate;
1223 pthread_mutex_unlock (&stats_lock);
1225 pthread_mutex_lock (&cache_lock);
1226 tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1227 tree_depth = (uint64_t) g_tree_height (cache_tree);
1228 pthread_mutex_unlock (&cache_lock);
1230 add_response_info(sock,
1231 "QueueLength: %"PRIu64"\n", copy_queue_length);
1232 add_response_info(sock,
1233 "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1234 add_response_info(sock,
1235 "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1236 add_response_info(sock,
1237 "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1238 add_response_info(sock,
1239 "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1240 add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1241 add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1242 add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1243 add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1245 send_response(sock, RESP_OK, "Statistics follow\n");
1248 } /* }}} int handle_request_stats */
1250 static int handle_request_flush (listen_socket_t *sock, /* {{{ */
1251 char *buffer, size_t buffer_size)
1253 char *file, file_tmp[PATH_MAX];
1256 status = buffer_get_field (&buffer, &buffer_size, &file);
1259 return send_response(sock, RESP_ERR, "Usage: flush <filename>\n");
1263 pthread_mutex_lock(&stats_lock);
1264 stats_flush_received++;
1265 pthread_mutex_unlock(&stats_lock);
1267 get_abs_path(&file, file_tmp);
1268 if (!check_file_access(file, sock)) return 0;
1270 status = flush_file (file);
1272 return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1273 else if (status == ENOENT)
1275 /* no file in our tree; see whether it exists at all */
1276 struct stat statbuf;
1278 memset(&statbuf, 0, sizeof(statbuf));
1279 if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1280 return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1282 return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1284 else if (status < 0)
1285 return send_response(sock, RESP_ERR, "Internal error.\n");
1287 return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1292 } /* }}} int handle_request_flush */
1294 static int handle_request_flushall(listen_socket_t *sock) /* {{{ */
1298 status = has_privilege(sock, PRIV_HIGH);
1302 RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1304 pthread_mutex_lock(&cache_lock);
1305 flush_old_values(-1);
1306 pthread_mutex_unlock(&cache_lock);
1308 return send_response(sock, RESP_OK, "Started flush.\n");
1309 } /* }}} static int handle_request_flushall */
1311 static int handle_request_pending(listen_socket_t *sock, /* {{{ */
1312 char *buffer, size_t buffer_size)
1315 char *file, file_tmp[PATH_MAX];
1318 status = buffer_get_field(&buffer, &buffer_size, &file);
1320 return send_response(sock, RESP_ERR,
1321 "Usage: PENDING <filename>\n");
1323 status = has_privilege(sock, PRIV_HIGH);
1327 get_abs_path(&file, file_tmp);
1329 pthread_mutex_lock(&cache_lock);
1330 ci = g_tree_lookup(cache_tree, file);
1333 pthread_mutex_unlock(&cache_lock);
1334 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1337 for (int i=0; i < ci->values_num; i++)
1338 add_response_info(sock, "%s\n", ci->values[i]);
1340 pthread_mutex_unlock(&cache_lock);
1341 return send_response(sock, RESP_OK, "updates pending\n");
1342 } /* }}} static int handle_request_pending */
1344 static int handle_request_forget(listen_socket_t *sock, /* {{{ */
1345 char *buffer, size_t buffer_size)
1348 char *file, file_tmp[PATH_MAX];
1350 status = buffer_get_field(&buffer, &buffer_size, &file);
1352 return send_response(sock, RESP_ERR,
1353 "Usage: FORGET <filename>\n");
1355 status = has_privilege(sock, PRIV_HIGH);
1359 get_abs_path(&file, file_tmp);
1360 if (!check_file_access(file, sock)) return 0;
1362 pthread_mutex_lock(&cache_lock);
1363 status = forget_file(file);
1364 pthread_mutex_unlock(&cache_lock);
1369 journal_write("forget", file);
1371 return send_response(sock, RESP_OK, "Gone!\n");
1374 return send_response(sock, RESP_ERR, "cannot forget: %s\n",
1375 status < 0 ? "Internal error" : rrd_strerror(status));
1379 } /* }}} static int handle_request_forget */
1381 static int handle_request_update (listen_socket_t *sock, /* {{{ */
1383 char *buffer, size_t buffer_size)
1385 char *file, file_tmp[PATH_MAX];
1387 int bad_timestamps = 0;
1389 char orig_buf[CMD_MAX];
1393 status = has_privilege(sock, PRIV_HIGH);
1397 /* save it for the journal later */
1398 strncpy(orig_buf, buffer, sizeof(orig_buf)-1);
1400 status = buffer_get_field (&buffer, &buffer_size, &file);
1402 return send_response(sock, RESP_ERR,
1403 "Usage: UPDATE <filename> <values> [<values> ...]\n");
1405 pthread_mutex_lock(&stats_lock);
1406 stats_updates_received++;
1407 pthread_mutex_unlock(&stats_lock);
1409 get_abs_path(&file, file_tmp);
1410 if (!check_file_access(file, sock)) return 0;
1412 pthread_mutex_lock (&cache_lock);
1413 ci = g_tree_lookup (cache_tree, file);
1415 if (ci == NULL) /* {{{ */
1417 struct stat statbuf;
1419 /* don't hold the lock while we setup; stat(2) might block */
1420 pthread_mutex_unlock(&cache_lock);
1422 memset (&statbuf, 0, sizeof (statbuf));
1423 status = stat (file, &statbuf);
1426 RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1429 if (status == ENOENT)
1430 return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1432 return send_response(sock, RESP_ERR,
1433 "stat failed with error %i.\n", status);
1435 if (!S_ISREG (statbuf.st_mode))
1436 return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1438 if (access(file, R_OK|W_OK) != 0)
1439 return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1440 file, rrd_strerror(errno));
1442 ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1445 RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1447 return send_response(sock, RESP_ERR, "malloc failed.\n");
1449 memset (ci, 0, sizeof (cache_item_t));
1451 ci->file = strdup (file);
1452 if (ci->file == NULL)
1455 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1457 return send_response(sock, RESP_ERR, "strdup failed.\n");
1460 wipe_ci_values(ci, now);
1461 ci->flags = CI_FLAGS_IN_TREE;
1462 pthread_cond_init(&ci->flushed, NULL);
1464 pthread_mutex_lock(&cache_lock);
1465 g_tree_insert (cache_tree, (void *) ci->file, (void *) ci);
1467 assert (ci != NULL);
1469 /* don't re-write updates in replay mode */
1471 journal_write("update", orig_buf);
1473 while (buffer_size > 0)
1480 status = buffer_get_field (&buffer, &buffer_size, &value);
1483 RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1487 /* make sure update time is always moving forward */
1488 stamp = strtol(value, &eostamp, 10);
1489 if (eostamp == value || eostamp == NULL || *eostamp != ':')
1492 add_response_info(sock, "Cannot find timestamp in '%s'!\n", value);
1495 else if (stamp <= ci->last_update_stamp)
1498 add_response_info(sock,
1499 "illegal attempt to update using time %ld when"
1500 " last update time is %ld (minimum one second step)\n",
1501 stamp, ci->last_update_stamp);
1505 ci->last_update_stamp = stamp;
1507 temp = (char **) realloc (ci->values,
1508 sizeof (char *) * (ci->values_num + 1));
1511 RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
1516 ci->values[ci->values_num] = strdup (value);
1517 if (ci->values[ci->values_num] == NULL)
1519 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1527 if (((now - ci->last_flush_time) >= config_write_interval)
1528 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1529 && (ci->values_num > 0))
1531 enqueue_cache_item (ci, TAIL);
1534 pthread_mutex_unlock (&cache_lock);
1538 /* journal replay mode */
1539 if (sock == NULL) return RESP_ERR;
1541 /* if we had only one update attempt, then return the full
1542 error message... try to get the most information out
1543 of the limited error space allowed by the protocol
1545 if (bad_timestamps == 1)
1546 return send_response(sock, RESP_ERR, "%s", sock->wbuf);
1548 return send_response(sock, RESP_ERR,
1549 "No values updated (%d bad timestamps).\n",
1553 return send_response(sock, RESP_OK,
1554 "errors, enqueued %i value(s).\n", values_num);
1559 } /* }}} int handle_request_update */
/* handle_request_wrote: journal-replay handler for "WROTE <file>" entries.
 * buffer is the file name and is used directly as the cache_tree lookup
 * key; now is handed to wipe_ci_values().  If the file has a cache item,
 * its accumulated value strings are freed, the item is reset with
 * wipe_ci_values() and taken off the write queue with remove_from_queue().
 * All cache work happens while cache_lock is held. */
1561 /* we came across a "WROTE" entry during journal replay.
1562 * throw away any values that we have accumulated for this file
1564 static int handle_request_wrote (const char *buffer, time_t now) /* {{{ */
1568 const char *file = buffer;
1570 pthread_mutex_lock(&cache_lock);
1572 ci = g_tree_lookup(cache_tree, file);
/* early-exit unlock right after the lookup (presumably the "file not
 * cached" path) */
1575 pthread_mutex_unlock(&cache_lock);
/* release every accumulated value string before the item is reset */
1581 for (i=0; i < ci->values_num; i++)
1582 free(ci->values[i]);
1587 wipe_ci_values(ci, now);
1588 remove_from_queue(ci);
1590 pthread_mutex_unlock(&cache_lock);
1592 } /* }}} int handle_request_wrote */
1594 /* start "BATCH" processing */
1595 static int batch_start (listen_socket_t *sock) /* {{{ */
1598 if (sock->batch_start)
1599 return send_response(sock, RESP_ERR, "Already in BATCH\n");
1601 status = send_response(sock, RESP_OK,
1602 "Go ahead. End with dot '.' on its own line.\n");
1603 sock->batch_start = time(NULL);
1604 sock->batch_cmd = 0;
1607 } /* }}} static int batch_start */
1609 /* finish "BATCH" processing and return results to the client */
1610 static int batch_done (listen_socket_t *sock) /* {{{ */
1612 assert(sock->batch_start);
1613 sock->batch_start = 0;
1614 sock->batch_cmd = 0;
1615 return send_response(sock, RESP_OK, "errors\n");
1616 } /* }}} static int batch_done */
/* handle_request: take the command word from the start of buffer with
 * buffer_get_field() and dispatch the rest of the buffer to the matching
 * handle_request_* / batch_* function, returning that handler's result.
 * buffer must be NUL-terminated, and buffer_size must count that
 * terminator (asserted below).
 * Context restrictions: "wrote" is only accepted during journal replay
 * (sock == NULL); "batch" and "." need a live connection, and "." only
 * while a batch is open.  Anything else is answered with
 * "Unknown command". */
1618 /* if sock==NULL, we are in journal replay mode */
1619 static int handle_request (listen_socket_t *sock, /* {{{ */
1621 char *buffer, size_t buffer_size)
1627 assert (buffer[buffer_size - 1] == '\0');
1629 buffer_ptr = buffer;
/* split off the command; buffer_ptr/buffer_size are advanced past it and
 * are what the handlers below receive */
1631 status = buffer_get_field (&buffer_ptr, &buffer_size, &command);
1634 RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
/* batch-mode handling applies to live connections only */
1638 if (sock != NULL && sock->batch_start)
1641 if (strcasecmp (command, "update") == 0)
1642 return (handle_request_update (sock, now, buffer_ptr, buffer_size));
1643 else if (strcasecmp (command, "wrote") == 0 && sock == NULL)
1645 /* this is only valid in replay mode */
1646 return (handle_request_wrote (buffer_ptr, now));
1648 else if (strcasecmp (command, "flush") == 0)
1649 return (handle_request_flush (sock, buffer_ptr, buffer_size));
1650 else if (strcasecmp (command, "flushall") == 0)
1651 return (handle_request_flushall(sock));
1652 else if (strcasecmp (command, "pending") == 0)
1653 return (handle_request_pending(sock, buffer_ptr, buffer_size));
1654 else if (strcasecmp (command, "forget") == 0)
1655 return (handle_request_forget(sock, buffer_ptr, buffer_size));
1656 else if (strcasecmp (command, "stats") == 0)
1657 return (handle_request_stats (sock));
1658 else if (strcasecmp (command, "help") == 0)
1659 return (handle_request_help (sock, buffer_ptr, buffer_size));
/* BATCH control needs a real client connection */
1660 else if (strcasecmp (command, "batch") == 0 && sock != NULL)
1661 return batch_start(sock);
1662 else if (strcasecmp (command, ".") == 0 && sock != NULL && sock->batch_start)
1663 return batch_done(sock);
1665 return send_response(sock, RESP_ERR, "Unknown command: %s\n", command);
1669 } /* }}} int handle_request */
/* journal_rotate: switch to a fresh journal file.
 * Does nothing unless both journal_cur and journal_old are configured.
 * While holding journal_lock, the open journal (if any) is renamed to
 * journal_old (counted in stats_journal_rotate) and journal_cur is reopened
 * with O_WRONLY|O_CREAT|O_APPEND, mode 0644, as the new journal_fh.  If no
 * new stream could be obtained, journaling is disabled: errors are logged
 * and config_flush_at_shutdown is forced to 1 so all values get written at
 * shutdown instead.  Takes journal_lock itself, hence the rule below. */
1671 /* MUST NOT hold journal_lock before calling this */
1672 static void journal_rotate(void) /* {{{ */
1674 FILE *old_fh = NULL;
1677 if (journal_cur == NULL || journal_old == NULL)
1680 pthread_mutex_lock(&journal_lock);
1682 /* we rotate this way (rename before close) so that we can release
1683 * the journal lock as fast as possible. Journal writes to the new
1684 * journal can proceed immediately after the new file is opened. The
1685 * fclose can then block without affecting new updates.
1687 if (journal_fh != NULL)
1689 old_fh = journal_fh;
1691 rename(journal_cur, journal_old);
1692 ++stats_journal_rotate;
1695 new_fd = open(journal_cur, O_WRONLY|O_CREAT|O_APPEND,
1696 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1699 journal_fh = fdopen(new_fd, "a");
1700 if (journal_fh == NULL)
1704 pthread_mutex_unlock(&journal_lock);
1709 if (journal_fh == NULL)
1712 "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1713 journal_cur, rrd_strerror(errno));
1716 "JOURNALING DISABLED: All values will be flushed at shutdown");
1717 config_flush_at_shutdown = 1;
1720 } /* }}} static void journal_rotate */
/* journal_done: shut journaling down (no-op when journal_cur is unset).
 * Under journal_lock, the open journal stream is handled first (presumably
 * closed).  If config_flush_at_shutdown is set, both journal files are
 * unlinked.  Otherwise they are kept so the next startup can replay them,
 * as the "expedited shutdown" log message says. */
1722 static void journal_done(void) /* {{{ */
1724 if (journal_cur == NULL)
1727 pthread_mutex_lock(&journal_lock);
1728 if (journal_fh != NULL)
/* when every value is flushed at shutdown, the journals are no longer
 * needed */
1734 if (config_flush_at_shutdown)
1736 RRDD_LOG(LOG_INFO, "removing journals");
1737 unlink(journal_old);
1738 unlink(journal_cur);
1742 RRDD_LOG(LOG_INFO, "expedited shutdown; "
1743 "journals will be used at next startup");
1746 pthread_mutex_unlock(&journal_lock);
1748 } /* }}} static void journal_done */
/* journal_write: append one "<cmd> <args>\n" record to the journal.
 * Skips the write when no journal stream is open.  The fprintf() is
 * serialised by journal_lock, and its character count is then added to
 * stats_journal_bytes under stats_lock.
 * NOTE(review): journal_fh is tested before journal_lock is taken.
 *   journal_rotate() reassigns journal_fh under that lock, and it may set
 *   it to NULL when fdopen() fails.  A writer racing such a rotation could
 *   therefore call fprintf() on a NULL stream; consider re-checking
 *   journal_fh after locking. */
1750 static int journal_write(char *cmd, char *args) /* {{{ */
1754 if (journal_fh == NULL)
1757 pthread_mutex_lock(&journal_lock);
1758 chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1759 pthread_mutex_unlock(&journal_lock);
/* account the journal volume for the stats command */
1763 pthread_mutex_lock(&stats_lock);
1764 stats_journal_bytes += chars;
1765 pthread_mutex_unlock(&stats_lock);
1769 } /* }}} static int journal_write */
/* journal_replay: feed every line of the journal FILE back through
 * handle_request() with sock == NULL (journal replay mode).
 *
 * Returns 0 immediately for a NULL file name.  Before opening, the file is
 * stat()ed and rejected with an error log when it is not a regular file,
 * not owned by daemon_uid, or writable by group or others.  Its lines are
 * executed as daemon commands, so only a file the daemon alone can modify
 * is trusted.  ENOENT (stat or fopen) is not reported as an error.
 * A line without a trailing '\n' (e.g. a write cut off by a crash) is
 * logged as malformed.  Otherwise the newline is replaced by '\0', and
 * entry_len, whose last counted byte is now that terminator, is passed as
 * the buffer size, as handle_request() asserts.
 * Returns 1 when entry_cnt > 0 (something was replayed), otherwise 0.
 */
1771 static int journal_replay (const char *file) /* {{{ */
1777 char entry[CMD_MAX];
1780 if (file == NULL) return 0;
1785 struct stat statbuf;
1787 memset(&statbuf, 0, sizeof(statbuf));
1788 if (stat(file, &statbuf) != 0)
1790 if (errno == ENOENT)
1793 reason = "stat error";
1796 else if (!S_ISREG(statbuf.st_mode))
1798 reason = "not a regular file";
/* only trust journals the daemon itself owns ... */
1801 if (statbuf.st_uid != daemon_uid)
1803 reason = "not owned by daemon user";
/* ... and that nobody else (group or others) may write */
1806 if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
1808 reason = "must not be group/other writable";
1814 RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
1815 file, rrd_strerror(status), reason);
1820 fh = fopen(file, "r");
1823 if (errno != ENOENT)
1824 RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1825 file, rrd_strerror(errno));
1829 RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1838 if (fgets(entry, sizeof(entry), fh) == NULL)
1840 entry_len = strlen(entry);
1842 /* check \n termination in case journal writing crashed mid-line */
1845 else if (entry[entry_len - 1] != '\n')
1847 RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
/* turn the newline into the NUL terminator handle_request() expects */
1852 entry[entry_len - 1] = '\0';
1854 if (handle_request(NULL, now, entry, entry_len) == 0)
1862 RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1863 entry_cnt, fail_cnt);
1865 return entry_cnt > 0 ? 1 : 0;
1866 } /* }}} static int journal_replay */
/* journal_init: replay journals left on disk (no-op when journal_cur is
 * unset, i.e. no -j).  journal_old is replayed before journal_cur because
 * journal_rotate() renames the current journal to journal_old, so the old
 * file holds the earlier entries.  journal_lock is held for the whole
 * replay.  When something was replayed and config_flush_at_shutdown is
 * set, flush_old_values(-1) is started immediately.  journal_done() unlinks
 * both journals on such a shutdown, so leftover files imply the previous
 * run did not shut down cleanly. */
1868 static void journal_init(void) /* {{{ */
1870 int had_journal = 0;
1872 if (journal_cur == NULL) return;
1874 pthread_mutex_lock(&journal_lock);
1876 RRDD_LOG(LOG_INFO, "checking for journal files");
/* older entries first */
1878 had_journal += journal_replay(journal_old);
1879 had_journal += journal_replay(journal_cur);
1881 /* it must have been a crash. start a flush */
1882 if (had_journal && config_flush_at_shutdown)
1883 flush_old_values(-1);
1885 pthread_mutex_unlock(&journal_lock);
1888 RRDD_LOG(LOG_INFO, "journal processing complete");
1890 } /* }}} static void journal_init */
/* close_connection: close the client descriptor and free its read and write
 * buffers, resetting fd to -1 and both buffer pointers to NULL.
 * NOTE(review): the listen_socket_t itself is malloc()ed per client in
 * listen_thread_main(); confirm it is released after this call. */
1892 static void close_connection(listen_socket_t *sock)
1894 close(sock->fd) ; sock->fd = -1;
1895 free(sock->rbuf); sock->rbuf = NULL;
1896 free(sock->wbuf); sock->wbuf = NULL;
/* connection_thread_main: per-client worker thread.
 * args is the listen_socket_t that listen_thread_main() allocated for the
 * accepted connection.  The thread:
 *  - allocates the RBUF_SIZE read buffer;
 *  - registers itself in connection_threads (so shutdown can wait for it);
 *  - polls the socket with a 500 ms timeout until do_shutdown is set, the
 *    peer hangs up or reads fail;
 *  - appends received bytes to rbuf and hands every complete command
 *    returned by next_cmd() to handle_request();
 *  - finally closes the connection and removes itself from
 *    connection_threads.
 * NOTE(review): if the registration realloc() fails, only an error is
 *   logged, yet the deregistration code below asserts that this thread is
 *   in the list.  With NDEBUG it would decrement connection_threads_num
 *   anyway, dropping another thread's entry. */
1901 static void *connection_thread_main (void *args) /* {{{ */
1904 listen_socket_t *sock;
1908 sock = (listen_socket_t *) args;
1911 /* init read buffers */
1912 sock->next_read = sock->next_cmd = 0;
1913 sock->rbuf = malloc(RBUF_SIZE);
1914 if (sock->rbuf == NULL)
1916 RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
1917 close_connection(sock);
/* register this thread so listen_thread_main() can join it at shutdown */
1921 pthread_mutex_lock (&connection_threads_lock);
1925 temp = (pthread_t *) realloc (connection_threads,
1926 sizeof (pthread_t) * (connection_threads_num + 1));
1929 RRDD_LOG (LOG_ERR, "connection_thread_main: realloc failed.");
1933 connection_threads = temp;
1934 connection_threads[connection_threads_num] = pthread_self ();
1935 connection_threads_num++;
1938 pthread_mutex_unlock (&connection_threads_lock);
1940 while (do_shutdown == 0)
1947 struct pollfd pollfd;
1951 pollfd.events = POLLIN | POLLPRI;
/* bounded wait so do_shutdown is re-checked at least every 500 ms */
1954 status = poll (&pollfd, 1, /* timeout = */ 500);
1957 else if (status == 0) /* timeout */
1959 else if (status < 0) /* error */
1962 if (status != EINTR)
1963 RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
1967 if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
1969 else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
1971 RRDD_LOG (LOG_WARNING, "connection_thread_main: "
1972 "poll(2) returned something unexpected: %#04hx",
/* append to whatever partial command is already buffered */
1977 rbytes = read(fd, sock->rbuf + sock->next_read,
1978 RBUF_SIZE - sock->next_read);
1981 RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
1984 else if (rbytes == 0)
1987 sock->next_read += rbytes;
/* all commands of an open BATCH use the batch start time as "now" */
1989 if (sock->batch_start)
1990 now = sock->batch_start;
/* cmd_len+1 counts the command's NUL terminator, as handle_request()
 * asserts */
1994 while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
1996 status = handle_request (sock, now, cmd, cmd_len+1);
2003 close_connection(sock);
2005 self = pthread_self ();
2006 /* Remove this thread from the connection threads list */
2007 pthread_mutex_lock (&connection_threads_lock);
2008 /* Find out own index in the array */
2009 for (i = 0; i < connection_threads_num; i++)
2010 if (pthread_equal (connection_threads[i], self) != 0)
2012 assert (i < connection_threads_num);
2014 /* Move the trailing threads forward. */
2015 if (i < (connection_threads_num - 1))
2017 memmove (connection_threads + i,
2018 connection_threads + i + 1,
2019 sizeof (pthread_t) * (connection_threads_num - i - 1));
2022 connection_threads_num--;
2023 pthread_mutex_unlock (&connection_threads_lock);
2026 } /* }}} void *connection_thread_main */
/* open_listen_socket_unix: open a UNIX-domain stream socket listening on
 * the path in sock->addr (an optional "unix:" prefix is stripped).  A copy
 * of *sock is appended to listen_fds, and its fd, family (PF_UNIX) and addr
 * (the bare path, later used by close_listen_sockets() for unlink) are
 * filled in.  Errors are printed to stderr.
 * NOTE(review): sun_path is filled with strncpy(..., sizeof - 1) into a
 *   zeroed struct, so it is always terminated.  Paths longer than sun_path
 *   are silently truncated rather than rejected. */
2028 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2031 struct sockaddr_un sa;
2032 listen_socket_t *temp;
/* accept both "unix:/path" and a bare "/path" */
2037 if (strncmp(path, "unix:", strlen("unix:")) == 0)
2038 path += strlen("unix:");
/* grow listen_fds by one entry for this socket */
2040 temp = (listen_socket_t *) realloc (listen_fds,
2041 sizeof (listen_fds[0]) * (listen_fds_num + 1));
2044 fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2048 memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2050 fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2053 fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2054 rrd_strerror(errno));
2058 memset (&sa, 0, sizeof (sa));
2059 sa.sun_family = AF_UNIX;
2060 strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2062 /* if we've gotten this far, we own the pid file. any daemon started
2063 * with the same args must not be alive. therefore, ensure that we can
2064 * create the socket...
2068 status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2071 fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2072 path, rrd_strerror(errno));
2077 status = listen (fd, /* backlog = */ 10);
2080 fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2081 path, rrd_strerror(errno));
/* record the live socket; addr keeps the bare path for unlink() */
2087 listen_fds[listen_fds_num].fd = fd;
2088 listen_fds[listen_fds_num].family = PF_UNIX;
2089 strncpy(listen_fds[listen_fds_num].addr, path,
2090 sizeof (listen_fds[listen_fds_num].addr) - 1);
2094 } /* }}} int open_listen_socket_unix */
/* open_listen_socket_network: open TCP listen sockets for sock->addr.
 *
 * sock->addr is copied into a local, always-terminated buffer and split
 * into host and port.  Either it uses the "[IPv6-address]:port" form, or,
 * when it contains a '.', it is a hostname/IPv4 address with an optional
 * ":port" suffix taken from the last ':'.  Without a port,
 * RRDCACHED_DEFAULT_PORT is used.
 * getaddrinfo() is asked for SOCK_STREAM addresses of any family (with
 * AI_ADDRCONFIG where available).  For each result a socket is created with
 * SO_REUSEADDR, bound, and put into listen state with a backlog of 10.
 * Each live socket is recorded in listen_fds as a copy of *sock (so it
 * keeps the listener's privilege) with fd and family filled in.  Failures
 * are reported on stderr.
 */
2096 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2098 struct addrinfo ai_hints;
2099 struct addrinfo *ai_res;
2100 struct addrinfo *ai_ptr;
2101 char addr_copy[NI_MAXHOST];
/* private, NUL-terminated copy that may be split in place */
2106 strncpy (addr_copy, sock->addr, sizeof (addr_copy));
2107 addr_copy[sizeof (addr_copy) - 1] = 0;
2110 memset (&ai_hints, 0, sizeof (ai_hints));
2111 ai_hints.ai_flags = 0;
2112 #ifdef AI_ADDRCONFIG
2113 ai_hints.ai_flags |= AI_ADDRCONFIG;
2115 ai_hints.ai_family = AF_UNSPEC;
2116 ai_hints.ai_socktype = SOCK_STREAM;
2119 if (*addr == '[') /* IPv6+port format */
2121 /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2124 port = strchr (addr, ']');
2127 fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
2135 else if (*port == 0)
2139 fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
2142 } /* if (*addr == '[') */
2143 else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */
/* the port, if any, follows the last ':' (strrchr is the standard
 * replacement for the withdrawn BSD rindex) */
2145 port = strrchr(addr, ':');
2153 status = getaddrinfo (addr,
2154 port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2155 &ai_hints, &ai_res);
2158 fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
2159 addr, gai_strerror (status));
2163 for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2166 listen_socket_t *temp;
2169 temp = (listen_socket_t *) realloc (listen_fds,
2170 sizeof (listen_fds[0]) * (listen_fds_num + 1));
2174 "rrdcached: open_listen_socket_network: realloc failed.\n");
2178 memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2180 fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2183 fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
2184 rrd_strerror(errno));
/* SO_REUSEADDR: allow re-binding the address right after a restart */
2188 setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2190 status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2193 fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2194 sock->addr, rrd_strerror(errno));
2199 status = listen (fd, /* backlog = */ 10);
/* period before the newline, like the bind() message above */
2202 fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2203 sock->addr, rrd_strerror(errno));
2208 listen_fds[listen_fds_num].fd = fd;
2209 listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2211 } /* for (ai_ptr) */
2214 } /* }}} static int open_listen_socket_network */
2216 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2218 assert(sock != NULL);
2219 assert(sock->addr != NULL);
2221 if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2222 || sock->addr[0] == '/')
2223 return (open_listen_socket_unix(sock));
2225 return (open_listen_socket_network(sock));
2226 } /* }}} int open_listen_socket */
/* close_listen_sockets: close every descriptor in listen_fds.  For
 * UNIX-domain listeners the socket file is also removed; addr holds the
 * bare path, because open_listen_socket_unix() strips the "unix:" prefix
 * before storing it. */
2228 static int close_listen_sockets (void) /* {{{ */
2232 for (i = 0; i < listen_fds_num; i++)
2234 close (listen_fds[i].fd);
2236 if (listen_fds[i].family == PF_UNIX)
2237 unlink(listen_fds[i].addr);
2245 } /* }}} int close_listen_sockets */
/* listen_thread_main: accept loop over all sockets in listen_fds.
 * Polls every listener with a 1 s timeout until do_shutdown is set.  For
 * each readable listener it allocates a listen_socket_t copied from that
 * listener (so the client inherits its privilege and addr), accept()s the
 * client into it and starts connection_thread_main() on it.  On shutdown it
 * closes the listen sockets and waits for every registered connection
 * thread.
 * NOTE(review): connection threads are created PTHREAD_CREATE_DETACHED
 *   but are later passed to pthread_join(); joining a detached thread is
 *   undefined behaviour per POSIX.
 * NOTE(review): attr is pthread_attr_init()ed for every accepted client,
 *   and no pthread_attr_destroy() appears in this function. */
2247 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
2249 struct pollfd *pollfds;
2254 if (listen_fds_num < 1)
2256 RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2260 pollfds_num = listen_fds_num;
2261 pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2262 if (pollfds == NULL)
2264 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2267 memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2269 RRDD_LOG(LOG_INFO, "listening for connections");
2271 while (do_shutdown == 0)
/* the poll set mirrors listen_fds one-to-one */
2273 assert (pollfds_num == ((int) listen_fds_num));
2274 for (i = 0; i < pollfds_num; i++)
2276 pollfds[i].fd = listen_fds[i].fd;
2277 pollfds[i].events = POLLIN | POLLPRI;
2278 pollfds[i].revents = 0;
/* 1 s timeout so do_shutdown is noticed promptly */
2281 status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2284 else if (status == 0) /* timeout */
2286 else if (status < 0) /* error */
2289 if (status != EINTR)
2291 RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2296 for (i = 0; i < pollfds_num; i++)
2298 listen_socket_t *client_sock;
2299 struct sockaddr_storage client_sa;
2300 socklen_t client_sa_size;
2302 pthread_attr_t attr;
2304 if (pollfds[i].revents == 0)
2307 if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2309 RRDD_LOG (LOG_ERR, "listen_thread_main: "
2310 "poll(2) returned something unexpected for listen FD #%i.",
2315 client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2316 if (client_sock == NULL)
2318 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
/* start from the listener's entry (privilege, addr, ...); fd is replaced
 * by the accepted descriptor */
2321 memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2323 client_sa_size = sizeof (client_sa);
2324 client_sock->fd = accept (pollfds[i].fd,
2325 (struct sockaddr *) &client_sa, &client_sa_size);
2326 if (client_sock->fd < 0)
2328 RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2333 pthread_attr_init (&attr);
2334 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2336 status = pthread_create (&tid, &attr, connection_thread_main,
2340 RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2341 close_connection(client_sock);
2344 } /* for (pollfds_num) */
2345 } /* while (do_shutdown == 0) */
2347 RRDD_LOG(LOG_INFO, "starting shutdown");
2349 close_listen_sockets ();
2351 pthread_mutex_lock (&connection_threads_lock);
2352 while (connection_threads_num > 0)
2356 wait_for = connection_threads[0];
/* drop the lock while waiting: the exiting thread takes it to remove
 * itself from connection_threads */
2358 pthread_mutex_unlock (&connection_threads_lock);
2359 pthread_join (wait_for, /* retval = */ NULL);
2360 pthread_mutex_lock (&connection_threads_lock);
2362 pthread_mutex_unlock (&connection_threads_lock);
2365 } /* }}} void *listen_thread_main */
/* daemonize: bring the process into its running state.
 *
 * Records the effective uid in daemon_uid, then takes the pid file
 * (exclusive create via open_pidfile(), with check_pidfile() as the
 * fallback).  It opens every configured listen socket, or
 * RRDCACHED_DEFAULT_ADDRESS when none was configured, and fails if no
 * listener could be opened.  Unless stay_foreground (-g) is set, it forks,
 * becomes session leader and points the first three descriptors at
 * /dev/null.  It then changes into the base directory, installs signal
 * handlers, opens syslog, creates the file-name keyed cache_tree and
 * finally writes the pid file.
 */
2367 static int daemonize (void) /* {{{ */
2372 daemon_uid = geteuid();
2374 pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
2376 pid_fd = check_pidfile();
2380 /* open all the listen sockets */
2381 if (config_listen_address_list_len > 0)
2383 for (int i = 0; i < config_listen_address_list_len; i++)
2384 open_listen_socket (config_listen_address_list[i]);
2388 listen_socket_t sock;
2389 memset(&sock, 0, sizeof(sock));
/* leave the last byte as the terminator memset() put there */
2390 strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr) - 1);
2391 open_listen_socket (&sock);
/* open_listen_socket() results are not checked individually; success is
 * judged by how many listeners ended up in listen_fds */
2394 if (listen_fds_num < 1)
2396 fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
2400 if (!stay_foreground)
2407 fprintf (stderr, "daemonize: fork(2) failed.\n");
2413 /* Become session leader */
2416 /* Open the first three file descriptors to /dev/null */
2421 open ("/dev/null", O_RDWR);
2424 } /* if (!stay_foreground) */
2426 /* Change into the /tmp directory. */
2427 base_dir = (config_base_dir != NULL)
2431 if (chdir (base_dir) != 0)
2433 fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2437 install_signal_handlers();
2439 openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2440 RRDD_LOG(LOG_INFO, "starting up");
/* the cache is keyed by file name string */
2442 cache_tree = g_tree_new ((GCompareFunc) strcmp);
2443 if (cache_tree == NULL)
2445 RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2449 return write_pidfile (pid_fd);
2454 } /* }}} int daemonize */
/* cleanup: shutdown path for the queue thread.  Signals cache_cond,
 * presumably to wake the queue thread so it sees the shutdown, and joins
 * it before logging "goodbye". */
2456 static int cleanup (void) /* {{{ */
2460 pthread_cond_signal (&cache_cond);
2461 pthread_join (queue_thread, /* return = */ NULL);
2465 RRDD_LOG(LOG_INFO, "goodbye");
2469 } /* }}} int cleanup */
/* read_options: parse the command line (getopt) into the config_* globals.
 *
 * Options:
 *  - -l/-L append a listen address to config_listen_address_list
 *    (PRIV_HIGH for -l, PRIV_LOW for -L);
 *  - -f/-w/-z set config_flush_interval, config_write_interval and
 *    config_write_jitter;
 *  - -B sets config_write_base_only;
 *  - -b sets config_base_dir (it must not resolve through symlinks;
 *    trailing '/' are stripped and the length is cached in
 *    _config_base_dir_len);
 *  - -p sets config_pid_file; -F sets config_flush_at_shutdown;
 *  - -j validates a writable directory and derives journal_cur /
 *    journal_old from it.
 * After parsing, questionable combinations only produce warnings, and
 * flushing at shutdown is forced when no journal is configured.
 *
 * NOTE(review): -f/-w/-z use atoi(), which cannot report non-numeric
 *   input; strtol() with an end-pointer check would.
 * NOTE(review): for -b, the realpath() comparison is made before trailing
 *   slashes are stripped.  An argument such as "/some/dir/" therefore
 *   differs from its realpath() and takes the "resolved via file system
 *   links" branch.
 * NOTE(review): for -j, errno is only meaningful if access() failed.
 *   When the S_ISDIR test is what fails, the printed errno may be stale.
 * NOTE(review): the journal paths are built with snprintf(..., PATH_MAX,
 *   ...) without a truncation check. */
2471 static int read_options (int argc, char **argv) /* {{{ */
2476 while ((option = getopt(argc, argv, "gl:L:f:w:b:Bz:p:j:h?F")) != -1)
2487 listen_socket_t **temp;
2488 listen_socket_t *new;
/* each -l/-L gets its own zeroed listen_socket_t */
2490 new = malloc(sizeof(listen_socket_t));
2493 fprintf(stderr, "read_options: malloc failed.\n");
2496 memset(new, 0, sizeof(listen_socket_t));
2498 temp = (listen_socket_t **) realloc (config_listen_address_list,
2499 sizeof (listen_socket_t *) * (config_listen_address_list_len + 1));
2502 fprintf (stderr, "read_options: realloc failed.\n");
2505 config_listen_address_list = temp;
2507 strncpy(new->addr, optarg, sizeof(new->addr)-1);
/* -L listeners are low privilege ('FLUSH' only, per the usage text) */
2508 new->privilege = (option == 'l') ? PRIV_HIGH : PRIV_LOW;
2510 temp[config_listen_address_list_len] = new;
2511 config_listen_address_list_len++;
2519 temp = atoi (optarg);
2521 config_flush_interval = temp;
2524 fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2534 temp = atoi (optarg);
2536 config_write_interval = temp;
2539 fprintf (stderr, "Invalid write interval: %s\n", optarg);
2549 temp = atoi(optarg);
2551 config_write_jitter = temp;
2554 fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2562 config_write_base_only = 1;
2568 char base_realpath[PATH_MAX];
2570 if (config_base_dir != NULL)
2571 free (config_base_dir);
2572 config_base_dir = strdup (optarg);
2573 if (config_base_dir == NULL)
2575 fprintf (stderr, "read_options: strdup failed.\n");
2579 /* make sure that the base directory is not resolved via
2580 * symbolic links. this makes some performance-enhancing
2581 * assumptions possible (we don't have to resolve paths
2582 * that start with a "/")
2584 if (realpath(config_base_dir, base_realpath) == NULL)
2586 fprintf (stderr, "Invalid base directory '%s'.\n", config_base_dir);
2589 else if (strncmp(config_base_dir,
2590 base_realpath, sizeof(base_realpath)) != 0)
2593 "Base directory (-b) resolved via file system links!\n"
2594 "Please consult rrdcached '-b' documentation!\n"
2595 "Consider specifying the real directory (%s)\n",
2600 len = strlen (config_base_dir);
2601 while ((len > 0) && (config_base_dir[len - 1] == '/'))
2603 config_base_dir[len - 1] = 0;
2609 fprintf (stderr, "Invalid base directory: %s\n", optarg);
2613 _config_base_dir_len = len;
2619 if (config_pid_file != NULL)
2620 free (config_pid_file);
2621 config_pid_file = strdup (optarg);
2622 if (config_pid_file == NULL)
2624 fprintf (stderr, "read_options: strdup failed.\n");
2631 config_flush_at_shutdown = 1;
2636 struct stat statbuf;
2637 const char *dir = optarg;
2639 status = stat(dir, &statbuf);
2642 fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2646 if (!S_ISDIR(statbuf.st_mode)
2647 || access(dir, R_OK|W_OK|X_OK) != 0)
2649 fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2650 errno ? rrd_strerror(errno) : "");
2654 journal_cur = malloc(PATH_MAX + 1);
2655 journal_old = malloc(PATH_MAX + 1);
2656 if (journal_cur == NULL || journal_old == NULL)
2658 fprintf(stderr, "malloc failure for journal files\n");
2663 snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2664 snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
2671 printf ("RRDCacheD %s Copyright (C) 2008 Florian octo Forster\n"
2673 "Usage: rrdcached [options]\n"
2675 "Valid options are:\n"
2676 " -l <address> Socket address to listen to.\n"
2677 " -L <address> Socket address to listen to ('FLUSH' only).\n"
2678 " -w <seconds> Interval in which to write data.\n"
2679 " -z <delay> Delay writes up to <delay> seconds to spread load\n"
2680 " -f <seconds> Interval in which to flush dead data.\n"
2681 " -p <file> Location of the PID-file.\n"
2682 " -b <dir> Base directory to change to.\n"
2683 " -B Restrict file access to paths within -b <dir>\n"
2684 " -g Do not fork and run in the foreground.\n"
2685 " -j <dir> Directory in which to create the journal files.\n"
2686 " -F Always flush all updates at shutdown\n"
2688 "For more information and a detailed description of all options "
2690 "to the rrdcached(1) manual page.\n",
2694 } /* switch (option) */
2695 } /* while (getopt) */
2697 /* advise the user when values are not sane */
2698 if (config_flush_interval < 2 * config_write_interval)
2699 fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2700 " 2x write interval (-w) !\n");
2701 if (config_write_jitter > config_write_interval)
2702 fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2703 " write interval (-w) !\n");
2705 if (config_write_base_only && config_base_dir == NULL)
2706 fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
2707 " Consult the rrdcached documentation\n");
/* without a journal, unwritten values would be lost on exit */
2709 if (journal_cur == NULL)
2710 config_flush_at_shutdown = 1;
2713 } /* }}} int read_options */
/* main: parse options, daemonize, start the queue thread, and then serve
 * the listen sockets from the main thread via listen_thread_main() until
 * shutdown.  Failures in daemonize() or in creating the queue thread are
 * reported as fatal. */
2715 int main (int argc, char **argv)
2719 status = read_options (argc, argv);
2727 status = daemonize ();
2730 fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
2736 /* start the queue thread */
2737 memset (&queue_thread, 0, sizeof (queue_thread));
2738 status = pthread_create (&queue_thread,
2744 RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
/* the main thread runs the accept loop; it returns once do_shutdown is set */
2749 listen_thread_main (NULL);
2756 * vim: set sw=2 sts=2 ts=8 et fdm=marker :