From: Florian Forster Date: Mon, 23 Jun 2008 19:56:03 +0000 (+0200) Subject: src/rrd_{client,daemon}.c: Move to a ASCII only protocol. X-Git-Url: https://git.verplant.org/?a=commitdiff_plain;h=7c1813445012ad944e2d19c00acbb6809a5341bc;p=rrdtool.git src/rrd_{client,daemon}.c: Move to a ASCII only protocol. Previously the fields were separated by Null-bytes. This is basically not possible to type in using telnet or something similar. Moving to a protocol that's easy to type in manually is hopefully easier to debug and use in the long run. --- diff --git a/src/rrd_client.c b/src/rrd_client.c index a34ecfe..ef39d48 100644 --- a/src/rrd_client.c +++ b/src/rrd_client.c @@ -33,22 +33,133 @@ #include static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER; -static int sd; +static int sd = -1; +#if 0 static FILE *sh; +#endif + +static ssize_t sread (void *buffer_void, size_t buffer_size) /* {{{ */ +{ + char *buffer; + size_t buffer_used; + size_t buffer_free; + ssize_t status; + + buffer = (char *) buffer_void; + buffer_used = 0; + buffer_free = buffer_size; + + while (buffer_free > 0) + { + status = read (sd, buffer + buffer_used, buffer_free); + if ((status < 0) && ((errno == EAGAIN) || (errno == EINTR))) + continue; + + if (status < 0) + return (-1); + + if (status == 0) + { + close (sd); + sd = -1; +#if 0 + sh = NULL; +#endif + errno = EPROTO; + return (-1); + } + + assert ((0 > status) || (buffer_free >= (size_t) status)); + + buffer_free = buffer_free - status; + buffer_used = buffer_used + status; + + if (buffer[buffer_used - 1] == '\n') + break; + } + + if (buffer[buffer_used - 1] != '\n') + { + errno = ENOBUFS; + return (-1); + } + + buffer[buffer_used - 1] = 0; + return (buffer_used); +} /* }}} ssize_t sread */ + +static ssize_t swrite (const void *buf, size_t count) /* {{{ */ +{ + const char *ptr; + size_t nleft; + ssize_t status; + + ptr = (const char *) buf; + nleft = count; + + while (nleft > 0) + { + status = write (sd, (const void *) ptr, nleft); + + if ((status < 0) && ((errno == EAGAIN) || (errno == EINTR))) + continue; + + if (status < 0) + return (status); + + nleft = nleft - status; + ptr = ptr + status; + } + + return (0); +} /* }}} ssize_t swrite */ static int buffer_add_string (const char *str, /* {{{ */ char **buffer_ret, size_t *buffer_size_ret) { - size_t str_size; + char *buffer; + size_t buffer_size; + size_t buffer_pos; + size_t i; + int status; - str_size = strlen (str) + 1; + buffer = *buffer_ret; + buffer_size = *buffer_size_ret; + buffer_pos = 0; - if (*buffer_size_ret < str_size) + i = 0; + status = -1; + while (buffer_pos < buffer_size) + { + if (str[i] == 0) + { + buffer[buffer_pos] = ' '; + buffer_pos++; + status = 0; + break; + } + else if ((str[i] == ' ') || (str[i] == '\\')) + { + if (buffer_pos >= (buffer_size - 1)) + break; + buffer[buffer_pos] = '\\'; + buffer_pos++; + buffer[buffer_pos] = str[i]; + buffer_pos++; + } + else + { + buffer[buffer_pos] = str[i]; + buffer_pos++; + } + i++; + } /* while (buffer_pos < buffer_size) */ + + if (status != 0) return (-1); - memcpy (*buffer_ret, str, str_size); - *buffer_ret += str_size; - *buffer_size_ret -= str_size; + *buffer_ret = buffer + buffer_pos; + *buffer_size_ret = buffer_size - buffer_pos; return (0); } /* }}} int buffer_add_string */ @@ -78,7 +189,7 @@ static int rrdc_connect_unix (const char *path) /* {{{ */ pthread_mutex_lock (&lock); - if (sh != NULL) + if (sd >= 0) { pthread_mutex_unlock (&lock); return (0); @@ -104,6 +215,7 @@ static int rrdc_connect_unix (const char *path) /* {{{ */ return (status); } +#if 0 sh = fdopen (sd, "w+"); if (sh == NULL) { @@ -113,6 +225,7 @@ static int rrdc_connect_unix (const char *path) /* {{{ */ pthread_mutex_unlock (&lock); return (status); } +#endif pthread_mutex_unlock (&lock); @@ -136,7 +249,7 @@ int rrdc_connect (const char *addr) /* {{{ */ pthread_mutex_lock (&lock); - if (sh != NULL) + if (sd >= 0) { pthread_mutex_unlock (&lock); return (0); @@ -177,6 +290,7 @@ int rrdc_connect (const char *addr) /* {{{ */ continue; } +#if 0 sh = fdopen (sd, "w+"); if (sh == NULL) { @@ -185,6 +299,7 @@ int rrdc_connect (const char *addr) /* {{{ */ sd = -1; continue; } +#endif assert (status == 0); break; @@ -200,17 +315,21 @@ int rrdc_disconnect (void) /* {{{ */ pthread_mutex_lock (&lock); - if (sh == NULL) + if (sd < 0) { pthread_mutex_unlock (&lock); return (-1); } +#if 0 status = fclose (sh); if (status != 0) status = errno; sh = NULL; +#else + status = 0; +#endif sd = -1; pthread_mutex_unlock (&lock); @@ -223,30 +342,51 @@ int rrdc_update (const char *filename, int values_num, /* {{{ */ { char buffer[4096]; char *buffer_ptr; + size_t buffer_free; size_t buffer_size; int status; int i; memset (buffer, 0, sizeof (buffer)); buffer_ptr = &buffer[0]; - buffer_size = sizeof (buffer) - 1; + buffer_free = sizeof (buffer); + + status = buffer_add_string ("update", &buffer_ptr, &buffer_free); + if (status != 0) + return (ENOBUFS); + + status = buffer_add_string (filename, &buffer_ptr, &buffer_free); + if (status != 0) + return (ENOBUFS); - buffer_add_string ("update", &buffer_ptr, &buffer_size); - buffer_add_string (filename, &buffer_ptr, &buffer_size); for (i = 0; i < values_num; i++) - buffer_add_value (values[i], &buffer_ptr, &buffer_size); + { + status = buffer_add_value (values[i], &buffer_ptr, &buffer_free); + if (status != 0) + return (ENOBUFS); + } + + assert (buffer_free < sizeof (buffer)); + buffer_size = sizeof (buffer) - buffer_free; + assert (buffer[buffer_size - 1] == ' '); + buffer[buffer_size - 1] = '\n'; pthread_mutex_lock (&lock); - if (sh == NULL) + if (sd < 0) { pthread_mutex_unlock (&lock); return (ENOTCONN); } - status = write (sd, buffer, sizeof (buffer) - buffer_size); + status = swrite (buffer, buffer_size); + if (status != 0) + { + pthread_mutex_unlock (&lock); + return (status); + } - status = read (sd, buffer, sizeof (buffer)); + status = sread (buffer, sizeof (buffer)); if (status < 0) { status = errno; @@ -259,10 +399,9 @@ int rrdc_update (const char *filename, int values_num, /* {{{ */ return (ENODATA); } - status = atoi (buffer); - pthread_mutex_unlock (&lock); + status = atoi (buffer); return (status); } /* }}} int rrd_update_daemon */ diff --git a/src/rrd_daemon.c b/src/rrd_daemon.c index b72f62e..df31d51 100644 --- a/src/rrd_daemon.c +++ b/src/rrd_daemon.c @@ -184,6 +184,8 @@ static int enqueue_cache_item (cache_item_t *ci) /* {{{ */ ci->flags |= CI_FLAGS_IN_QUEUE; + ci->flags |= CI_FLAGS_IN_QUEUE; + return (0); } /* }}} int enqueue_cache_item */ @@ -204,7 +206,8 @@ static gboolean tree_callback_flush (gpointer key /* {{{ */ now = *((time_t *) data); if (((now - ci->last_flush_time) >= config_write_interval) - && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)) + && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) + && (ci->values_num > 0)) enqueue_cache_item (ci); return (TRUE); @@ -315,12 +318,70 @@ static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */ return (NULL); } /* }}} void *queue_thread_main */ +static int buffer_get_field (char **buffer_ret, /* {{{ */ + size_t *buffer_size_ret, char **field_ret) +{ + char *buffer; + size_t buffer_pos; + size_t buffer_size; + char *field; + size_t field_size; + int status; + + buffer = *buffer_ret; + buffer_pos = 0; + buffer_size = *buffer_size_ret; + field = *buffer_ret; + field_size = 0; + + /* This is ensured by `handle_request'. */ + assert (buffer[buffer_size - 1] == ' '); + + status = -1; + while (buffer_pos < buffer_size) + { + /* Check for end-of-field or end-of-buffer */ + if (buffer[buffer_pos] == ' ') + { + field[field_size] = 0; + field_size++; + buffer_pos++; + status = 0; + break; + } + /* Handle escaped characters. */ + else if (buffer[buffer_pos] == '\\') + { + if (buffer_pos >= (buffer_size - 1)) + break; + buffer_pos++; + field[field_size] = buffer[buffer_pos]; + field_size++; + buffer_pos++; + } + /* Normal operation */ + else + { + field[field_size] = buffer[buffer_pos]; + field_size++; + buffer_pos++; + } + } /* while (buffer_pos < buffer_size) */ + + if (status != 0) + return (status); + + *buffer_ret = buffer + buffer_pos; + *buffer_size_ret = buffer_size - buffer_pos; + *field_ret = field; + + return (0); +} /* }}} int buffer_get_field */ + static int handle_request_update (int fd, /* {{{ */ - char *buffer, int buffer_size __attribute__((unused))) + char *buffer, size_t buffer_size) { char *file; - char *value; - char *buffer_ptr; int values_num = 0; int status; @@ -331,15 +392,17 @@ static int handle_request_update (int fd, /* {{{ */ now = time (NULL); - buffer_ptr = buffer; - - file = buffer_ptr; - buffer_ptr += strlen (file) + 1; + status = buffer_get_field (&buffer, &buffer_size, &file); + if (status != 0) + { + RRDD_LOG (LOG_INFO, "handle_request_update: Cannot get file name."); + return (-1); + } pthread_mutex_lock (&cache_lock); ci = g_tree_lookup (cache_tree, file); - if (ci == NULL) + if (ci == NULL) /* {{{ */ { ci = (cache_item_t *) malloc (sizeof (cache_item_t)); if (ci == NULL) @@ -368,15 +431,20 @@ static int handle_request_update (int fd, /* {{{ */ RRDD_LOG (LOG_DEBUG, "handle_request_update: Created new tree node %s.", ci->file); - } + } /* }}} */ assert (ci != NULL); - while (*buffer_ptr != 0) + while (buffer_size > 0) { char **temp; + char *value; - value = buffer_ptr; - buffer_ptr += strlen (value) + 1; + status = buffer_get_field (&buffer, &buffer_size, &value); + if (status != 0) + { + RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field."); + break; + } temp = (char **) realloc (ci->values, sizeof (char *) * (ci->values_num + 1)); @@ -399,7 +467,8 @@ static int handle_request_update (int fd, /* {{{ */ } if (((now - ci->last_flush_time) >= config_write_interval) - && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)) + && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) + && (ci->values_num > 0)) { enqueue_cache_item (ci); pthread_cond_signal (&cache_cond); @@ -410,7 +479,7 @@ static int handle_request_update (int fd, /* {{{ */ snprintf (answer, sizeof (answer), "0 Enqueued %i value(s)\n", values_num); answer[sizeof (answer) - 1] = 0; - status = write (fd, answer, sizeof (answer)); + status = write (fd, answer, strlen (answer)); if (status < 0) { status = errno; @@ -424,29 +493,41 @@ static int handle_request_update (int fd, /* {{{ */ static int handle_request (int fd) /* {{{ */ { char buffer[4096]; - int buffer_size; + size_t buffer_size; + char *buffer_ptr; + char *command; + int status; - buffer_size = read (fd, buffer, sizeof (buffer)); - if (buffer_size < 1) + status = read (fd, buffer, sizeof (buffer)); + if (status < 1) { RRDD_LOG (LOG_ERR, "handle_request: read(2) failed."); return (-1); } + buffer_size = status; assert (((size_t) buffer_size) <= sizeof (buffer)); - if ((buffer[buffer_size - 2] != 0) - || (buffer[buffer_size - 1] != 0)) + if (buffer[buffer_size - 1] != '\n') { RRDD_LOG (LOG_INFO, "handle_request: malformed request."); return (-1); } + /* Place the normal field separator at the end to simplify + * `buffer_get_field's work. */ + buffer[buffer_size - 1] = ' '; + + buffer_ptr = buffer; + command = NULL; + status = buffer_get_field (&buffer_ptr, &buffer_size, &command); + if (status != 0) + { + RRDD_LOG (LOG_INFO, "handle_request: Unable parse command."); + return (-1); + } - /* fields in the buffer a separated by null bytes. */ - if (strcmp (buffer, "update") == 0) + if (strcmp (command, "update") == 0) { - int offset = strlen ("update") + 1; - return (handle_request_update (fd, buffer + offset, - buffer_size - offset)); + return (handle_request_update (fd, buffer_ptr, buffer_size)); } else {