From: Kevin Brintnall
Date: Sun, 14 Sep 2008 07:26:58 +0000 (+0200)
Subject: Big bunch of improvements for the caching daemon.
X-Git-Url: https://git.verplant.org/?a=commitdiff_plain;h=refs%2Fheads%2Fkb%2Frrdd;p=rrdtool.git

Big bunch of improvements for the caching daemon.

Tobi et al.,

I have made some enhancements to Florian's rrdcached patch. This patch (against /trunk) can be found at the following URL. Testing has shown it to be production ready.

http://dist.rufus.net/~kbrint/rrdcached/rrdcache.patch
http://dist.rufus.net/~kbrint/rrdcached/rrdcache.patch.asc

It applies cleanly to 1.3.2 as well (with obligatory automake;autoconf). The enhancements over Florian's version are in the postscript.

-- kevin brintnall =~ /kbrint@rufus.net/

=================================================================

fixed: removed extra compiler pragmas that caused portability problems
added: more server logging (startup, shutdown, ...)
added: client maintains a cached connection across multiple commands. correctly handles a long-running process where some commands are specified with --daemon and others are specified without
fixed: consolidated duplicate code for finding daemon addr (from --daemon or environment).
fixed: update with --template and daemon specified by environment was not treated properly in rrd_update.c:rrd_update(line ~442)
fixed: memory leak with ci->values not being freed
added: added -z to spread write load
fixed: check that file is writable before accepting UPDATE
fixed: queue_thread_main: avoid tight-spin when we're scheduling the next flush. old code called flush_old_values(-1) once per update for up to a full second. ("<" vs "<=" comparison)
added: journal support with recovery
fixed: signal handlers wake up queue_thread for timely shutdown
fixed: refuse to start if pid file present at startup
added: more stats
fixed: "flush file" should not error if file exists on the file system, but not in update cache.
the file may have been flushed due to inactivity, or the daemon may have just started up. if the file exists, return success. still fails if the file does not exist at all on disk.
fixed: ENODATA is not portable
---
diff --git a/doc/rrdcached.pod b/doc/rrdcached.pod index fd6bc6c..e762659 100644 --- a/doc/rrdcached.pod +++ b/doc/rrdcached.pod @@ -6,7 +6,7 @@ rrdcached - Data caching daemon for rrdtool =head1 SYNOPSIS -B [B<-l> I
] [B<-w> I] [B<-f> I] +B [B<-l> I
] [B<-w> I] [B<-z> I] [B<-f> I] [B<-j> I] =head1 DESCRIPTION @@ -42,6 +42,13 @@ C, will be used. Data is written to disk every I seconds. If this option is not specified the default interval of 300Eseconds will be used. +=item B<-z> I + +If specified, rrdcached will delay writing of each RRD for a random number +of seconds in the rangeE[0,I). This will avoid too many +writes being queued simultaneously. This value should be no greater than +the value specified in B<-w>. By default, there is no delay. + =item B<-f> I Every I seconds the entire cache is searched for old values which are @@ -54,6 +61,19 @@ cases. This timeout defaults to 3600Eseconds. Sets the name and location of the PID-file. If not specified, the default, C/run/rrdcached.pid> will be used. +=item B<-j> I + +Write updates to a journal in I. In the event of a program or system +crash, this will allow the daemon to write any updates that were pending +at the time of the crash. + +On startup, the daemon will check for journal files in this directory. If +found, all updates therein will be read into memory before the daemon +starts accepting new connections. + +The journal will be rotated with the same frequency as the flush timer +given by B<-f>. On clean shutdown, the journal files are removed. + =item B<-b> I The daemon will change into a specific directory at startup. All files passed @@ -89,6 +109,8 @@ line argument B<--daemon> or the environment variable B: =item B +=item B + =item B =item B @@ -291,12 +313,16 @@ name of the value, a colon, one or more spaces and the actual value. Example: - 5 Statistics follow + 9 Statistics follow QueueLength: 0 + UpdatesReceived: 30 + FlushesReceived: 2 UpdatesWritten: 13 DataSetsWritten: 390 TreeNodesNumber: 13 TreeDepth: 4 + JournalBytes: 190 + JournalRotate: 0 =item B I I [I ...] @@ -304,6 +330,13 @@ Adds more data to a filename. This is B operation the daemon was designed for, so describing the mechanism again is unnecessary. 
Read L above for a detailed explanation. +=item B I + +This command is written to the journal after a file is successfully +written out to disk. It is used during journal replay to determine which +updates have already been applied. It is I valid in the journal; it +is not accepted from the other command channels. + =back =head2 Performance Values @@ -316,18 +349,18 @@ The following counters are returned by the B command: Number of nodes currently enqueued in the update queue. -=item B I<(unsigned 64bit integer)> +=item B I<(unsigned 64bit integer)> -Depth of the tree used for fast key lookup. +Number of UPDATE commands received. -=item B I<(unsigned 64bit integer)> +=item B I<(unsigned 64bit integer)> -Number of nodes in the cache. +Number of FLUSH commands received. =item B I<(unsigned 64bit integer)> -Total number of updates, i.Ee. calls to C, since the daemon -was started. +Total number of updates, i.Ee. calls to C, since the +daemon was started. =item B I<(unsigned 64bit integer)> @@ -336,6 +369,22 @@ data set is one or more values passed to the B command. For example: C is one data set with two values. The term "data set" is used to prevent confusion whether individual values or groups of values are counted. +=item B I<(unsigned 64bit integer)> + +Number of nodes in the cache. + +=item B I<(unsigned 64bit integer)> + +Depth of the tree used for fast key lookup. + +=item B I<(unsigned 64bit integer)> + +Total number of bytes written to the journal since startup. + +=item B I<(unsigned 64bit integer)> + +Number of times the journal has been rotated since startup. + =back =head1 BUGS @@ -346,7 +395,14 @@ No known bugs at the moment. L, L -=head1 AUHOR +=head1 AUTHOR B and this manual page have been written by Florian Forster EoctoEatEverplant.orgE. 
+ +=head1 CONTRIBUTORS + +kevin brintnall Ekbrint@rufus.netE + +=cut + diff --git a/src/librrd.sym.in b/src/librrd.sym.in index 22afc86..ae5da6f 100644 --- a/src/librrd.sym.in +++ b/src/librrd.sym.in @@ -51,7 +51,10 @@ rrd_version rrd_write rrd_xport rrdc_connect +rrdc_is_connected rrdc_disconnect rrdc_flush +rrdc_stats_free +rrdc_stats_get rrdc_update @RRD_GETOPT_LONG@ diff --git a/src/rrd_client.c b/src/rrd_client.c index f1253f8..d1ad5e0 100644 --- a/src/rrd_client.c +++ b/src/rrd_client.c @@ -21,6 +21,7 @@ #include "rrd.h" #include "rrd_client.h" +#include "rrd_tool.h" #include #include @@ -32,8 +33,23 @@ #include #include +#ifndef ENODATA +#define ENODATA ENOENT +#endif + +struct rrdc_response_s +{ + int status; + char *message; + char **lines; + size_t lines_num; +}; +typedef struct rrdc_response_s rrdc_response_t; + static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER; static int sd = -1; +static char *sd_path = NULL; /* cache the path for sd */ +static void _disconnect(void); static ssize_t sread (void *buffer_void, size_t buffer_size) /* {{{ */ { @@ -57,16 +73,15 @@ static ssize_t sread (void *buffer_void, size_t buffer_size) /* {{{ */ if (status == 0) { - close (sd); - sd = -1; + _disconnect(); errno = EPROTO; return (-1); } assert ((0 > status) || (buffer_free >= (size_t) status)); - buffer_free = buffer_free - status; - buffer_used = buffer_used + status; + buffer_free -= status; + buffer_used += status; if (buffer[buffer_used - 1] == '\n') break; @@ -78,7 +93,7 @@ static ssize_t sread (void *buffer_void, size_t buffer_size) /* {{{ */ return (-1); } - buffer[buffer_used - 1] = 0; + buffer[buffer_used - 1] = '\0'; return (buffer_used); } /* }}} ssize_t sread */ @@ -100,13 +115,13 @@ static ssize_t swrite (const void *buf, size_t count) /* {{{ */ if (status < 0) { - close (sd); - sd = -1; + _disconnect(); + rrd_set_error("lost connection to rrdcached"); return (status); } - nleft = nleft - status; - ptr = ptr + status; + nleft -= status; + ptr += 
status; } return (0); @@ -177,26 +192,143 @@ static int buffer_add_value (const char *value, /* {{{ */ return (buffer_add_string (temp, buffer_ret, buffer_size_ret)); } /* }}} int buffer_add_value */ -static int rrdc_connect_unix (const char *path) /* {{{ */ +static int response_parse (char *buffer, size_t buffer_size, /* {{{ */ + rrdc_response_t **ret_response) { - struct sockaddr_un sa; - int status; + rrdc_response_t *ret; - assert (path != NULL); + char *dummy; + char *saveptr; - pthread_mutex_lock (&lock); + char *line_ptr; + size_t line_counter; - if (sd >= 0) + if (buffer == NULL) + return (EINVAL); + if (buffer_size <= 0) + return (EINVAL); + + if (buffer[buffer_size - 1] != 0) + return (-1); + + ret = (rrdc_response_t *) malloc (sizeof (rrdc_response_t)); + if (ret == NULL) + return (ENOMEM); + memset (ret, 0, sizeof (*ret)); + + line_counter = 0; + + dummy = buffer; + saveptr = NULL; + while ((line_ptr = strtok_r (dummy, "\r\n", &saveptr)) != NULL) { - pthread_mutex_unlock (&lock); - return (0); + dummy = NULL; + + if (ret->message == NULL) + { + ret->status = strtol (buffer, &ret->message, 0); + if (buffer == ret->message) + { + free (ret); + return (EPROTO); + } + + /* Skip leading whitespace of the status message */ + ret->message += strspn (ret->message, " \t"); + + if (ret->status > 0) + { + ret->lines = (char **) malloc (sizeof (char *) * ret->status); + if (ret->lines == NULL) + { + free (ret); + return (ENOMEM); + } + memset (ret->lines, 0, sizeof (char *) * ret->status); + ret->lines_num = (size_t) ret->status; + } + else + { + ret->lines = NULL; + ret->lines_num = 0; + } + } + else /* if (ret->message != NULL) */ + { + if (line_counter < ret->lines_num) + ret->lines[line_counter] = line_ptr; + line_counter++; + } + } /* while (strtok_r) */ + + if (ret->lines_num != line_counter) + { + errno = EPROTO; + if (ret->lines != NULL) + free (ret->lines); + free (ret); + return (-1); + } + + *ret_response = ret; + return (0); +} /* }}} int response_parse 
*/ + +static void response_free (rrdc_response_t *res) /* {{{ */ +{ + if (res == NULL) + return; + + if (res->lines != NULL) + { + res->lines_num = 0; + free (res->lines); + res->lines = NULL; } + free (res); +} /* }}} void response_free */ + + +/* determine whether we are connected to the specified daemon_addr if + * NULL, return whether we are connected at all + */ +int rrdc_is_connected(const char *daemon_addr) /* {{{ */ +{ + if (sd < 0) + return 0; + else if (daemon_addr == NULL) + { + /* here we have to handle the case i.e. + * UPDATE --daemon ...; UPDATEV (no --daemon) ... + * In other words: we have a cached connection, + * but it is not specified in the current command. + * Daemon is only implied in this case if set in ENV + */ + if (getenv(ENV_RRDCACHED_ADDRESS) != NULL) + return 1; + else + return 0; + } + else if (strcmp(daemon_addr, sd_path) == 0) + return 1; + else + return 0; + +} /* }}} int rrdc_is_connected */ + +static int rrdc_connect_unix (const char *path) /* {{{ */ +{ + struct sockaddr_un sa; + int status; + + assert (path != NULL); + assert (sd == -1); + sd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0); if (sd < 0) { status = errno; - pthread_mutex_unlock (&lock); return (status); } @@ -208,38 +340,22 @@ static int rrdc_connect_unix (const char *path) /* {{{ */ if (status != 0) { status = errno; - pthread_mutex_unlock (&lock); return (status); } - pthread_mutex_unlock (&lock); - return (0); } /* }}} int rrdc_connect_unix */ -int rrdc_connect (const char *addr) /* {{{ */ +static int rrdc_connect_network (const char *addr) /* {{{ */ { struct addrinfo ai_hints; struct addrinfo *ai_res; struct addrinfo *ai_ptr; - int status; - - if (addr == NULL) - addr = RRDCACHED_DEFAULT_ADDRESS; - if (strncmp ("unix:", addr, strlen ("unix:")) == 0) - return (rrdc_connect_unix (addr + strlen ("unix:"))); - else if (addr[0] == '/') - return (rrdc_connect_unix (addr)); - - pthread_mutex_lock (&lock); - - if (sd >= 0) - { - pthread_mutex_unlock (&lock); - 
return (0); - } + assert (addr != NULL); + assert (sd == -1); + int status; memset (&ai_hints, 0, sizeof (ai_hints)); ai_hints.ai_flags = 0; #ifdef AI_ADDRCONFIG @@ -251,10 +367,7 @@ int rrdc_connect (const char *addr) /* {{{ */ ai_res = NULL; status = getaddrinfo (addr, RRDCACHED_DEFAULT_PORT, &ai_hints, &ai_res); if (status != 0) - { - pthread_mutex_unlock (&lock); return (status); - } for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next) { @@ -270,31 +383,76 @@ int rrdc_connect (const char *addr) /* {{{ */ if (status != 0) { status = errno; - close (sd); - sd = -1; + _disconnect(); continue; } assert (status == 0); break; } /* for (ai_ptr) */ - pthread_mutex_unlock (&lock); return (status); -} /* }}} int rrdc_connect */ +} /* }}} int rrdc_connect_network */ -int rrdc_disconnect (void) /* {{{ */ +int rrdc_connect (const char *addr) /* {{{ */ { - pthread_mutex_lock (&lock); + int status = 0; - if (sd < 0) + if (addr == NULL) + addr = getenv (ENV_RRDCACHED_ADDRESS); + + if (addr == NULL) + return 0; + + pthread_mutex_lock(&lock); + + if (sd >= 0 && sd_path != NULL && strcmp(addr, sd_path) == 0) { + /* connection to the same daemon; use cached connection */ pthread_mutex_unlock (&lock); return (0); } + else + { + _disconnect(); + } + + if (strncmp ("unix:", addr, strlen ("unix:")) == 0) + status = rrdc_connect_unix (addr + strlen ("unix:")); + else if (addr[0] == '/') + status = rrdc_connect_unix (addr); + else + status = rrdc_connect_network(addr); + + if (status == 0 && sd >= 0) + sd_path = strdup(addr); + else + rrd_set_error("Unable to connect to rrdcached: %s", + (status < 0) + ? 
"Internal error" + : rrd_strerror (status)); + + pthread_mutex_unlock (&lock); + return (status); +} /* }}} int rrdc_connect */ + +static void _disconnect(void) /* {{{ */ +{ + if (sd >= 0) + close(sd); + + if (sd_path != NULL) + free(sd_path); - close (sd); sd = -1; + sd_path = NULL; +} /* }}} static void _disconnect(void) */ + +int rrdc_disconnect (void) /* {{{ */ +{ + pthread_mutex_lock (&lock); + + _disconnect(); pthread_mutex_unlock (&lock); @@ -431,6 +589,204 @@ int rrdc_flush (const char *filename) /* {{{ */ return (status); } /* }}} int rrdc_flush */ + + +/* convenience function; if there is a daemon specified, or if we can + * detect one from the environment, then flush the file. Otherwise, no-op + */ +int rrdc_flush_if_daemon (const char *opt_daemon, const char *filename) /* {{{ */ +{ + int status = 0; + + rrdc_connect(opt_daemon); + + if (rrdc_is_connected(opt_daemon)) + { + status = rrdc_flush (filename); + if (status != 0) + { + rrd_set_error ("rrdc_flush (%s) failed with status %i.", + filename, status); + } + } /* if (daemon_addr) */ + + return status; +} /* }}} int rrdc_flush_if_daemon */ + + +int rrdc_stats_get (rrdc_stats_t **ret_stats) /* {{{ */ +{ + rrdc_stats_t *head; + rrdc_stats_t *tail; + + rrdc_response_t *response; + + char buffer[4096]; + size_t buffer_size; + int status; + size_t i; + + pthread_mutex_lock (&lock); + + if (sd < 0) + { + pthread_mutex_unlock (&lock); + return (ENOTCONN); + } + + /* Protocol example: {{{ + * -> STATS + * <- 5 Statistics follow + * <- QueueLength: 0 + * <- UpdatesWritten: 0 + * <- DataSetsWritten: 0 + * <- TreeNodesNumber: 0 + * <- TreeDepth: 0 + * }}} */ + status = swrite ("STATS\n", strlen ("STATS\n")); + if (status != 0) + { + pthread_mutex_unlock (&lock); + return (status); + } + + status = sread (buffer, sizeof (buffer)); + if (status < 0) + { + status = errno; + pthread_mutex_unlock (&lock); + return (status); + } + else if (status == 0) + { + pthread_mutex_unlock (&lock); + return (ENODATA); + } + + 
pthread_mutex_unlock (&lock); + + /* Assert NULL termination */ + buffer_size = (size_t) status; + if (buffer[buffer_size - 1] != 0) + { + if (buffer_size < sizeof (buffer)) + { + buffer[buffer_size] = 0; + buffer_size++; + } + else + { + return (ENOBUFS); + } + } + + status = response_parse (buffer, buffer_size, &response); + if (status != 0) + return (status); + + if (response->status <= 0) + { + response_free (response); + return (EIO); + } + + head = NULL; + tail = NULL; + for (i = 0; i < response->lines_num; i++) + { + char *key; + char *value; + char *endptr; + rrdc_stats_t *s; + + key = response->lines[i]; + value = strchr (key, ':'); + if (value == NULL) + continue; + *value = 0; + value++; + + while ((value[0] == ' ') || (value[0] == '\t')) + value++; + + s = (rrdc_stats_t *) malloc (sizeof (rrdc_stats_t)); + if (s == NULL) + continue; + memset (s, 0, sizeof (*s)); + + s->name = strdup (key); + + endptr = NULL; + if ((strcmp ("QueueLength", key) == 0) + || (strcmp ("TreeNodesNumber", key) == 0) + || (strcmp ("TreeDepth", key) == 0)) + { + s->type = RRDC_STATS_TYPE_GAUGE; + s->value.gauge = strtod (value, &endptr); + } + else if ((strcmp ("UpdatesWritten", key) == 0) + || (strcmp ("DataSetsWritten", key) == 0)) + { + s->type = RRDC_STATS_TYPE_COUNTER; + s->value.counter = (uint64_t) strtoll (value, &endptr, /* base = */ 0); + } + else + { + free (s); + continue; + } + + /* Conversion failed */ + if (endptr == value) + { + free (s); + continue; + } + + if (head == NULL) + { + head = s; + tail = s; + s->next = NULL; + } + else + { + tail->next = s; + tail = s; + } + } /* for (i = 0; i < response->lines_num; i++) */ + + response_free (response); + + if (head == NULL) + return (EPROTO); + + *ret_stats = head; + return (0); +} /* }}} int rrdc_stats_get */ + +void rrdc_stats_free (rrdc_stats_t *ret_stats) /* {{{ */ +{ + rrdc_stats_t *this; + + this = ret_stats; + while (this != NULL) + { + rrdc_stats_t *next; + + next = this->next; + + if (this->name != NULL) + { 
+ free (this->name); + this->name = NULL; + } + free (this); + + this = next; + } /* while (this != NULL) */ +} /* }}} void rrdc_stats_free */ + /* * vim: set sw=2 sts=2 ts=8 et fdm=marker : */ diff --git a/src/rrd_client.h b/src/rrd_client.h index 92d4c07..c646cb8 100644 --- a/src/rrd_client.h +++ b/src/rrd_client.h @@ -22,6 +22,8 @@ #ifndef __RRD_CLIENT_H #define __RRD_CLIENT_H 1 +#include + #ifndef RRDCACHED_DEFAULT_ADDRESS # define RRDCACHED_DEFAULT_ADDRESS "unix:/tmp/rrdcached.sock" #endif @@ -30,6 +32,7 @@ #define ENV_RRDCACHED_ADDRESS "RRDCACHED_ADDRESS" int rrdc_connect (const char *addr); +int rrdc_is_connected(const char *daemon_addr); int rrdc_disconnect (void); int rrdc_update (const char *filename, int values_num, @@ -37,4 +40,26 @@ int rrdc_update (const char *filename, int values_num, int rrdc_flush (const char *filename); +struct rrdc_stats_s +{ + const char *name; + uint16_t type; +#define RRDC_STATS_TYPE_GAUGE 0x0001 +#define RRDC_STATS_TYPE_COUNTER 0x0002 + uint16_t flags; + union + { + uint64_t counter; + double gauge; + } value; + struct rrdc_stats_s *next; +}; +typedef struct rrdc_stats_s rrdc_stats_t; + +int rrdc_stats_get (rrdc_stats_t **ret_stats); +void rrdc_stats_free (rrdc_stats_t *ret_stats); + #endif /* __RRD_CLIENT_H */ +/* + * vim: set sw=2 sts=2 ts=8 et fdm=marker : + */ diff --git a/src/rrd_daemon.c b/src/rrd_daemon.c index bc299f8..a5e8e55 100644 --- a/src/rrd_daemon.c +++ b/src/rrd_daemon.c @@ -17,31 +17,9 @@ * * Authors: * Florian octo Forster + * kevin brintnall **/ -/* - * First tell the compiler to stick to the C99 and POSIX standards as close as - * possible. - */ -#ifndef __STRICT_ANSI__ /* {{{ */ -# define __STRICT_ANSI__ -#endif - -#ifndef _ISOC99_SOURCE -# define _ISOC99_SOURCE -#endif - -#ifdef _POSIX_C_SOURCE -# undef _POSIX_C_SOURCE -#endif -#define _POSIX_C_SOURCE 200112L - -/* Single UNIX needed for strdup. 
*/ -#ifdef _XOPEN_SOURCE -# undef _XOPEN_SOURCE -#endif -#define _XOPEN_SOURCE 500 - #ifndef _REENTRANT # define _REENTRANT #endif @@ -50,9 +28,6 @@ # define _THREAD_SAFE #endif -#ifdef _GNU_SOURCE -# undef _GNU_SOURCE -#endif /* }}} */ /* @@ -112,8 +87,8 @@ struct cache_item_s char **values; int values_num; time_t last_flush_time; -#define CI_FLAGS_IN_TREE 0x01 -#define CI_FLAGS_IN_QUEUE 0x02 +#define CI_FLAGS_IN_TREE (1<<0) +#define CI_FLAGS_IN_QUEUE (1<<1) int flags; cache_item_t *next; @@ -135,9 +110,14 @@ enum queue_side_e }; typedef enum queue_side_e queue_side_t; +/* max length of socket command or response */ +#define CMD_MAX 4096 + /* * Variables */ +static int stay_foreground = 0; + static listen_socket_t *listen_fds = NULL; static size_t listen_fds_num = 0; @@ -145,9 +125,9 @@ static int do_shutdown = 0; static pthread_t queue_thread; -static pthread_t *connetion_threads = NULL; -static pthread_mutex_t connetion_threads_lock = PTHREAD_MUTEX_INITIALIZER; -static int connetion_threads_num = 0; +static pthread_t *connection_threads = NULL; +static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER; +static int connection_threads_num = 0; /* Cache stuff */ static GTree *cache_tree = NULL; @@ -159,6 +139,7 @@ static pthread_cond_t cache_cond = PTHREAD_COND_INITIALIZER; static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER; static int config_write_interval = 300; +static int config_write_jitter = 0; static int config_flush_interval = 3600; static char *config_pid_file = NULL; static char *config_base_dir = NULL; @@ -167,27 +148,45 @@ static char **config_listen_address_list = NULL; static int config_listen_address_list_len = 0; static uint64_t stats_queue_length = 0; +static uint64_t stats_updates_received = 0; +static uint64_t stats_flush_received = 0; static uint64_t stats_updates_written = 0; static uint64_t stats_data_sets_written = 0; +static uint64_t stats_journal_bytes = 0; +static uint64_t stats_journal_rotate = 0; static 
pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER; +/* Journaled updates */ +static char *journal_cur = NULL; +static char *journal_old = NULL; +static FILE *journal_fh = NULL; +static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER; +static int journal_write(char *cmd, char *args); +static void journal_done(void); +static void journal_rotate(void); + /* * Functions */ static void sig_int_handler (int s __attribute__((unused))) /* {{{ */ { + RRDD_LOG(LOG_NOTICE, "caught SIGINT"); do_shutdown++; + pthread_cond_broadcast(&cache_cond); } /* }}} void sig_int_handler */ static void sig_term_handler (int s __attribute__((unused))) /* {{{ */ { + RRDD_LOG(LOG_NOTICE, "caught SIGTERM"); do_shutdown++; + pthread_cond_broadcast(&cache_cond); } /* }}} void sig_term_handler */ static int write_pidfile (void) /* {{{ */ { pid_t pid; char *file; + int fd; FILE *fh; pid = getpid (); @@ -196,10 +195,19 @@ static int write_pidfile (void) /* {{{ */ ? config_pid_file : LOCALSTATEDIR "/run/rrdcached.pid"; - fh = fopen (file, "w"); + fd = open(file, O_CREAT|O_EXCL|O_WRONLY, S_IRUSR|S_IRGRP|S_IROTH); + if (fd < 0) + { + RRDD_LOG(LOG_ERR, "FATAL: cannot create '%s' (%s)", + file, rrd_strerror(errno)); + return (-1); + } + + fh = fdopen (fd, "w"); if (fh == NULL) { RRDD_LOG (LOG_ERR, "write_pidfile: Opening `%s' failed.", file); + close(fd); return (-1); } @@ -282,6 +290,9 @@ static ssize_t swrite (int fd, const void *buf, size_t count) /* {{{ */ size_t nleft; ssize_t status; + /* special case for journal replay */ + if (fd < 0) return 0; + ptr = (const char *) buf; nleft = count; @@ -295,13 +306,25 @@ static ssize_t swrite (int fd, const void *buf, size_t count) /* {{{ */ if (status < 0) return (status); - nleft = nleft - status; - ptr = ptr + status; + nleft -= status; + ptr += status; } return (0); } /* }}} ssize_t swrite */ +static void _wipe_ci_values(cache_item_t *ci, time_t when) +{ + ci->values = NULL; + ci->values_num = 0; + + ci->last_flush_time = when; + if 
(config_write_jitter > 0) + ci->last_flush_time += (random() % config_write_jitter); + + ci->flags &= ~(CI_FLAGS_IN_QUEUE); +} + /* * enqueue_cache_item: * `cache_lock' must be acquired before calling this function! @@ -517,8 +540,14 @@ static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */ flush_old_values (config_write_interval); /* Determine the time of the next cache flush. */ - while (next_flush.tv_sec < now.tv_sec) + while (next_flush.tv_sec <= now.tv_sec) next_flush.tv_sec += config_flush_interval; + + /* unlock the cache while we rotate so we don't block incoming + * updates if the fsync() blocks on disk I/O */ + pthread_mutex_unlock(&cache_lock); + journal_rotate(); + pthread_mutex_lock(&cache_lock); } /* Now, check if there's something to store away. If not, wait until @@ -552,14 +581,13 @@ static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */ continue; } + assert(ci->values != NULL); + assert(ci->values_num > 0); + values = ci->values; values_num = ci->values_num; - ci->values = NULL; - ci->values_num = 0; - - ci->last_flush_time = time (NULL); - ci->flags &= ~(CI_FLAGS_IN_QUEUE); + _wipe_ci_values(ci, time(NULL)); cache_queue_head = ci->next; if (cache_queue_head == NULL) @@ -573,18 +601,23 @@ static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */ pthread_mutex_unlock (&cache_lock); + rrd_clear_error (); status = rrd_update_r (file, NULL, values_num, (void *) values); if (status != 0) { - RRDD_LOG (LOG_ERR, "queue_thread_main: " - "rrd_update_r failed with status %i.", - status); + RRDD_LOG (LOG_NOTICE, "queue_thread_main: " + "rrd_update_r (%s) failed with status %i. 
(%s)", + file, status, rrd_get_error()); } - free (file); + journal_write("wrote", file); + for (i = 0; i < values_num; i++) free (values[i]); + free(values); + free(file); + if (status == 0) { pthread_mutex_lock (&stats_lock); @@ -602,6 +635,10 @@ static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */ } /* while ((do_shutdown == 0) || (cache_queue_head != NULL)) */ pthread_mutex_unlock (&cache_lock); + assert(cache_queue_head == NULL); + RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed"); + journal_done(); + return (NULL); } /* }}} void *queue_thread_main */ @@ -625,13 +662,13 @@ static int buffer_get_field (char **buffer_ret, /* {{{ */ return (-1); /* This is ensured by `handle_request'. */ - assert (buffer[buffer_size - 1] == ' '); + assert (buffer[buffer_size - 1] == '\0'); status = -1; while (buffer_pos < buffer_size) { /* Check for end-of-field or end-of-buffer */ - if (buffer[buffer_pos] == ' ') + if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0') { field[field_size] = 0; field_size++; @@ -808,19 +845,27 @@ static int handle_request_stats (int fd, /* {{{ */ size_t buffer_size __attribute__((unused))) { int status; - char outbuf[4096]; + char outbuf[CMD_MAX]; uint64_t copy_queue_length; + uint64_t copy_updates_received; + uint64_t copy_flush_received; uint64_t copy_updates_written; uint64_t copy_data_sets_written; + uint64_t copy_journal_bytes; + uint64_t copy_journal_rotate; uint64_t tree_nodes_number; uint64_t tree_depth; pthread_mutex_lock (&stats_lock); copy_queue_length = stats_queue_length; + copy_updates_received = stats_updates_received; + copy_flush_received = stats_flush_received; copy_updates_written = stats_updates_written; copy_data_sets_written = stats_data_sets_written; + copy_journal_bytes = stats_journal_bytes; + copy_journal_rotate = stats_journal_rotate; pthread_mutex_unlock (&stats_lock); pthread_mutex_lock (&cache_lock); @@ -838,7 +883,7 @@ static int handle_request_stats (int fd, /* {{{ */ return 
(status); \ } - strncpy (outbuf, "5 Statistics follow\n", sizeof (outbuf)); + strncpy (outbuf, "9 Statistics follow\n", sizeof (outbuf)); RRDD_STATS_SEND; snprintf (outbuf, sizeof (outbuf), @@ -846,6 +891,14 @@ static int handle_request_stats (int fd, /* {{{ */ RRDD_STATS_SEND; snprintf (outbuf, sizeof (outbuf), + "UpdatesReceived: %"PRIu64"\n", copy_updates_received); + RRDD_STATS_SEND; + + snprintf (outbuf, sizeof (outbuf), + "FlushesReceived: %"PRIu64"\n", copy_flush_received); + RRDD_STATS_SEND; + + snprintf (outbuf, sizeof (outbuf), "UpdatesWritten: %"PRIu64"\n", copy_updates_written); RRDD_STATS_SEND; @@ -861,6 +914,14 @@ static int handle_request_stats (int fd, /* {{{ */ "TreeDepth: %"PRIu64"\n", tree_depth); RRDD_STATS_SEND; + snprintf (outbuf, sizeof(outbuf), + "JournalBytes: %"PRIu64"\n", copy_journal_bytes); + RRDD_STATS_SEND; + + snprintf (outbuf, sizeof(outbuf), + "JournalRotate: %"PRIu64"\n", copy_journal_rotate); + RRDD_STATS_SEND; + return (0); #undef RRDD_STATS_SEND } /* }}} int handle_request_stats */ @@ -870,7 +931,7 @@ static int handle_request_flush (int fd, /* {{{ */ { char *file; int status; - char result[4096]; + char result[CMD_MAX]; status = buffer_get_field (&buffer, &buffer_size, &file); if (status != 0) @@ -879,11 +940,24 @@ static int handle_request_flush (int fd, /* {{{ */ } else { + pthread_mutex_lock(&stats_lock); + stats_flush_received++; + pthread_mutex_unlock(&stats_lock); + status = flush_file (file); if (status == 0) snprintf (result, sizeof (result), "0 Successfully flushed %s.\n", file); else if (status == ENOENT) - snprintf (result, sizeof (result), "-1 No such file: %s.\n", file); + { + /* no file in our tree; see whether it exists at all */ + struct stat statbuf; + + memset(&statbuf, 0, sizeof(statbuf)); + if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode)) + snprintf (result, sizeof (result), "0 Nothing to flush: %s.\n", file); + else + snprintf (result, sizeof (result), "-1 No such file: %s.\n", file); + } else if 
(status < 0)
     strncpy (result, "-1 Internal error.\n", sizeof (result));
   else
@@ -912,7 +986,7 @@ static int handle_request_update (int fd, /* {{{ */
   time_t now;
   cache_item_t *ci;

-  char answer[4096];
+  char answer[CMD_MAX];

 #define RRDD_UPDATE_SEND \
     answer[sizeof (answer) - 1] = 0; \
@@ -935,6 +1009,10 @@ static int handle_request_update (int fd, /* {{{ */
     return (0);
   }

+  pthread_mutex_lock(&stats_lock);
+  stats_updates_received++;
+  pthread_mutex_unlock(&stats_lock);
+
   pthread_mutex_lock (&cache_lock);

   ci = g_tree_lookup (cache_tree, file);
@@ -947,11 +1025,11 @@ static int handle_request_update (int fd, /* {{{ */
     if (status != 0)
     {
       pthread_mutex_unlock (&cache_lock);
-      RRDD_LOG (LOG_ERR, "handle_request_update: stat (%s) failed.", file);
+      RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);

       status = errno;
       if (status == ENOENT)
-        snprintf (answer, sizeof (answer), "-1 No such file: %s", file);
+        snprintf (answer, sizeof (answer), "-1 No such file: %s\n", file);
       else
         snprintf (answer, sizeof (answer), "-1 stat failed with error %i.\n",
             status);
@@ -962,7 +1040,16 @@ static int handle_request_update (int fd, /* {{{ */
     {
       pthread_mutex_unlock (&cache_lock);

-      snprintf (answer, sizeof (answer), "-1 Not a regular file: %s", file);
+      snprintf (answer, sizeof (answer), "-1 Not a regular file: %s\n", file);
+      RRDD_UPDATE_SEND;
+      return (0);
+    }
+    if (access(file, R_OK|W_OK) != 0)
+    {
+      pthread_mutex_unlock (&cache_lock);
+
+      snprintf (answer, sizeof (answer), "-1 Cannot read/write %s: %s\n",
+                file, rrd_strerror(errno));
       RRDD_UPDATE_SEND;
       return (0);
     }
@@ -991,9 +1078,7 @@ static int handle_request_update (int fd, /* {{{ */
       return (0);
     }

-    ci->values = NULL;
-    ci->values_num = 0;
-    ci->last_flush_time = now;
+    _wipe_ci_values(ci, now);
     ci->flags = CI_FLAGS_IN_TREE;

     g_tree_insert (cache_tree, (void *) ci->file, (void *) ci);
@@ -1056,31 +1141,46 @@ static int handle_request_update (int fd, /* {{{ */
 #undef RRDD_UPDATE_SEND
 } /* }}} int handle_request_update */

-static int handle_request (int fd) /* {{{ */
+/* we came across a "WROTE" entry during journal replay.
+ * throw away any values that we have accumulated for this file
+ */
+static int handle_request_wrote (int fd __attribute__((unused)), /* {{{ */
+                                 const char *buffer,
+                                 size_t buffer_size __attribute__((unused)))
 {
-  char buffer[4096];
-  size_t buffer_size;
-  char *buffer_ptr;
-  char *command;
-  int status;
+  int i;
+  cache_item_t *ci;
+  const char *file = buffer;

-  status = (int) sread (fd, buffer, sizeof (buffer));
-  if (status == 0)
-  {
-    return (1);
-  }
-  else if (status < 0)
+  pthread_mutex_lock(&cache_lock);
+
+  ci = g_tree_lookup(cache_tree, file);
+  if (ci == NULL)
+    goto out;
+
+  if (ci->values)
   {
-    RRDD_LOG (LOG_ERR, "handle_request: sread failed.");
-    return (-1);
+    for (i=0; i < ci->values_num; i++)
+      free(ci->values[i]);
+
+    free(ci->values);
   }
-  buffer_size = (size_t) status;
-  assert (buffer_size <= sizeof (buffer));
-  assert (buffer[buffer_size - 1] == 0);

-  /* Place the normal field separator at the end to simplify
-   * `buffer_get_field's work. */
-  buffer[buffer_size - 1] = ' ';
+  _wipe_ci_values(ci, time(NULL));
+
+out:
+  pthread_mutex_unlock(&cache_lock);
+  return 0;
+}
+
+/* if fd < 0, we are in journal replay mode */
+static int handle_request (int fd, char *buffer, size_t buffer_size) /* {{{ */
+{
+  char *buffer_ptr;
+  char *command;
+  int status;
+
+  assert (buffer[buffer_size - 1] == '\0');

   buffer_ptr = buffer;
   command = NULL;
@@ -1093,8 +1193,17 @@ static int handle_request (int fd) /* {{{ */
   if (strcasecmp (command, "update") == 0)
   {
+    /* don't re-write updates in replay mode */
+    if (fd >= 0)
+      journal_write(command, buffer_ptr);
+
     return (handle_request_update (fd, buffer_ptr, buffer_size));
   }
+  else if (strcasecmp (command, "wrote") == 0 && fd < 0)
+  {
+    /* this is only valid in replay mode */
+    return (handle_request_wrote (fd, buffer_ptr, buffer_size));
+  }
   else if (strcasecmp (command, "flush") == 0)
   {
     return (handle_request_flush (fd, buffer_ptr, buffer_size));
@@ -1109,7 +1218,7 @@ static int handle_request (int fd) /* {{{ */
   }
   else
   {
-    char result[4096];
+    char result[CMD_MAX];

     snprintf (result, sizeof (result), "-1 Unknown command: %s\n", command);
     result[sizeof (result) - 1] = 0;
@@ -1125,36 +1234,174 @@ static int handle_request (int fd) /* {{{ */
   return (0);
 } /* }}} int handle_request */

-static void *connection_thread_main (void *args /* {{{ */
-    __attribute__((unused)))
+/* MUST NOT hold journal_lock before calling this */
+static void journal_rotate(void) /* {{{ */
+{
+  FILE *old_fh = NULL;
+
+  if (journal_cur == NULL || journal_old == NULL)
+    return;
+
+  pthread_mutex_lock(&journal_lock);
+
+  /* we rotate this way (rename before close) so that the we can release
+   * the journal lock as fast as possible. Journal writes to the new
+   * journal can proceed immediately after the new file is opened. The
+   * fclose can then block without affecting new updates.
+   */
+  if (journal_fh != NULL)
+  {
+    old_fh = journal_fh;
+    rename(journal_cur, journal_old);
+    ++stats_journal_rotate;
+  }
+
+  journal_fh = fopen(journal_cur, "a");
+  pthread_mutex_unlock(&journal_lock);
+
+  if (old_fh != NULL)
+    fclose(old_fh);
+
+  if (journal_fh == NULL)
+    RRDD_LOG(LOG_CRIT,
+             "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
+             journal_cur, rrd_strerror(errno));
+
+} /* }}} static void journal_rotate */
+
+static void journal_done(void) /* {{{ */
+{
+  if (journal_cur == NULL)
+    return;
+
+  pthread_mutex_lock(&journal_lock);
+  if (journal_fh != NULL)
+  {
+    fclose(journal_fh);
+    journal_fh = NULL;
+  }
+
+  RRDD_LOG(LOG_INFO, "removing journals");
+
+  unlink(journal_old);
+  unlink(journal_cur);
+  pthread_mutex_unlock(&journal_lock);
+
+} /* }}} static void journal_done */
+
+static int journal_write(char *cmd, char *args) /* {{{ */
+{
+  int chars;
+
+  if (journal_fh == NULL)
+    return 0;
+
+  pthread_mutex_lock(&journal_lock);
+  chars = fprintf(journal_fh, "%s %s\n", cmd, args);
+  pthread_mutex_unlock(&journal_lock);
+
+  if (chars > 0)
+  {
+    pthread_mutex_lock(&stats_lock);
+    stats_journal_bytes += chars;
+    pthread_mutex_unlock(&stats_lock);
+  }
+
+  return chars;
+} /* }}} static int journal_write */
+
+static int journal_replay (const char *file) /* {{{ */
+{
+  FILE *fh;
+  int entry_cnt = 0;
+  int fail_cnt = 0;
+  uint64_t line = 0;
+  char entry[CMD_MAX];
+
+  if (file == NULL) return 0;
+
+  fh = fopen(file, "r");
+  if (fh == NULL)
+  {
+    if (errno != ENOENT)
+      RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
+               file, rrd_strerror(errno));
+    return 0;
+  }
+  else
+    RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
+
+  while(!feof(fh))
+  {
+    size_t entry_len;
+
+    ++line;
+    fgets(entry, sizeof(entry), fh);
+    entry_len = strlen(entry);
+
+    /* check \n termination in case journal writing crashed mid-line */
+    if (entry_len == 0)
+      continue;
+    else if (entry[entry_len - 1] != '\n')
+    {
+      RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
+      ++fail_cnt;
+      continue;
+    }
+
+    entry[entry_len - 1] = '\0';
+
+    if (handle_request(-1, entry, entry_len) == 0)
+      ++entry_cnt;
+    else
+      ++fail_cnt;
+  }
+
+  fclose(fh);
+
+  if (entry_cnt > 0)
+  {
+    RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
+             entry_cnt, fail_cnt);
+    return 1;
+  }
+  else
+    return 0;
+
+} /* }}} static int journal_replay */
+
+static void *connection_thread_main (void *args) /* {{{ */
 {
   pthread_t self;
   int i;
   int fd;

   fd = *((int *) args);
+  free (args);

-  pthread_mutex_lock (&connetion_threads_lock);
+  pthread_mutex_lock (&connection_threads_lock);
   {
     pthread_t *temp;

-    temp = (pthread_t *) realloc (connetion_threads,
-        sizeof (pthread_t) * (connetion_threads_num + 1));
+    temp = (pthread_t *) realloc (connection_threads,
+        sizeof (pthread_t) * (connection_threads_num + 1));
     if (temp == NULL)
     {
       RRDD_LOG (LOG_ERR, "connection_thread_main: realloc failed.");
     }
     else
     {
-      connetion_threads = temp;
-      connetion_threads[connetion_threads_num] = pthread_self ();
-      connetion_threads_num++;
+      connection_threads = temp;
+      connection_threads[connection_threads_num] = pthread_self ();
+      connection_threads_num++;
     }
   }
-  pthread_mutex_unlock (&connetion_threads_lock);
+  pthread_mutex_unlock (&connection_threads_lock);

   while (do_shutdown == 0)
   {
+    char buffer[CMD_MAX];
+
     struct pollfd pollfd;
     int status;

@@ -1188,7 +1435,18 @@ static void *connection_thread_main (void *args /* {{{ */
       break;
     }

-    status = handle_request (fd);
+    status = (int) sread (fd, buffer, sizeof (buffer));
+    if (status <= 0)
+    {
+      close (fd);
+
+      if (status < 0)
+        RRDD_LOG(LOG_ERR, "connection_thread_main: sread failed.");
+
+      break;
+    }
+
+    status = handle_request (fd, buffer, /*buffer_size=*/ status);
     if (status != 0)
     {
       close (fd);
@@ -1198,25 +1456,24 @@ static void *connection_thread_main (void *args /* {{{ */
   self = pthread_self ();
   /* Remove this thread from the connection threads list */
-  pthread_mutex_lock (&connetion_threads_lock);
+  pthread_mutex_lock (&connection_threads_lock);
   /* Find out own index in the array */
-  for (i = 0; i < connetion_threads_num; i++)
-    if (pthread_equal (connetion_threads[i], self) != 0)
+  for (i = 0; i < connection_threads_num; i++)
+    if (pthread_equal (connection_threads[i], self) != 0)
       break;
-  assert (i < connetion_threads_num);
+  assert (i < connection_threads_num);

   /* Move the trailing threads forward. */
-  if (i < (connetion_threads_num - 1))
+  if (i < (connection_threads_num - 1))
   {
-    memmove (connetion_threads + i,
-        connetion_threads + i + 1,
-        sizeof (pthread_t) * (connetion_threads_num - i - 1));
+    memmove (connection_threads + i,
+        connection_threads + i + 1,
+        sizeof (pthread_t) * (connection_threads_num - i - 1));
   }

-  connetion_threads_num--;
-  pthread_mutex_unlock (&connetion_threads_lock);
+  connection_threads_num--;
+  pthread_mutex_unlock (&connection_threads_lock);

-  free (args);
   return (NULL);
 } /* }}} void *connection_thread_main */

@@ -1400,6 +1657,8 @@ static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
   }
   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);

+  RRDD_LOG(LOG_INFO, "listening for connections");
+
   while (do_shutdown == 0)
   {
     assert (pollfds_num == ((int) listen_fds_num));
@@ -1471,20 +1730,22 @@ static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
     } /* for (pollfds_num) */
   } /* while (do_shutdown == 0) */

+  RRDD_LOG(LOG_INFO, "starting shutdown");
+
   close_listen_sockets ();

-  pthread_mutex_lock (&connetion_threads_lock);
-  while (connetion_threads_num > 0)
+  pthread_mutex_lock (&connection_threads_lock);
+  while (connection_threads_num > 0)
   {
     pthread_t wait_for;

-    wait_for = connetion_threads[0];
+    wait_for = connection_threads[0];

-    pthread_mutex_unlock (&connetion_threads_lock);
+    pthread_mutex_unlock (&connection_threads_lock);
     pthread_join (wait_for, /* retval = */ NULL);
-    pthread_mutex_lock (&connetion_threads_lock);
+    pthread_mutex_lock (&connection_threads_lock);
   }
-  pthread_mutex_unlock (&connetion_threads_lock);
+  pthread_mutex_unlock (&connection_threads_lock);

   return (NULL);
 } /* }}} void *listen_thread_main */
@@ -1501,6 +1762,9 @@ static int daemonize (void) /* {{{ */
   static struct sigaction sa_term;
   static struct sigaction sa_pipe;

+  if (stay_foreground)
+    goto child_startup;
+
   child = fork ();
   if (child < 0)
   {
@@ -1535,6 +1799,7 @@ static int daemonize (void) /* {{{ */
   dup (0);
   dup (0);

+child_startup:
   /* Install signal handlers */
   memset (&sa_int, 0, sizeof (sa_int));
   sa_int.sa_handler = sig_int_handler;
@@ -1549,6 +1814,7 @@ static int daemonize (void) /* {{{ */
   sigaction (SIGPIPE, &sa_pipe, NULL);

   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
+  RRDD_LOG(LOG_INFO, "starting up");

   cache_tree = g_tree_new ((GCompareFunc) strcmp);
   if (cache_tree == NULL)
@@ -1557,18 +1823,8 @@ static int daemonize (void) /* {{{ */
     return (-1);
   }

-  memset (&queue_thread, 0, sizeof (queue_thread));
-  status = pthread_create (&queue_thread, /* attr = */ NULL,
-      queue_thread_main, /* args = */ NULL);
-  if (status != 0)
-  {
-    RRDD_LOG (LOG_ERR, "daemonize: pthread_create failed.");
-    return (-1);
-  }
-
-  write_pidfile ();
-
-  return (0);
+  status = write_pidfile ();
+  return status;
 } /* }}} int daemonize */

 static int cleanup (void) /* {{{ */
@@ -1580,6 +1836,7 @@ static int cleanup (void) /* {{{ */

   remove_pidfile ();

+  RRDD_LOG(LOG_INFO, "goodbye");
   closelog ();

   return (0);
@@ -1590,10 +1847,14 @@ static int read_options (int argc, char **argv) /* {{{ */
   int option;
   int status = 0;

-  while ((option = getopt(argc, argv, "l:f:w:b:p:h?")) != -1)
+  while ((option = getopt(argc, argv, "gl:f:w:b:z:p:j:h?")) != -1)
   {
     switch (option)
     {
+      case 'g':
+        stay_foreground=1;
+        break;
+
       case 'l':
       {
         char **temp;
@@ -1647,6 +1908,22 @@ static int read_options (int argc, char **argv) /* {{{ */
       }
       break;

+      case 'z':
+      {
+        int temp;
+
+        temp = atoi(optarg);
+        if (temp > 0)
+          config_write_jitter = temp;
+        else
+        {
+          fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
+          status = 2;
+        }
+
+        break;
+      }
+
       case 'b':
       {
         size_t len;
@@ -1688,6 +1965,41 @@ static int read_options (int argc, char **argv) /* {{{ */
       }
       break;

+      case 'j':
+      {
+        struct stat statbuf;
+        const char *dir = optarg;
+
+        status = stat(dir, &statbuf);
+        if (status != 0)
+        {
+          fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
+          return 6;
+        }
+
+        if (!S_ISDIR(statbuf.st_mode)
+            || access(dir, R_OK|W_OK|X_OK) != 0)
+        {
+          fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
+                  errno ? rrd_strerror(errno) : "");
+          return 6;
+        }
+
+        journal_cur = malloc(PATH_MAX + 1);
+        journal_old = malloc(PATH_MAX + 1);
+        if (journal_cur == NULL || journal_old == NULL)
+        {
+          fprintf(stderr, "malloc failure for journal files\n");
+          return 6;
+        }
+        else
+        {
+          snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
+          snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
+        }
+      }
+      break;
+
       case 'h':
       case '?':
         printf ("RRDd %s Copyright (C) 2008 Florian octo Forster\n"
@@ -1697,6 +2009,7 @@ static int read_options (int argc, char **argv) /* {{{ */
             "Valid options are:\n"
             " -l Socket address to listen to.\n"
             " -w Interval in which to write data.\n"
+            " -z Delay writes up to seconds to spread load" \
             " -f Interval in which to flush dead data.\n"
             " -p Location of the PID-file.\n"
             " -b Base directory to change to.\n"
@@ -1710,6 +2023,14 @@ static int read_options (int argc, char **argv) /* {{{ */
     } /* switch (option) */
   } /* while (getopt) */

+  /* advise the user when values are not sane */
+  if (config_flush_interval < 2 * config_write_interval)
+    fprintf(stderr, "WARNING: flush interval (-f) should be at least"
+            " 2x write interval (-w) !\n");
+  if (config_write_jitter > config_write_interval)
+    fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
+            " write interval (-w) !\n");
+
   return (status);
 } /* }}} int read_options */

@@ -1742,8 +2063,40 @@ int main (int argc, char **argv)
     return (1);
   }

-  listen_thread_main (NULL);
+  if (journal_cur != NULL)
+  {
+    int had_journal = 0;
+
+    pthread_mutex_lock(&journal_lock);
+
+    RRDD_LOG(LOG_INFO, "checking for journal files");
+    had_journal += journal_replay(journal_old);
+    had_journal += journal_replay(journal_cur);
+
+    if (had_journal)
+      flush_old_values(-1);
+
+    pthread_mutex_unlock(&journal_lock);
+    journal_rotate();
+
+    RRDD_LOG(LOG_INFO, "journal processing complete");
+  }
+
+  /* start the queue thread */
+  memset (&queue_thread, 0, sizeof (queue_thread));
+  status = pthread_create (&queue_thread,
+                           NULL, /* attr */
+                           queue_thread_main,
+                           NULL); /* args */
+  if (status != 0)
+  {
+    RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
+    cleanup();
+    return (1);
+  }

+  listen_thread_main (NULL);
   cleanup ();

   return (0);
diff --git a/src/rrd_dump.c b/src/rrd_dump.c
index 552c636..a32f4fb 100644
--- a/src/rrd_dump.c
+++ b/src/rrd_dump.c
@@ -493,43 +493,9 @@ int rrd_dump(
         return (-1);
     }

-    if (opt_daemon == NULL)
-    {
-        char *temp;
-
-        temp = getenv (ENV_RRDCACHED_ADDRESS);
-        if (temp != NULL)
-        {
-            opt_daemon = strdup (temp);
-            if (opt_daemon == NULL)
-            {
-                rrd_set_error("strdup failed.");
-                return (-1);
-            }
-        }
-    }
-
-    if (opt_daemon != NULL)
-    {
-        int status;
-
-        status = rrdc_connect (opt_daemon);
-        if (status != 0)
-        {
-            rrd_set_error ("rrdc_connect failed with status %i.", status);
-            return (-1);
-        }
-
-        status = rrdc_flush (argv[optind]);
-        if (status != 0)
-        {
-            rrd_set_error ("rrdc_flush (%s) failed with status %i.",
-                    argv[optind], status);
-            return (-1);
-        }
-
-        rrdc_disconnect ();
-    } /* if (opt_daemon) */
+    rc = rrdc_flush_if_daemon(opt_daemon, argv[optind]);
+    if (opt_daemon) free(opt_daemon);
+    if (rc) return (rc);

     if ((argc - optind) == 2) {
         rc = rrd_dump_opt_r(argv[optind], argv[optind + 1], opt_noheader);
diff --git a/src/rrd_fetch.c b/src/rrd_fetch.c
index 563e76b..568e262 100644
--- a/src/rrd_fetch.c
+++ b/src/rrd_fetch.c
@@ -167,41 +167,9 @@ int rrd_fetch(
         return -1;
     }

-    if (opt_daemon == NULL)
-    {
-        char *temp;
-
-        temp = getenv (ENV_RRDCACHED_ADDRESS);
-        if (temp != NULL)
-        {
-            opt_daemon = strdup (temp);
-            if (opt_daemon == NULL)
-            {
-                rrd_set_error("strdup failed.");
-                return (-1);
-            }
-        }
-    }
-
-    if (opt_daemon != NULL)
-    {
-        status = rrdc_connect (opt_daemon);
-        if (status != 0)
-        {
-            rrd_set_error ("rrdc_connect failed with status %i.", status);
-            return (-1);
-        }
-
-        status = rrdc_flush (argv[optind]);
-        if (status != 0)
-        {
-            rrd_set_error ("rrdc_flush (%s) failed with status %i.",
-                    argv[optind], status);
-            return (-1);
-        }
-
-        rrdc_disconnect ();
-    } /* if (opt_daemon) */
+    status = rrdc_flush_if_daemon(opt_daemon, argv[optind]);
+    if (opt_daemon) free (opt_daemon);
+    if (status) return (-1);

     cf = argv[optind + 1];

diff --git a/src/rrd_flush.c b/src/rrd_flush.c
index ba5f4f2..218a65a 100644
--- a/src/rrd_flush.c
+++ b/src/rrd_flush.c
@@ -71,23 +71,12 @@ int rrd_cmd_flush (int argc, char **argv)
     return (-1);
   }

-  if (opt_daemon == NULL)
-  {
-    char *temp;
-
-    temp = getenv (ENV_RRDCACHED_ADDRESS);
-    if (temp != NULL)
-    {
-      opt_daemon = strdup (temp);
-      if (opt_daemon == NULL)
-      {
-        rrd_set_error("strdup failed.");
-        return (-1);
-      }
-    }
-  }
+  /* try to connect to rrdcached */
+  status = rrdc_connect(opt_daemon);
+  if (opt_daemon) free(opt_daemon);
+  if (status != 0) return status;

-  if (opt_daemon == NULL)
+  if (! rrdc_is_connected(opt_daemon))
   {
     rrd_set_error ("Daemon address unknown. Please use the \"--daemon\" "
         "option to set an address on the command line or set the "
@@ -96,19 +85,7 @@ int rrd_cmd_flush (int argc, char **argv)
     return (-1);
   }

-  status = rrdc_connect (opt_daemon);
-  if (status != 0)
-  {
-    rrd_set_error ("rrdc_connect failed with status %i.", status);
-    return (-1);
-  }
-
-  status = rrdc_flush (argv[optind]);
-  if (status != 0)
-    rrd_set_error ("rrdc_flush (%s) failed with status %i.",
-        argv[optind], status);
-
-  rrdc_disconnect ();
+  status = rrdc_flush(argv[optind]);

   return ((status == 0) ? 0 : -1);
 } /* int rrd_flush */
diff --git a/src/rrd_graph.c b/src/rrd_graph.c
index 4f8c0d1..dc12ed6 100644
--- a/src/rrd_graph.c
+++ b/src/rrd_graph.c
@@ -307,11 +307,8 @@ int im_free(
     if (im == NULL)
         return 0;

-    if (im->use_rrdcached)
-    {
-        rrdc_disconnect ();
-        im->use_rrdcached = 0;
-    }
+    if (im->daemon_addr != NULL)
+      free(im->daemon_addr);

     for (i = 0; i < (unsigned) im->gdes_c; i++) {
         if (im->gdes[i].data_first) {
@@ -845,7 +842,7 @@ int data_fetch(
          * - a connection to the daemon has been established
          * - this is the first occurrence of that RRD file
          */
-        if (im->use_rrdcached)
+        if (rrdc_is_connected(im->daemon_addr))
         {
             int status;

@@ -869,7 +866,7 @@ int data_fetch(
                     return (-1);
                 }
             }
-        } /* if (im->use_rrdcached) */
+        } /* if (rrdc_is_connected()) */

         if ((rrd_fetch_fn(im->gdes[i].rrd,
                           im->gdes[i].cf,
@@ -3742,6 +3739,7 @@ void rrd_graph_init(
 #endif
 #endif
     im->base = 1000;
+    im->daemon_addr = NULL;
     im->draw_x_grid = 1;
     im->draw_y_grid = 1;
     im->extra_flags = 0;
@@ -3757,7 +3755,6 @@ void rrd_graph_init(
     im->grinfo_current = (rrd_info_t *) NULL;
     im->imgformat = IF_PNG;
     im->imginfo = NULL;
-    im->use_rrdcached = 0;
     im->lazy = 0;
     im->logarithmic = 0;
     im->maxval = DNAN;
@@ -4248,21 +4245,20 @@ void rrd_graph_options(
             break;
         case 'd':
         {
-            int status;
-            if (im->use_rrdcached)
+            if (im->daemon_addr != NULL)
             {
                 rrd_set_error ("You cannot specify --daemon "
                         "more than once.");
                 return;
             }
-            status = rrdc_connect (optarg);
-            if (status != 0)
+
+            im->daemon_addr = strdup(optarg);
+            if (im->daemon_addr == NULL)
             {
-                rrd_set_error ("rrdc_connect(%s) failed with status %i.",
-                        optarg, status);
-                return;
+              rrd_set_error("strdup failed");
+              return;
             }
-            im->use_rrdcached = 1;
+
             break;
         }
         case '?':
@@ -4274,24 +4270,9 @@ void rrd_graph_options(
         }
     } /* while (1) */

-    if (im->use_rrdcached == 0)
-    {
-        char *temp;
-
-        temp = getenv (ENV_RRDCACHED_ADDRESS);
-        if (temp != NULL)
-        {
-            int status;
-
-            status = rrdc_connect (temp);
-            if (status != 0)
-            {
-                rrd_set_error ("rrdc_connect(%s) failed with status %i.",
-                        temp, status);
-                return;
-            }
-            im->use_rrdcached = 1;
-        }
+    { /* try to connect to rrdcached */
+        int status = rrdc_connect(im->daemon_addr);
+        if (status != 0) return;
     }

     pango_cairo_context_set_font_options(pango_layout_get_context(im->layout), im->font_options);
diff --git a/src/rrd_graph.h b/src/rrd_graph.h
index cffce4b..8b86e86 100644
--- a/src/rrd_graph.h
+++ b/src/rrd_graph.h
@@ -213,7 +213,7 @@ typedef struct image_desc_t {
     char *imginfo; /* construct an