2 * RRDTool - src/rrd_daemon.c
3 * Copyright (C) 2008 Florian octo Forster
4 * Copyright (C) 2008 Kevin Brintnall
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; only version 2 of the License is applicable.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
20 * Florian octo Forster <octo at verplant.org>
21 * kevin brintnall <kbrint@rufus.net>
26 * First tell the compiler to stick to the C99 and POSIX standards as close as
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
46 #define _XOPEN_SOURCE 500
63 * Now for some includes..
65 #include "rrd.h" /* {{{ */
66 #include "rrd_client.h"
77 #include <sys/types.h>
81 #include <sys/socket.h>
92 #include <glib-2.0/glib.h>
95 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
98 # define __attribute__(x) /**/
110 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
112 struct listen_socket_s
115 char addr[PATH_MAX + 1];
117 socket_privilege privilege;
119 /* state for BATCH processing */
131 typedef struct listen_socket_s listen_socket_t;
134 typedef struct cache_item_s cache_item_t;
140 time_t last_flush_time;
141 time_t last_update_stamp;
142 #define CI_FLAGS_IN_TREE (1<<0)
143 #define CI_FLAGS_IN_QUEUE (1<<1)
145 pthread_cond_t flushed;
150 struct callback_flush_data_s
157 typedef struct callback_flush_data_s callback_flush_data_t;
164 typedef enum queue_side_e queue_side_t;
166 /* max length of socket command or response */
168 #define RBUF_SIZE (CMD_MAX*2)
173 static int stay_foreground = 0;
174 static uid_t daemon_uid;
176 static listen_socket_t *listen_fds = NULL;
177 static size_t listen_fds_num = 0;
179 static int do_shutdown = 0;
181 static pthread_t queue_thread;
183 static pthread_t *connection_threads = NULL;
184 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
185 static int connection_threads_num = 0;
188 static GTree *cache_tree = NULL;
189 static cache_item_t *cache_queue_head = NULL;
190 static cache_item_t *cache_queue_tail = NULL;
191 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
192 static pthread_cond_t cache_cond = PTHREAD_COND_INITIALIZER;
194 static int config_write_interval = 300;
195 static int config_write_jitter = 0;
196 static int config_flush_interval = 3600;
197 static int config_flush_at_shutdown = 0;
198 static char *config_pid_file = NULL;
199 static char *config_base_dir = NULL;
200 static size_t _config_base_dir_len = 0;
201 static int config_write_base_only = 0;
203 static listen_socket_t **config_listen_address_list = NULL;
204 static int config_listen_address_list_len = 0;
206 static uint64_t stats_queue_length = 0;
207 static uint64_t stats_updates_received = 0;
208 static uint64_t stats_flush_received = 0;
209 static uint64_t stats_updates_written = 0;
210 static uint64_t stats_data_sets_written = 0;
211 static uint64_t stats_journal_bytes = 0;
212 static uint64_t stats_journal_rotate = 0;
213 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
215 /* Journaled updates */
216 static char *journal_cur = NULL;
217 static char *journal_old = NULL;
218 static FILE *journal_fh = NULL;
219 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
220 static int journal_write(char *cmd, char *args);
221 static void journal_done(void);
222 static void journal_rotate(void);
227 static void sig_common (const char *sig) /* {{{ */
229 RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
231 pthread_cond_broadcast(&cache_cond);
232 } /* }}} void sig_common */
234 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
237 } /* }}} void sig_int_handler */
239 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
242 } /* }}} void sig_term_handler */
244 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
246 config_flush_at_shutdown = 1;
248 } /* }}} void sig_usr1_handler */
250 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
252 config_flush_at_shutdown = 0;
254 } /* }}} void sig_usr2_handler */
256 static void install_signal_handlers(void) /* {{{ */
258 /* These structures are static, because `sigaction' behaves weird if the are
260 static struct sigaction sa_int;
261 static struct sigaction sa_term;
262 static struct sigaction sa_pipe;
263 static struct sigaction sa_usr1;
264 static struct sigaction sa_usr2;
266 /* Install signal handlers */
267 memset (&sa_int, 0, sizeof (sa_int));
268 sa_int.sa_handler = sig_int_handler;
269 sigaction (SIGINT, &sa_int, NULL);
271 memset (&sa_term, 0, sizeof (sa_term));
272 sa_term.sa_handler = sig_term_handler;
273 sigaction (SIGTERM, &sa_term, NULL);
275 memset (&sa_pipe, 0, sizeof (sa_pipe));
276 sa_pipe.sa_handler = SIG_IGN;
277 sigaction (SIGPIPE, &sa_pipe, NULL);
279 memset (&sa_pipe, 0, sizeof (sa_usr1));
280 sa_usr1.sa_handler = sig_usr1_handler;
281 sigaction (SIGUSR1, &sa_usr1, NULL);
283 memset (&sa_usr2, 0, sizeof (sa_usr2));
284 sa_usr2.sa_handler = sig_usr2_handler;
285 sigaction (SIGUSR2, &sa_usr2, NULL);
287 } /* }}} void install_signal_handlers */
289 static int open_pidfile(void) /* {{{ */
294 file = (config_pid_file != NULL)
296 : LOCALSTATEDIR "/run/rrdcached.pid";
298 fd = open(file, O_CREAT|O_EXCL|O_WRONLY, S_IRUSR|S_IRGRP|S_IROTH);
300 fprintf(stderr, "FATAL: cannot create '%s' (%s)\n",
301 file, rrd_strerror(errno));
304 } /* }}} static int open_pidfile */
306 static int write_pidfile (int fd) /* {{{ */
313 fh = fdopen (fd, "w");
316 RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
321 fprintf (fh, "%i\n", (int) pid);
325 } /* }}} int write_pidfile */
327 static int remove_pidfile (void) /* {{{ */
332 file = (config_pid_file != NULL)
334 : LOCALSTATEDIR "/run/rrdcached.pid";
336 status = unlink (file);
340 } /* }}} int remove_pidfile */
342 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
346 eol = memchr(sock->rbuf + sock->next_cmd, '\n',
347 sock->next_read - sock->next_cmd);
351 /* no commands left, move remainder back to front of rbuf */
352 memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
353 sock->next_read - sock->next_cmd);
354 sock->next_read -= sock->next_cmd;
361 char *cmd = sock->rbuf + sock->next_cmd;
364 sock->next_cmd = eol - sock->rbuf + 1;
366 if (eol > sock->rbuf && *(eol-1) == '\r')
367 *(--eol) = '\0'; /* handle "\r\n" EOL */
378 /* add the characters directly to the write buffer */
379 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
383 assert(sock != NULL);
385 new_buf = realloc(sock->wbuf, sock->wbuf_len + len + 1);
388 RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
392 strncpy(new_buf + sock->wbuf_len, str, len + 1);
394 sock->wbuf = new_buf;
395 sock->wbuf_len += len;
398 } /* }}} static int add_to_wbuf */
400 /* add the text to the "extra" info that's sent after the status line */
401 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
404 char buffer[CMD_MAX];
407 if (sock == NULL) return 0; /* journal replay mode */
408 if (sock->batch_mode) return 0; /* no extra info returned when in BATCH */
411 #ifdef HAVE_VSNPRINTF
412 len = vsnprintf(buffer, sizeof(buffer)-1, fmt, argp);
414 len = vsprintf(buffer, fmt, argp);
419 RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
423 return add_to_wbuf(sock, buffer, len);
424 } /* }}} static int add_response_info */
426 static int count_lines(char *str) /* {{{ */
432 while ((str = strchr(str, '\n')) != NULL)
440 } /* }}} static int count_lines */
442 /* send the response back to the user.
443 * returns 0 on success, -1 on error
444 * write buffer is always zeroed after this call */
445 static int send_response (listen_socket_t *sock, response_code rc,
446 char *fmt, ...) /* {{{ */
449 char buffer[CMD_MAX];
454 if (sock == NULL) return rc; /* journal replay mode */
456 if (sock->batch_mode)
459 return rc; /* no response on success during BATCH */
460 lines = sock->batch_cmd;
462 else if (rc == RESP_OK)
463 lines = count_lines(sock->wbuf);
467 rclen = sprintf(buffer, "%d ", lines);
469 #ifdef HAVE_VSNPRINTF
470 len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen-1, fmt, argp);
472 len = vsprintf(buffer+rclen, fmt, argp);
480 /* append the result to the wbuf, don't write to the user */
481 if (sock->batch_mode)
482 return add_to_wbuf(sock, buffer, len);
484 /* first write must be complete */
485 if (len != write(sock->fd, buffer, len))
487 RRDD_LOG(LOG_INFO, "send_response: could not write status message");
491 if (sock->wbuf != NULL)
494 while (wrote < sock->wbuf_len)
496 ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
499 RRDD_LOG(LOG_INFO, "send_response: could not write results");
506 free(sock->wbuf); sock->wbuf = NULL;
512 static void wipe_ci_values(cache_item_t *ci, time_t when)
517 ci->last_flush_time = when;
518 if (config_write_jitter > 0)
519 ci->last_flush_time += (random() % config_write_jitter);
523 * remove a "cache_item_t" item from the queue.
524 * must hold 'cache_lock' when calling this
526 static void remove_from_queue(cache_item_t *ci) /* {{{ */
528 if (ci == NULL) return;
530 if (ci->prev == NULL)
531 cache_queue_head = ci->next; /* reset head */
533 ci->prev->next = ci->next;
535 if (ci->next == NULL)
536 cache_queue_tail = ci->prev; /* reset the tail */
538 ci->next->prev = ci->prev;
540 ci->next = ci->prev = NULL;
541 ci->flags &= ~CI_FLAGS_IN_QUEUE;
542 } /* }}} static void remove_from_queue */
544 /* remove an entry from the tree and free all its resources.
545 * must hold 'cache lock' while calling this.
546 * returns 0 on success, otherwise errno */
547 static int forget_file(const char *file)
551 ci = g_tree_lookup(cache_tree, file);
555 g_tree_remove (cache_tree, file);
556 remove_from_queue(ci);
558 for (int i=0; i < ci->values_num; i++)
564 /* in case anyone is waiting */
565 pthread_cond_broadcast(&ci->flushed);
570 } /* }}} static int forget_file */
573 * enqueue_cache_item:
574 * `cache_lock' must be acquired before calling this function!
576 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
582 if (ci->values_num == 0)
587 if (cache_queue_head == ci)
590 /* remove from the double linked list */
591 if (ci->flags & CI_FLAGS_IN_QUEUE)
592 remove_from_queue(ci);
595 ci->next = cache_queue_head;
596 if (ci->next != NULL)
598 cache_queue_head = ci;
600 if (cache_queue_tail == NULL)
601 cache_queue_tail = cache_queue_head;
603 else /* (side == TAIL) */
605 /* We don't move values back in the list.. */
606 if (ci->flags & CI_FLAGS_IN_QUEUE)
609 assert (ci->next == NULL);
610 assert (ci->prev == NULL);
612 ci->prev = cache_queue_tail;
614 if (cache_queue_tail == NULL)
615 cache_queue_head = ci;
617 cache_queue_tail->next = ci;
619 cache_queue_tail = ci;
622 ci->flags |= CI_FLAGS_IN_QUEUE;
624 pthread_cond_broadcast(&cache_cond);
625 pthread_mutex_lock (&stats_lock);
626 stats_queue_length++;
627 pthread_mutex_unlock (&stats_lock);
630 } /* }}} int enqueue_cache_item */
633 * tree_callback_flush:
634 * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held
635 * while this is in progress.
637 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
641 callback_flush_data_t *cfd;
643 ci = (cache_item_t *) value;
644 cfd = (callback_flush_data_t *) data;
646 if ((ci->last_flush_time <= cfd->abs_timeout)
647 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
648 && (ci->values_num > 0))
650 enqueue_cache_item (ci, TAIL);
652 else if ((do_shutdown != 0)
653 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
654 && (ci->values_num > 0))
656 enqueue_cache_item (ci, TAIL);
658 else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
659 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
660 && (ci->values_num <= 0))
664 temp = (char **) realloc (cfd->keys,
665 sizeof (char *) * (cfd->keys_num + 1));
668 RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed.");
672 /* Make really sure this points to the _same_ place */
673 assert ((char *) key == ci->file);
674 cfd->keys[cfd->keys_num] = (char *) key;
679 } /* }}} gboolean tree_callback_flush */
681 static int flush_old_values (int max_age)
683 callback_flush_data_t cfd;
686 memset (&cfd, 0, sizeof (cfd));
687 /* Pass the current time as user data so that we don't need to call
688 * `time' for each node. */
689 cfd.now = time (NULL);
694 cfd.abs_timeout = cfd.now - max_age;
696 cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
698 /* `tree_callback_flush' will return the keys of all values that haven't
699 * been touched in the last `config_flush_interval' seconds in `cfd'.
700 * The char*'s in this array point to the same memory as ci->file, so we
701 * don't need to free them separately. */
702 g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
704 for (k = 0; k < cfd.keys_num; k++)
706 /* should never fail, since we have held the cache_lock
708 assert( forget_file(cfd.keys[k]) == 0 );
711 if (cfd.keys != NULL)
718 } /* int flush_old_values */
720 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
723 struct timespec next_flush;
724 int final_flush = 0; /* make sure we only flush once on shutdown */
726 gettimeofday (&now, NULL);
727 next_flush.tv_sec = now.tv_sec + config_flush_interval;
728 next_flush.tv_nsec = 1000 * now.tv_usec;
730 pthread_mutex_lock (&cache_lock);
731 while ((do_shutdown == 0) || (cache_queue_head != NULL))
740 /* First, check if it's time to do the cache flush. */
741 gettimeofday (&now, NULL);
742 if ((now.tv_sec > next_flush.tv_sec)
743 || ((now.tv_sec == next_flush.tv_sec)
744 && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
746 /* Flush all values that haven't been written in the last
747 * `config_write_interval' seconds. */
748 flush_old_values (config_write_interval);
750 /* Determine the time of the next cache flush. */
751 while (next_flush.tv_sec <= now.tv_sec)
752 next_flush.tv_sec += config_flush_interval;
754 /* unlock the cache while we rotate so we don't block incoming
755 * updates if the fsync() blocks on disk I/O */
756 pthread_mutex_unlock(&cache_lock);
758 pthread_mutex_lock(&cache_lock);
761 /* Now, check if there's something to store away. If not, wait until
762 * something comes in or it's time to do the cache flush. if we are
763 * shutting down, do not wait around. */
764 if (cache_queue_head == NULL && !do_shutdown)
766 status = pthread_cond_timedwait (&cache_cond, &cache_lock, &next_flush);
767 if ((status != 0) && (status != ETIMEDOUT))
769 RRDD_LOG (LOG_ERR, "queue_thread_main: "
770 "pthread_cond_timedwait returned %i.", status);
774 /* We're about to shut down */
775 if (do_shutdown != 0 && !final_flush++)
777 if (config_flush_at_shutdown)
778 flush_old_values (-1); /* flush everything */
783 /* Check if a value has arrived. This may be NULL if we timed out or there
784 * was an interrupt such as a signal. */
785 if (cache_queue_head == NULL)
788 ci = cache_queue_head;
790 /* copy the relevant parts */
791 file = strdup (ci->file);
794 RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
798 assert(ci->values != NULL);
799 assert(ci->values_num > 0);
802 values_num = ci->values_num;
804 wipe_ci_values(ci, time(NULL));
805 remove_from_queue(ci);
807 pthread_mutex_lock (&stats_lock);
808 assert (stats_queue_length > 0);
809 stats_queue_length--;
810 pthread_mutex_unlock (&stats_lock);
812 pthread_mutex_unlock (&cache_lock);
815 status = rrd_update_r (file, NULL, values_num, (void *) values);
818 RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
819 "rrd_update_r (%s) failed with status %i. (%s)",
820 file, status, rrd_get_error());
823 journal_write("wrote", file);
824 pthread_cond_broadcast(&ci->flushed);
826 for (i = 0; i < values_num; i++)
834 pthread_mutex_lock (&stats_lock);
835 stats_updates_written++;
836 stats_data_sets_written += values_num;
837 pthread_mutex_unlock (&stats_lock);
840 pthread_mutex_lock (&cache_lock);
842 /* We're about to shut down */
843 if (do_shutdown != 0 && !final_flush++)
845 if (config_flush_at_shutdown)
846 flush_old_values (-1); /* flush everything */
850 } /* while ((do_shutdown == 0) || (cache_queue_head != NULL)) */
851 pthread_mutex_unlock (&cache_lock);
853 if (config_flush_at_shutdown)
855 assert(cache_queue_head == NULL);
856 RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
862 } /* }}} void *queue_thread_main */
864 static int buffer_get_field (char **buffer_ret, /* {{{ */
865 size_t *buffer_size_ret, char **field_ret)
874 buffer = *buffer_ret;
876 buffer_size = *buffer_size_ret;
880 if (buffer_size <= 0)
883 /* This is ensured by `handle_request'. */
884 assert (buffer[buffer_size - 1] == '\0');
887 while (buffer_pos < buffer_size)
889 /* Check for end-of-field or end-of-buffer */
890 if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
892 field[field_size] = 0;
898 /* Handle escaped characters. */
899 else if (buffer[buffer_pos] == '\\')
901 if (buffer_pos >= (buffer_size - 1))
904 field[field_size] = buffer[buffer_pos];
908 /* Normal operation */
911 field[field_size] = buffer[buffer_pos];
915 } /* while (buffer_pos < buffer_size) */
920 *buffer_ret = buffer + buffer_pos;
921 *buffer_size_ret = buffer_size - buffer_pos;
925 } /* }}} int buffer_get_field */
927 /* if we're restricting writes to the base directory,
928 * check whether the file falls within the dir
929 * returns 1 if OK, otherwise 0
931 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
933 assert(file != NULL);
935 if (!config_write_base_only
936 || sock == NULL /* journal replay */
937 || config_base_dir == NULL)
940 if (strstr(file, "../") != NULL) goto err;
942 /* relative paths without "../" are ok */
943 if (*file != '/') return 1;
945 /* file must be of the format base + "/" + <1+ char filename> */
946 if (strlen(file) < _config_base_dir_len + 2) goto err;
947 if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
948 if (*(file + _config_base_dir_len) != '/') goto err;
953 if (sock != NULL && sock->fd >= 0)
954 send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
957 } /* }}} static int check_file_access */
959 /* returns 1 if we have the required privilege level,
960 * otherwise issue an error to the user on sock */
961 static int has_privilege (listen_socket_t *sock, /* {{{ */
962 socket_privilege priv)
964 if (sock == NULL) /* journal replay */
967 if (sock->privilege >= priv)
970 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
971 } /* }}} static int has_privilege */
973 static int flush_file (const char *filename) /* {{{ */
977 pthread_mutex_lock (&cache_lock);
979 ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
982 pthread_mutex_unlock (&cache_lock);
986 if (ci->values_num > 0)
988 /* Enqueue at head */
989 enqueue_cache_item (ci, HEAD);
990 pthread_cond_wait(&ci->flushed, &cache_lock);
993 /* DO NOT DO ANYTHING WITH ci HERE!! The entry
994 * may have been purged during our cond_wait() */
996 pthread_mutex_unlock(&cache_lock);
999 } /* }}} int flush_file */
1001 static int handle_request_help (listen_socket_t *sock, /* {{{ */
1002 char *buffer, size_t buffer_size)
1008 char *help_help[2] =
1010 "Command overview\n"
1012 "HELP [<command>]\n"
1013 "FLUSH <filename>\n"
1015 "PENDING <filename>\n"
1016 "FORGET <filename>\n"
1017 "UPDATE <filename> <values> [<values> ...]\n"
1022 char *help_flush[2] =
1026 "Usage: FLUSH <filename>\n"
1028 "Adds the given filename to the head of the update queue and returns\n"
1029 "after is has been dequeued.\n"
1032 char *help_flushall[2] =
1034 "Help for FLUSHALL\n"
1038 "Triggers writing of all pending updates. Returns immediately.\n"
1041 char *help_pending[2] =
1043 "Help for PENDING\n"
1045 "Usage: PENDING <filename>\n"
1047 "Shows any 'pending' updates for a file, in order.\n"
1048 "The updates shown have not yet been written to the underlying RRD file.\n"
1051 char *help_forget[2] =
1055 "Usage: FORGET <filename>\n"
1057 "Removes the file completely from the cache.\n"
1058 "Any pending updates for the file will be lost.\n"
1061 char *help_update[2] =
1065 "Usage: UPDATE <filename> <values> [<values> ...]\n"
1067 "Adds the given file to the internal cache if it is not yet known and\n"
1068 "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1071 "Each <values> has the following form:\n"
1072 " <values> = <time>:<value>[:<value>[...]]\n"
1073 "See the rrdupdate(1) manpage for details.\n"
1076 char *help_stats[2] =
1082 "Returns some performance counters, see the rrdcached(1) manpage for\n"
1083 "a description of the values.\n"
1086 char *help_batch[2] =
1090 "The 'BATCH' command permits the client to initiate a bulk load\n"
1091 " of commands to rrdcached.\n"
1096 " server: 0 Go ahead. End with dot '.' on its own line.\n"
1097 " client: command #1\n"
1098 " client: command #2\n"
1099 " client: ... and so on\n"
1101 " server: 2 errors\n"
1102 " server: 7 message for command #7\n"
1103 " server: 9 message for command #9\n"
1105 "For more information, consult the rrdcached(1) documentation.\n"
1108 status = buffer_get_field (&buffer, &buffer_size, &command);
1110 help_text = help_help;
1113 if (strcasecmp (command, "update") == 0)
1114 help_text = help_update;
1115 else if (strcasecmp (command, "flush") == 0)
1116 help_text = help_flush;
1117 else if (strcasecmp (command, "flushall") == 0)
1118 help_text = help_flushall;
1119 else if (strcasecmp (command, "pending") == 0)
1120 help_text = help_pending;
1121 else if (strcasecmp (command, "forget") == 0)
1122 help_text = help_forget;
1123 else if (strcasecmp (command, "stats") == 0)
1124 help_text = help_stats;
1125 else if (strcasecmp (command, "batch") == 0)
1126 help_text = help_batch;
1128 help_text = help_help;
1131 add_response_info(sock, help_text[1]);
1132 return send_response(sock, RESP_OK, help_text[0]);
1133 } /* }}} int handle_request_help */
1135 static int handle_request_stats (listen_socket_t *sock) /* {{{ */
1137 uint64_t copy_queue_length;
1138 uint64_t copy_updates_received;
1139 uint64_t copy_flush_received;
1140 uint64_t copy_updates_written;
1141 uint64_t copy_data_sets_written;
1142 uint64_t copy_journal_bytes;
1143 uint64_t copy_journal_rotate;
1145 uint64_t tree_nodes_number;
1146 uint64_t tree_depth;
1148 pthread_mutex_lock (&stats_lock);
1149 copy_queue_length = stats_queue_length;
1150 copy_updates_received = stats_updates_received;
1151 copy_flush_received = stats_flush_received;
1152 copy_updates_written = stats_updates_written;
1153 copy_data_sets_written = stats_data_sets_written;
1154 copy_journal_bytes = stats_journal_bytes;
1155 copy_journal_rotate = stats_journal_rotate;
1156 pthread_mutex_unlock (&stats_lock);
1158 pthread_mutex_lock (&cache_lock);
1159 tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1160 tree_depth = (uint64_t) g_tree_height (cache_tree);
1161 pthread_mutex_unlock (&cache_lock);
1163 add_response_info(sock,
1164 "QueueLength: %"PRIu64"\n", copy_queue_length);
1165 add_response_info(sock,
1166 "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1167 add_response_info(sock,
1168 "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1169 add_response_info(sock,
1170 "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1171 add_response_info(sock,
1172 "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1173 add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1174 add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1175 add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1176 add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1178 send_response(sock, RESP_OK, "Statistics follow\n");
1181 } /* }}} int handle_request_stats */
1183 static int handle_request_flush (listen_socket_t *sock, /* {{{ */
1184 char *buffer, size_t buffer_size)
1189 status = buffer_get_field (&buffer, &buffer_size, &file);
1192 return send_response(sock, RESP_ERR, "Usage: flush <filename>\n");
1196 pthread_mutex_lock(&stats_lock);
1197 stats_flush_received++;
1198 pthread_mutex_unlock(&stats_lock);
1200 if (!check_file_access(file, sock)) return 0;
1202 status = flush_file (file);
1204 return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1205 else if (status == ENOENT)
1207 /* no file in our tree; see whether it exists at all */
1208 struct stat statbuf;
1210 memset(&statbuf, 0, sizeof(statbuf));
1211 if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1212 return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1214 return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1216 else if (status < 0)
1217 return send_response(sock, RESP_ERR, "Internal error.\n");
1219 return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1224 } /* }}} int handle_request_slurp */
1226 static int handle_request_flushall(listen_socket_t *sock) /* {{{ */
1230 status = has_privilege(sock, PRIV_HIGH);
1234 RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1236 pthread_mutex_lock(&cache_lock);
1237 flush_old_values(-1);
1238 pthread_mutex_unlock(&cache_lock);
1240 return send_response(sock, RESP_OK, "Started flush.\n");
1241 } /* }}} static int handle_request_flushall */
1243 static int handle_request_pending(listen_socket_t *sock, /* {{{ */
1244 char *buffer, size_t buffer_size)
1250 status = buffer_get_field(&buffer, &buffer_size, &file);
1252 return send_response(sock, RESP_ERR,
1253 "Usage: PENDING <filename>\n");
1255 status = has_privilege(sock, PRIV_HIGH);
1259 pthread_mutex_lock(&cache_lock);
1260 ci = g_tree_lookup(cache_tree, file);
1263 pthread_mutex_unlock(&cache_lock);
1264 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1267 for (int i=0; i < ci->values_num; i++)
1268 add_response_info(sock, "%s\n", ci->values[i]);
1270 pthread_mutex_unlock(&cache_lock);
1271 return send_response(sock, RESP_OK, "updates pending\n");
1272 } /* }}} static int handle_request_pending */
1274 static int handle_request_forget(listen_socket_t *sock, /* {{{ */
1275 char *buffer, size_t buffer_size)
1280 status = buffer_get_field(&buffer, &buffer_size, &file);
1282 return send_response(sock, RESP_ERR,
1283 "Usage: FORGET <filename>\n");
1285 status = has_privilege(sock, PRIV_HIGH);
1289 if (!check_file_access(file, sock)) return 0;
1291 pthread_mutex_lock(&cache_lock);
1292 status = forget_file(file);
1293 pthread_mutex_unlock(&cache_lock);
1298 journal_write("forget", file);
1300 return send_response(sock, RESP_OK, "Gone!\n");
1303 return send_response(sock, RESP_ERR, "cannot forget: %s\n",
1304 status < 0 ? "Internal error" : rrd_strerror(status));
1308 } /* }}} static int handle_request_forget */
1310 static int handle_request_update (listen_socket_t *sock, /* {{{ */
1311 char *buffer, size_t buffer_size)
1315 int bad_timestamps = 0;
1317 char orig_buf[CMD_MAX];
1324 status = has_privilege(sock, PRIV_HIGH);
1328 /* save it for the journal later */
1329 strncpy(orig_buf, buffer, sizeof(orig_buf)-1);
1331 status = buffer_get_field (&buffer, &buffer_size, &file);
1333 return send_response(sock, RESP_ERR,
1334 "Usage: UPDATE <filename> <values> [<values> ...]\n");
1336 pthread_mutex_lock(&stats_lock);
1337 stats_updates_received++;
1338 pthread_mutex_unlock(&stats_lock);
1340 if (!check_file_access(file, sock)) return 0;
1342 pthread_mutex_lock (&cache_lock);
1343 ci = g_tree_lookup (cache_tree, file);
1345 if (ci == NULL) /* {{{ */
1347 struct stat statbuf;
1349 /* don't hold the lock while we setup; stat(2) might block */
1350 pthread_mutex_unlock(&cache_lock);
1352 memset (&statbuf, 0, sizeof (statbuf));
1353 status = stat (file, &statbuf);
1356 RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1359 if (status == ENOENT)
1360 return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1362 return send_response(sock, RESP_ERR,
1363 "stat failed with error %i.\n", status);
1365 if (!S_ISREG (statbuf.st_mode))
1366 return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1368 if (access(file, R_OK|W_OK) != 0)
1369 return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1370 file, rrd_strerror(errno));
1372 ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1375 RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1377 return send_response(sock, RESP_ERR, "malloc failed.\n");
1379 memset (ci, 0, sizeof (cache_item_t));
1381 ci->file = strdup (file);
1382 if (ci->file == NULL)
1385 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1387 return send_response(sock, RESP_ERR, "strdup failed.\n");
1390 wipe_ci_values(ci, now);
1391 ci->flags = CI_FLAGS_IN_TREE;
1393 pthread_mutex_lock(&cache_lock);
1394 g_tree_insert (cache_tree, (void *) ci->file, (void *) ci);
1396 assert (ci != NULL);
1398 /* don't re-write updates in replay mode */
1400 journal_write("update", orig_buf);
1402 while (buffer_size > 0)
1409 status = buffer_get_field (&buffer, &buffer_size, &value);
1412 RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1416 /* make sure update time is always moving forward */
1417 stamp = strtol(value, &eostamp, 10);
1418 if (eostamp == value || eostamp == NULL || *eostamp != ':')
1421 add_response_info(sock, "Cannot find timestamp in '%s'!\n", value);
1424 else if (stamp <= ci->last_update_stamp)
1427 add_response_info(sock,
1428 "illegal attempt to update using time %ld when"
1429 " last update time is %ld (minimum one second step)\n",
1430 stamp, ci->last_update_stamp);
1434 ci->last_update_stamp = stamp;
1436 temp = (char **) realloc (ci->values,
1437 sizeof (char *) * (ci->values_num + 1));
1440 RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
1445 ci->values[ci->values_num] = strdup (value);
1446 if (ci->values[ci->values_num] == NULL)
1448 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1456 if (((now - ci->last_flush_time) >= config_write_interval)
1457 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1458 && (ci->values_num > 0))
1460 enqueue_cache_item (ci, TAIL);
1463 pthread_mutex_unlock (&cache_lock);
1467 /* if we had only one update attempt, then return the full
1468 error message... try to get the most information out
1469 of the limited error space allowed by the protocol
1471 if (bad_timestamps == 1)
1472 return send_response(sock, RESP_ERR, "%s", sock->wbuf);
1474 return send_response(sock, RESP_ERR,
1475 "No values updated (%d bad timestamps).\n",
1479 return send_response(sock, RESP_OK,
1480 "errors, enqueued %i value(s).\n", values_num);
1485 } /* }}} int handle_request_update */
1487 /* we came across a "WROTE" entry during journal replay.
1488 * throw away any values that we have accumulated for this file
1490 static int handle_request_wrote (const char *buffer) /* {{{ */
1494 const char *file = buffer;
1496 pthread_mutex_lock(&cache_lock);
1498 ci = g_tree_lookup(cache_tree, file);
1501 pthread_mutex_unlock(&cache_lock);
1507 for (i=0; i < ci->values_num; i++)
1508 free(ci->values[i]);
1513 wipe_ci_values(ci, time(NULL));
1514 remove_from_queue(ci);
1516 pthread_mutex_unlock(&cache_lock);
1518 } /* }}} int handle_request_wrote */
1520 /* start "BATCH" processing */
1521 static int batch_start (listen_socket_t *sock) /* {{{ */
1524 if (sock->batch_mode)
1525 return send_response(sock, RESP_ERR, "Already in BATCH\n");
1527 status = send_response(sock, RESP_OK,
1528 "Go ahead. End with dot '.' on its own line.\n");
1529 sock->batch_mode = 1;
1530 sock->batch_cmd = 0;
1533 } /* }}} static int batch_start */
1535 /* finish "BATCH" processing and return results to the client */
1536 static int batch_done (listen_socket_t *sock) /* {{{ */
1538 assert(sock->batch_mode);
1539 sock->batch_mode = 0;
1540 sock->batch_cmd = 0;
1541 return send_response(sock, RESP_OK, "errors\n");
1542 } /* }}} static int batch_done */
1544 /* if sock==NULL, we are in journal replay mode */
1545 static int handle_request (listen_socket_t *sock, /* {{{ */
1546 char *buffer, size_t buffer_size)
1552 assert (buffer[buffer_size - 1] == '\0');
1554 buffer_ptr = buffer;
1556 status = buffer_get_field (&buffer_ptr, &buffer_size, &command);
1559 RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1563 if (sock != NULL && sock->batch_mode)
1566 if (strcasecmp (command, "update") == 0)
1567 return (handle_request_update (sock, buffer_ptr, buffer_size));
1568 else if (strcasecmp (command, "wrote") == 0 && sock == NULL)
1570 /* this is only valid in replay mode */
1571 return (handle_request_wrote (buffer_ptr));
1573 else if (strcasecmp (command, "flush") == 0)
1574 return (handle_request_flush (sock, buffer_ptr, buffer_size));
1575 else if (strcasecmp (command, "flushall") == 0)
1576 return (handle_request_flushall(sock));
1577 else if (strcasecmp (command, "pending") == 0)
1578 return (handle_request_pending(sock, buffer_ptr, buffer_size));
1579 else if (strcasecmp (command, "forget") == 0)
1580 return (handle_request_forget(sock, buffer_ptr, buffer_size));
1581 else if (strcasecmp (command, "stats") == 0)
1582 return (handle_request_stats (sock));
1583 else if (strcasecmp (command, "help") == 0)
1584 return (handle_request_help (sock, buffer_ptr, buffer_size));
1585 else if (strcasecmp (command, "batch") == 0 && sock != NULL)
1586 return batch_start(sock);
1587 else if (strcasecmp (command, ".") == 0 && sock != NULL && sock->batch_mode)
1588 return batch_done(sock);
1590 return send_response(sock, RESP_ERR, "Unknown command: %s\n", command);
1594 } /* }}} int handle_request */
1596 /* MUST NOT hold journal_lock before calling this */
1597 static void journal_rotate(void) /* {{{ */
1599 FILE *old_fh = NULL;
1602 if (journal_cur == NULL || journal_old == NULL)
1605 pthread_mutex_lock(&journal_lock);
1607 /* we rotate this way (rename before close) so that the we can release
1608 * the journal lock as fast as possible. Journal writes to the new
1609 * journal can proceed immediately after the new file is opened. The
1610 * fclose can then block without affecting new updates.
1612 if (journal_fh != NULL)
1614 old_fh = journal_fh;
1616 rename(journal_cur, journal_old);
1617 ++stats_journal_rotate;
1620 new_fd = open(journal_cur, O_WRONLY|O_CREAT|O_APPEND,
1621 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1624 journal_fh = fdopen(new_fd, "a");
1625 if (journal_fh == NULL)
1629 pthread_mutex_unlock(&journal_lock);
1634 if (journal_fh == NULL)
1637 "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1638 journal_cur, rrd_strerror(errno));
1641 "JOURNALING DISABLED: All values will be flushed at shutdown");
1642 config_flush_at_shutdown = 1;
1645 } /* }}} static void journal_rotate */
1647 static void journal_done(void) /* {{{ */
1649 if (journal_cur == NULL)
1652 pthread_mutex_lock(&journal_lock);
1653 if (journal_fh != NULL)
1659 if (config_flush_at_shutdown)
1661 RRDD_LOG(LOG_INFO, "removing journals");
1662 unlink(journal_old);
1663 unlink(journal_cur);
1667 RRDD_LOG(LOG_INFO, "expedited shutdown; "
1668 "journals will be used at next startup");
1671 pthread_mutex_unlock(&journal_lock);
1673 } /* }}} static void journal_done */
1675 static int journal_write(char *cmd, char *args) /* {{{ */
1679 if (journal_fh == NULL)
1682 pthread_mutex_lock(&journal_lock);
1683 chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1684 pthread_mutex_unlock(&journal_lock);
1688 pthread_mutex_lock(&stats_lock);
1689 stats_journal_bytes += chars;
1690 pthread_mutex_unlock(&stats_lock);
1694 } /* }}} static int journal_write */
1696 static int journal_replay (const char *file) /* {{{ */
1702 char entry[CMD_MAX];
1704 if (file == NULL) return 0;
1709 struct stat statbuf;
1711 memset(&statbuf, 0, sizeof(statbuf));
1712 if (stat(file, &statbuf) != 0)
1714 if (errno == ENOENT)
1717 reason = "stat error";
1720 else if (!S_ISREG(statbuf.st_mode))
1722 reason = "not a regular file";
1725 if (statbuf.st_uid != daemon_uid)
1727 reason = "not owned by daemon user";
1730 if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
1732 reason = "must not be user/group writable";
1738 RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
1739 file, rrd_strerror(status), reason);
1744 fh = fopen(file, "r");
1747 if (errno != ENOENT)
1748 RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1749 file, rrd_strerror(errno));
1753 RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1760 if (fgets(entry, sizeof(entry), fh) == NULL)
1762 entry_len = strlen(entry);
1764 /* check \n termination in case journal writing crashed mid-line */
1767 else if (entry[entry_len - 1] != '\n')
1769 RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
1774 entry[entry_len - 1] = '\0';
1776 if (handle_request(NULL, entry, entry_len) == 0)
1784 RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1785 entry_cnt, fail_cnt);
1787 return entry_cnt > 0 ? 1 : 0;
1788 } /* }}} static int journal_replay */
1790 static void journal_init(void) /* {{{ */
1792 int had_journal = 0;
1794 if (journal_cur == NULL) return;
1796 pthread_mutex_lock(&journal_lock);
1798 RRDD_LOG(LOG_INFO, "checking for journal files");
1800 had_journal += journal_replay(journal_old);
1801 had_journal += journal_replay(journal_cur);
1803 /* it must have been a crash. start a flush */
1804 if (had_journal && config_flush_at_shutdown)
1805 flush_old_values(-1);
1807 pthread_mutex_unlock(&journal_lock);
1810 RRDD_LOG(LOG_INFO, "journal processing complete");
1812 } /* }}} static void journal_init */
1814 static void close_connection(listen_socket_t *sock)
1816 close(sock->fd) ; sock->fd = -1;
1817 free(sock->rbuf); sock->rbuf = NULL;
1818 free(sock->wbuf); sock->wbuf = NULL;
1823 static void *connection_thread_main (void *args) /* {{{ */
1826 listen_socket_t *sock;
1830 sock = (listen_socket_t *) args;
1833 /* init read buffers */
1834 sock->next_read = sock->next_cmd = 0;
1835 sock->rbuf = malloc(RBUF_SIZE);
1836 if (sock->rbuf == NULL)
1838 RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
1839 close_connection(sock);
1843 pthread_mutex_lock (&connection_threads_lock);
1847 temp = (pthread_t *) realloc (connection_threads,
1848 sizeof (pthread_t) * (connection_threads_num + 1));
1851 RRDD_LOG (LOG_ERR, "connection_thread_main: realloc failed.");
1855 connection_threads = temp;
1856 connection_threads[connection_threads_num] = pthread_self ();
1857 connection_threads_num++;
1860 pthread_mutex_unlock (&connection_threads_lock);
1862 while (do_shutdown == 0)
1868 struct pollfd pollfd;
1872 pollfd.events = POLLIN | POLLPRI;
1875 status = poll (&pollfd, 1, /* timeout = */ 500);
1878 else if (status == 0) /* timeout */
1880 else if (status < 0) /* error */
1883 if (status != EINTR)
1884 RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
1888 if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
1890 else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
1892 RRDD_LOG (LOG_WARNING, "connection_thread_main: "
1893 "poll(2) returned something unexpected: %#04hx",
1898 rbytes = read(fd, sock->rbuf + sock->next_read,
1899 RBUF_SIZE - sock->next_read);
1902 RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
1905 else if (rbytes == 0)
1908 sock->next_read += rbytes;
1910 while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
1912 status = handle_request (sock, cmd, cmd_len+1);
1919 close_connection(sock);
1921 self = pthread_self ();
1922 /* Remove this thread from the connection threads list */
1923 pthread_mutex_lock (&connection_threads_lock);
1924 /* Find out own index in the array */
1925 for (i = 0; i < connection_threads_num; i++)
1926 if (pthread_equal (connection_threads[i], self) != 0)
1928 assert (i < connection_threads_num);
1930 /* Move the trailing threads forward. */
1931 if (i < (connection_threads_num - 1))
1933 memmove (connection_threads + i,
1934 connection_threads + i + 1,
1935 sizeof (pthread_t) * (connection_threads_num - i - 1));
1938 connection_threads_num--;
1939 pthread_mutex_unlock (&connection_threads_lock);
1942 } /* }}} void *connection_thread_main */
1944 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
1947 struct sockaddr_un sa;
1948 listen_socket_t *temp;
1953 if (strncmp(path, "unix:", strlen("unix:")) == 0)
1954 path += strlen("unix:");
1956 temp = (listen_socket_t *) realloc (listen_fds,
1957 sizeof (listen_fds[0]) * (listen_fds_num + 1));
1960 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: realloc failed.");
1964 memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
1966 fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
1969 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: socket(2) failed.");
1973 memset (&sa, 0, sizeof (sa));
1974 sa.sun_family = AF_UNIX;
1975 strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
1977 status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
1980 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: bind(2) failed.");
1986 status = listen (fd, /* backlog = */ 10);
1989 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: listen(2) failed.");
1995 listen_fds[listen_fds_num].fd = fd;
1996 listen_fds[listen_fds_num].family = PF_UNIX;
1997 strncpy(listen_fds[listen_fds_num].addr, path,
1998 sizeof (listen_fds[listen_fds_num].addr) - 1);
2002 } /* }}} int open_listen_socket_unix */
2004 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2006 struct addrinfo ai_hints;
2007 struct addrinfo *ai_res;
2008 struct addrinfo *ai_ptr;
2009 char addr_copy[NI_MAXHOST];
2014 strncpy (addr_copy, sock->addr, sizeof (addr_copy));
2015 addr_copy[sizeof (addr_copy) - 1] = 0;
2018 memset (&ai_hints, 0, sizeof (ai_hints));
2019 ai_hints.ai_flags = 0;
2020 #ifdef AI_ADDRCONFIG
2021 ai_hints.ai_flags |= AI_ADDRCONFIG;
2023 ai_hints.ai_family = AF_UNSPEC;
2024 ai_hints.ai_socktype = SOCK_STREAM;
2027 if (*addr == '[') /* IPv6+port format */
2029 /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2032 port = strchr (addr, ']');
2035 RRDD_LOG (LOG_ERR, "open_listen_socket_network: Malformed address: %s",
2044 else if (*port == 0)
2048 RRDD_LOG (LOG_ERR, "open_listen_socket_network: Garbage after address: %s",
2052 } /* if (*addr = ']') */
2053 else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */
2055 port = rindex(addr, ':');
2063 status = getaddrinfo (addr,
2064 port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2065 &ai_hints, &ai_res);
2068 RRDD_LOG (LOG_ERR, "open_listen_socket_network: getaddrinfo(%s) failed: "
2069 "%s", addr, gai_strerror (status));
2073 for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2076 listen_socket_t *temp;
2079 temp = (listen_socket_t *) realloc (listen_fds,
2080 sizeof (listen_fds[0]) * (listen_fds_num + 1));
2083 RRDD_LOG (LOG_ERR, "open_listen_socket_network: realloc failed.");
2087 memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2089 fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2092 RRDD_LOG (LOG_ERR, "open_listen_socket_network: socket(2) failed.");
2096 setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2098 status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2101 RRDD_LOG (LOG_ERR, "open_listen_socket_network: bind(2) failed.");
2106 status = listen (fd, /* backlog = */ 10);
2109 RRDD_LOG (LOG_ERR, "open_listen_socket_network: listen(2) failed.");
2114 listen_fds[listen_fds_num].fd = fd;
2115 listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2117 } /* for (ai_ptr) */
2120 } /* }}} static int open_listen_socket_network */
2122 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2124 assert(sock != NULL);
2125 assert(sock->addr != NULL);
2127 if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2128 || sock->addr[0] == '/')
2129 return (open_listen_socket_unix(sock));
2131 return (open_listen_socket_network(sock));
2132 } /* }}} int open_listen_socket */
2134 static int close_listen_sockets (void) /* {{{ */
2138 for (i = 0; i < listen_fds_num; i++)
2140 close (listen_fds[i].fd);
2142 if (listen_fds[i].family == PF_UNIX)
2143 unlink(listen_fds[i].addr);
2151 } /* }}} int close_listen_sockets */
2153 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
2155 struct pollfd *pollfds;
2160 for (i = 0; i < config_listen_address_list_len; i++)
2161 open_listen_socket (config_listen_address_list[i]);
2163 if (config_listen_address_list_len < 1)
2165 listen_socket_t sock;
2166 memset(&sock, 0, sizeof(sock));
2167 strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr));
2168 open_listen_socket (&sock);
2171 if (listen_fds_num < 1)
2173 RRDD_LOG (LOG_ERR, "listen_thread_main: No listen sockets "
2174 "could be opened. Sorry.");
2178 pollfds_num = listen_fds_num;
2179 pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2180 if (pollfds == NULL)
2182 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2185 memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2187 RRDD_LOG(LOG_INFO, "listening for connections");
2189 while (do_shutdown == 0)
2191 assert (pollfds_num == ((int) listen_fds_num));
2192 for (i = 0; i < pollfds_num; i++)
2194 pollfds[i].fd = listen_fds[i].fd;
2195 pollfds[i].events = POLLIN | POLLPRI;
2196 pollfds[i].revents = 0;
2199 status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2202 else if (status == 0) /* timeout */
2204 else if (status < 0) /* error */
2207 if (status != EINTR)
2209 RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2214 for (i = 0; i < pollfds_num; i++)
2216 listen_socket_t *client_sock;
2217 struct sockaddr_storage client_sa;
2218 socklen_t client_sa_size;
2220 pthread_attr_t attr;
2222 if (pollfds[i].revents == 0)
2225 if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2227 RRDD_LOG (LOG_ERR, "listen_thread_main: "
2228 "poll(2) returned something unexpected for listen FD #%i.",
2233 client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2234 if (client_sock == NULL)
2236 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2239 memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2241 client_sa_size = sizeof (client_sa);
2242 client_sock->fd = accept (pollfds[i].fd,
2243 (struct sockaddr *) &client_sa, &client_sa_size);
2244 if (client_sock->fd < 0)
2246 RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2251 pthread_attr_init (&attr);
2252 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2254 status = pthread_create (&tid, &attr, connection_thread_main,
2258 RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2259 close_connection(client_sock);
2262 } /* for (pollfds_num) */
2263 } /* while (do_shutdown == 0) */
2265 RRDD_LOG(LOG_INFO, "starting shutdown");
2267 close_listen_sockets ();
2269 pthread_mutex_lock (&connection_threads_lock);
2270 while (connection_threads_num > 0)
2274 wait_for = connection_threads[0];
2276 pthread_mutex_unlock (&connection_threads_lock);
2277 pthread_join (wait_for, /* retval = */ NULL);
2278 pthread_mutex_lock (&connection_threads_lock);
2280 pthread_mutex_unlock (&connection_threads_lock);
2283 } /* }}} void *listen_thread_main */
2285 static int daemonize (void) /* {{{ */
2291 daemon_uid = geteuid();
2293 fd = open_pidfile();
2294 if (fd < 0) return fd;
2296 if (!stay_foreground)
2303 fprintf (stderr, "daemonize: fork(2) failed.\n");
2311 /* Become session leader */
2314 /* Open the first three file descriptors to /dev/null */
2319 open ("/dev/null", O_RDWR);
2322 } /* if (!stay_foreground) */
2324 /* Change into the /tmp directory. */
2325 base_dir = (config_base_dir != NULL)
2328 status = chdir (base_dir);
2331 fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2335 install_signal_handlers();
2337 openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2338 RRDD_LOG(LOG_INFO, "starting up");
2340 cache_tree = g_tree_new ((GCompareFunc) strcmp);
2341 if (cache_tree == NULL)
2343 RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2347 status = write_pidfile (fd);
2349 } /* }}} int daemonize */
2351 static int cleanup (void) /* {{{ */
2355 pthread_cond_signal (&cache_cond);
2356 pthread_join (queue_thread, /* return = */ NULL);
2360 RRDD_LOG(LOG_INFO, "goodbye");
2364 } /* }}} int cleanup */
2366 static int read_options (int argc, char **argv) /* {{{ */
2371 while ((option = getopt(argc, argv, "gl:L:f:w:b:Bz:p:j:h?F")) != -1)
2382 listen_socket_t **temp;
2383 listen_socket_t *new;
2385 new = malloc(sizeof(listen_socket_t));
2388 fprintf(stderr, "read_options: malloc failed.\n");
2391 memset(new, 0, sizeof(listen_socket_t));
2393 temp = (listen_socket_t **) realloc (config_listen_address_list,
2394 sizeof (listen_socket_t *) * (config_listen_address_list_len + 1));
2397 fprintf (stderr, "read_options: realloc failed.\n");
2400 config_listen_address_list = temp;
2402 strncpy(new->addr, optarg, sizeof(new->addr)-1);
2403 new->privilege = (option == 'l') ? PRIV_HIGH : PRIV_LOW;
2405 temp[config_listen_address_list_len] = new;
2406 config_listen_address_list_len++;
2414 temp = atoi (optarg);
2416 config_flush_interval = temp;
2419 fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2429 temp = atoi (optarg);
2431 config_write_interval = temp;
2434 fprintf (stderr, "Invalid write interval: %s\n", optarg);
2444 temp = atoi(optarg);
2446 config_write_jitter = temp;
2449 fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2457 config_write_base_only = 1;
2464 if (config_base_dir != NULL)
2465 free (config_base_dir);
2466 config_base_dir = strdup (optarg);
2467 if (config_base_dir == NULL)
2469 fprintf (stderr, "read_options: strdup failed.\n");
2473 len = strlen (config_base_dir);
2474 while ((len > 0) && (config_base_dir[len - 1] == '/'))
2476 config_base_dir[len - 1] = 0;
2482 fprintf (stderr, "Invalid base directory: %s\n", optarg);
2486 _config_base_dir_len = len;
2492 if (config_pid_file != NULL)
2493 free (config_pid_file);
2494 config_pid_file = strdup (optarg);
2495 if (config_pid_file == NULL)
2497 fprintf (stderr, "read_options: strdup failed.\n");
2504 config_flush_at_shutdown = 1;
2509 struct stat statbuf;
2510 const char *dir = optarg;
2512 status = stat(dir, &statbuf);
2515 fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2519 if (!S_ISDIR(statbuf.st_mode)
2520 || access(dir, R_OK|W_OK|X_OK) != 0)
2522 fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2523 errno ? rrd_strerror(errno) : "");
2527 journal_cur = malloc(PATH_MAX + 1);
2528 journal_old = malloc(PATH_MAX + 1);
2529 if (journal_cur == NULL || journal_old == NULL)
2531 fprintf(stderr, "malloc failure for journal files\n");
2536 snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2537 snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
2544 printf ("RRDCacheD %s Copyright (C) 2008 Florian octo Forster\n"
2546 "Usage: rrdcached [options]\n"
2548 "Valid options are:\n"
2549 " -l <address> Socket address to listen to.\n"
2550 " -L <address> Socket address to listen to ('FLUSH' only).\n"
2551 " -w <seconds> Interval in which to write data.\n"
2552 " -z <delay> Delay writes up to <delay> seconds to spread load\n"
2553 " -f <seconds> Interval in which to flush dead data.\n"
2554 " -p <file> Location of the PID-file.\n"
2555 " -b <dir> Base directory to change to.\n"
2556 " -B Restrict file access to paths within -b <dir>\n"
2557 " -g Do not fork and run in the foreground.\n"
2558 " -j <dir> Directory in which to create the journal files.\n"
2559 " -F Always flush all updates at shutdown\n"
2561 "For more information and a detailed description of all options "
2563 "to the rrdcached(1) manual page.\n",
2567 } /* switch (option) */
2568 } /* while (getopt) */
2570 /* advise the user when values are not sane */
2571 if (config_flush_interval < 2 * config_write_interval)
2572 fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2573 " 2x write interval (-w) !\n");
2574 if (config_write_jitter > config_write_interval)
2575 fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2576 " write interval (-w) !\n");
2578 if (config_write_base_only && config_base_dir == NULL)
2579 fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
2580 " Consult the rrdcached documentation\n");
2582 if (journal_cur == NULL)
2583 config_flush_at_shutdown = 1;
2586 } /* }}} int read_options */
2588 int main (int argc, char **argv)
2592 status = read_options (argc, argv);
2600 status = daemonize ();
2603 struct sigaction sigchld;
2605 memset (&sigchld, 0, sizeof (sigchld));
2606 sigchld.sa_handler = SIG_IGN;
2607 sigaction (SIGCHLD, &sigchld, NULL);
2611 else if (status != 0)
2613 fprintf (stderr, "daemonize failed, exiting.\n");
2619 /* start the queue thread */
2620 memset (&queue_thread, 0, sizeof (queue_thread));
2621 status = pthread_create (&queue_thread,
2627 RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2632 listen_thread_main (NULL);
2639 * vim: set sw=2 sts=2 ts=8 et fdm=marker :