From: Kevin Bowling Date: Fri, 30 May 2014 02:58:51 +0000 (-0700) Subject: [METRICS-383] Add HostTags which adds tags for all metrics from this writer X-Git-Tag: collectd-5.5.0~223^2~7 X-Git-Url: https://git.verplant.org/?a=commitdiff_plain;h=b9f4405301f31bf0fe282095654658a4f3ccdf19;p=collectd.git [METRICS-383] Add HostTags which adds tags for all metrics from this writer --- diff --git a/src/write_tsdb.c b/src/write_tsdb.c index b5183a42..2855b356 100644 --- a/src/write_tsdb.c +++ b/src/write_tsdb.c @@ -5,7 +5,7 @@ * Copyright (C) 2009 Paul Sadauskas * Copyright (C) 2009 Doug MacEachern * Copyright (C) 2007-2012 Florian octo Forster - * + * Copyright (C) 2013-2014 Limelight Networks, Inc. * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License as published by the * Free Software Foundation; only version 2 of the License is applicable. @@ -19,18 +19,15 @@ * with this program; if not, write to the Free Software Foundation, Inc., * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA * - * Authors: + * Based on the write_graphite plugin. Authors: * Florian octo Forster * Doug MacEachern * Paul Sadauskas * Scott Sanders * Pierre-Yves Ritschard - * - * Modified by: + * write_tsdb Authors: * Brett Hawn * Kevin Bowling - * - * Based on the write_graphite plugin. **/ /* write_tsdb plugin configuation example @@ -40,6 +37,7 @@ * Host "localhost" * Port "4242" * Prefix "sys" + * HostTags "status=production deviceclass=www" * * */ @@ -52,9 +50,7 @@ #include "utils_cache.h" #include "utils_parse_option.h" -/* Folks without pthread will need to disable this plugin. */ #include - #include #include @@ -82,10 +78,10 @@ struct wt_callback { int sock_fd; - char *node; - char *service; - char *prefix; - char *postfix; + char *node; + char *service; + char *prefix; + char *host_tags; char escape_char; _Bool store_rates; @@ -104,68 +100,67 @@ struct wt_callback /* * Functions */ -static void wt_reset_buffer (struct wt_callback *cb) +static void wt_reset_buffer(struct wt_callback *cb) { - memset (cb->send_buf, 0, sizeof (cb->send_buf)); - cb->send_buf_free = sizeof (cb->send_buf); + memset(cb->send_buf, 0, sizeof(cb->send_buf)); + cb->send_buf_free = sizeof(cb->send_buf); cb->send_buf_fill = 0; - cb->send_buf_init_time = cdtime (); + cb->send_buf_init_time = cdtime(); } -static int wt_send_buffer (struct wt_callback *cb) +static int wt_send_buffer(struct wt_callback *cb) { ssize_t status = 0; - status = swrite (cb->sock_fd, cb->send_buf, strlen (cb->send_buf)); + status = swrite(cb->sock_fd, cb->send_buf, strlen(cb->send_buf)); if (status < 0) { char errbuf[1024]; - ERROR ("write_tsdb plugin: send failed with status %zi (%s)", - status, sstrerror (errno, errbuf, sizeof (errbuf))); - + ERROR("write_tsdb plugin: send failed with status %zi (%s)", + status, sstrerror (errno, errbuf, sizeof (errbuf))); close (cb->sock_fd); cb->sock_fd = -1; - return (-1); + return -1; } - return (0); + return 0; } /* NOTE: You must hold cb->send_lock when calling this function! */ -static int wt_flush_nolock (cdtime_t timeout, struct wt_callback *cb) +static int wt_flush_nolock(cdtime_t timeout, struct wt_callback *cb) { int status; - DEBUG ("write_tsdb plugin: wt_flush_nolock: timeout = %.3f; " - "send_buf_fill = %zu;", - (double)timeout, - cb->send_buf_fill); + DEBUG("write_tsdb plugin: wt_flush_nolock: timeout = %.3f; " + "send_buf_fill = %zu;", + (double)timeout, + cb->send_buf_fill); /* timeout == 0 => flush unconditionally */ if (timeout > 0) { cdtime_t now; - now = cdtime (); + now = cdtime(); if ((cb->send_buf_init_time + timeout) > now) - return (0); + return 0; } if (cb->send_buf_fill <= 0) { - cb->send_buf_init_time = cdtime (); - return (0); + cb->send_buf_init_time = cdtime(); + return 0; } - status = wt_send_buffer (cb); - wt_reset_buffer (cb); + status = wt_send_buffer(cb); + wt_reset_buffer(cb); - return (status); + return status; } -static int wt_callback_init (struct wt_callback *cb) +static int wt_callback_init(struct wt_callback *cb) { struct addrinfo ai_hints; struct addrinfo *ai_list; @@ -176,37 +171,37 @@ static int wt_callback_init (struct wt_callback *cb) const char *service = cb->service ? cb->service : WT_DEFAULT_SERVICE; if (cb->sock_fd > 0) - return (0); + return 0; - memset (&ai_hints, 0, sizeof (ai_hints)); + memset(&ai_hints, 0, sizeof(ai_hints)); #ifdef AI_ADDRCONFIG - ai_hints.ai_flags |= AI_ADDRCONFIG; + ai_hints.ai_flags |= AI_ADDRCONFIG; #endif - ai_hints.ai_family = AF_UNSPEC; + ai_hints.ai_family = AF_UNSPEC; ai_hints.ai_socktype = SOCK_STREAM; ai_list = NULL; - status = getaddrinfo (node, service, &ai_hints, &ai_list); + status = getaddrinfo(node, service, &ai_hints, &ai_list); if (status != 0) { - ERROR ("write_tsdb plugin: getaddrinfo (%s, %s) failed: %s", - node, service, gai_strerror (status)); - return (-1); + ERROR("write_tsdb plugin: getaddrinfo (%s, %s) failed: %s", + node, service, gai_strerror (status)); + return -1; } assert (ai_list != NULL); for (ai_ptr = ai_list; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next) { - cb->sock_fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, - ai_ptr->ai_protocol); + cb->sock_fd = socket(ai_ptr->ai_family, ai_ptr->ai_socktype, + ai_ptr->ai_protocol); if (cb->sock_fd < 0) continue; - status = connect (cb->sock_fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen); + status = connect(cb->sock_fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen); if (status != 0) { - close (cb->sock_fd); + close(cb->sock_fd); cb->sock_fd = -1; continue; } @@ -214,24 +209,24 @@ static int wt_callback_init (struct wt_callback *cb) break; } - freeaddrinfo (ai_list); + freeaddrinfo(ai_list); if (cb->sock_fd < 0) { char errbuf[1024]; - ERROR ("write_tsdb plugin: Connecting to %s:%s failed. " - "The last error was: %s", node, service, - sstrerror (errno, errbuf, sizeof (errbuf))); - close (cb->sock_fd); - return (-1); + ERROR("write_tsdb plugin: Connecting to %s:%s failed. " + "The last error was: %s", node, service, + sstrerror (errno, errbuf, sizeof(errbuf))); + close(cb->sock_fd); + return -1; } - wt_reset_buffer (cb); + wt_reset_buffer(cb); - return (0); + return 0; } -static void wt_callback_free (void *data) +static void wt_callback_free(void *data) { struct wt_callback *cb; @@ -240,9 +235,9 @@ static void wt_callback_free (void *data) cb = data; - pthread_mutex_lock (&cb->send_lock); + pthread_mutex_lock(&cb->send_lock); - wt_flush_nolock (/* timeout = */ 0, cb); + wt_flush_nolock(0, cb); close(cb->sock_fd); cb->sock_fd = -1; @@ -250,124 +245,119 @@ static void wt_callback_free (void *data) sfree(cb->node); sfree(cb->service); sfree(cb->prefix); - sfree(cb->postfix); + sfree(cb->host_tags); - pthread_mutex_destroy (&cb->send_lock); + pthread_mutex_destroy(&cb->send_lock); sfree(cb); } -static int wt_flush (cdtime_t timeout, - const char *identifier __attribute__((unused)), - user_data_t *user_data) +static int wt_flush(cdtime_t timeout, + const char *identifier __attribute__((unused)), + user_data_t *user_data) { struct wt_callback *cb; int status; if (user_data == NULL) - return (-EINVAL); + return -EINVAL; cb = user_data->data; - pthread_mutex_lock (&cb->send_lock); + pthread_mutex_lock(&cb->send_lock); if (cb->sock_fd < 0) { - status = wt_callback_init (cb); + status = wt_callback_init(cb); if (status != 0) { - ERROR ("write_tsdb plugin: wt_callback_init failed."); - pthread_mutex_unlock (&cb->send_lock); - return (-1); + ERROR("write_tsdb plugin: wt_callback_init failed."); + pthread_mutex_unlock(&cb->send_lock); + return -1; } } - status = wt_flush_nolock (timeout, cb); - pthread_mutex_unlock (&cb->send_lock); + status = wt_flush_nolock(timeout, cb); + pthread_mutex_unlock(&cb->send_lock); - return (status); + return status; } -static int wt_format_values (char *ret, size_t ret_len, - int ds_num, const data_set_t *ds, - const value_list_t *vl, - _Bool store_rates) +static int wt_format_values(char *ret, size_t ret_len, + int ds_num, const data_set_t *ds, + const value_list_t *vl, + _Bool store_rates) { size_t offset = 0; int status; gauge_t *rates = NULL; - assert (0 == strcmp (ds->type, vl->type)); + assert(0 == strcmp (ds->type, vl->type)); - memset (ret, 0, ret_len); + memset(ret, 0, ret_len); #define BUFFER_ADD(...) do { \ status = ssnprintf (ret + offset, ret_len - offset, \ __VA_ARGS__); \ if (status < 1) \ { \ - sfree (rates); \ - return (-1); \ + sfree(rates); \ + return -1; \ } \ else if (((size_t) status) >= (ret_len - offset)) \ { \ - sfree (rates); \ - return (-1); \ + sfree(rates); \ + return -1; \ } \ else \ offset += ((size_t) status); \ } while (0) if (ds->ds[ds_num].type == DS_TYPE_GAUGE) - BUFFER_ADD ("%f", vl->values[ds_num].gauge); + BUFFER_ADD("%f", vl->values[ds_num].gauge); else if (store_rates) { if (rates == NULL) rates = uc_get_rate (ds, vl); if (rates == NULL) { - WARNING ("format_values: " - "uc_get_rate failed."); - return (-1); + WARNING("format_values: " + "uc_get_rate failed."); + return -1; } - BUFFER_ADD ("%f", rates[ds_num]); + BUFFER_ADD("%f", rates[ds_num]); } else if (ds->ds[ds_num].type == DS_TYPE_COUNTER) - BUFFER_ADD ("%llu", vl->values[ds_num].counter); + BUFFER_ADD("%llu", vl->values[ds_num].counter); else if (ds->ds[ds_num].type == DS_TYPE_DERIVE) - BUFFER_ADD ("%" PRIi64, vl->values[ds_num].derive); + BUFFER_ADD("%" PRIi64, vl->values[ds_num].derive); else if (ds->ds[ds_num].type == DS_TYPE_ABSOLUTE) - BUFFER_ADD ("%" PRIu64, vl->values[ds_num].absolute); + BUFFER_ADD("%" PRIu64, vl->values[ds_num].absolute); else { - ERROR ("format_values plugin: Unknown data source type: %i", - ds->ds[ds_num].type); - sfree (rates); - return (-1); + ERROR("format_values plugin: Unknown data source type: %i", + ds->ds[ds_num].type); + sfree(rates); + return -1; } #undef BUFFER_ADD - sfree (rates); - return (0); + sfree(rates); + return 0; } -static int wt_format_name (char *ret, int ret_len, - const value_list_t *vl, - const struct wt_callback *cb, - const char *ds_name) +static int wt_format_name(char *ret, int ret_len, + const value_list_t *vl, + const struct wt_callback *cb, + const char *ds_name) { char *prefix; - char *postfix; prefix = cb->prefix; if (prefix == NULL) prefix = ""; - postfix = cb->postfix; - if (postfix == NULL) - postfix = ""; - if (ds_name != NULL) { if (vl->plugin_instance[0] == '\0') { ssnprintf(ret, ret_len, "%s.%s.%s", @@ -396,7 +386,7 @@ static int wt_format_name (char *ret, int ret_len, prefix, vl->plugin, vl->plugin_instance, vl->type_instance); } - return (0); + return 0; } static int wt_send_message (const char* key, const char* value, @@ -404,84 +394,87 @@ static int wt_send_message (const char* key, const char* value, const char* host) { int status; - size_t message_len; + int message_len; + const char *message_fmt; char message[1024]; /* skip if value is NaN */ if (value[0] == 'n') - return (0); + return 0; - message_len = (size_t) ssnprintf (message, sizeof (message), - "put %s %u %s fqdn=%s\r\n", + message_fmt = "put %s %u %s fqdn=%s %s\r\n"; + message_len = ssnprintf (message, sizeof(message), + message_fmt, key, - (unsigned int) CDTIME_T_TO_TIME_T ( + (unsigned int)CDTIME_T_TO_TIME_T( time), value, - host); - if (message_len >= sizeof (message)) { - ERROR ("write_tsdb plugin: message buffer too small: " - "Need %zu bytes.", message_len + 1); - return (-1); + host, + cb->host_tags); + if (message_len >= sizeof(message)) { + ERROR("write_tsdb plugin: message buffer too small: " + "Need %d bytes.", message_len + 1); + return -1; } - pthread_mutex_lock (&cb->send_lock); + pthread_mutex_lock(&cb->send_lock); if (cb->sock_fd < 0) { - status = wt_callback_init (cb); + status = wt_callback_init(cb); if (status != 0) { - ERROR ("write_tsdb plugin: wt_callback_init failed."); - pthread_mutex_unlock (&cb->send_lock); - return (-1); + ERROR("write_tsdb plugin: wt_callback_init failed."); + pthread_mutex_unlock(&cb->send_lock); + return -1; } } if (message_len >= cb->send_buf_free) { - status = wt_flush_nolock (/* timeout = */ 0, cb); + status = wt_flush_nolock(0, cb); if (status != 0) { - pthread_mutex_unlock (&cb->send_lock); - return (status); + pthread_mutex_unlock(&cb->send_lock); + return status; } } /* Assert that we have enough space for this message. */ - assert (message_len < cb->send_buf_free); + assert(message_len < cb->send_buf_free); /* `message_len + 1' because `message_len' does not include the * trailing null byte. Neither does `send_buffer_fill'. */ - memcpy (cb->send_buf + cb->send_buf_fill, + memcpy(cb->send_buf + cb->send_buf_fill, message, message_len + 1); cb->send_buf_fill += message_len; cb->send_buf_free -= message_len; - DEBUG ("write_tsdb plugin: [%s]:%s buf %zu/%zu (%.1f %%) \"%s\"", - cb->node, - cb->service, - cb->send_buf_fill, sizeof (cb->send_buf), - 100.0 * ((double) cb->send_buf_fill) / - ((double) sizeof (cb->send_buf)), - message); + DEBUG("write_tsdb plugin: [%s]:%s buf %zu/%zu (%.1f %%) \"%s\"", + cb->node, + cb->service, + cb->send_buf_fill, sizeof(cb->send_buf), + 100.0 * ((double) cb->send_buf_fill) / + ((double) sizeof(cb->send_buf)), + message); - pthread_mutex_unlock (&cb->send_lock); + pthread_mutex_unlock(&cb->send_lock); - return (0); + return 0; } -static int wt_write_messages (const data_set_t *ds, const value_list_t *vl, - struct wt_callback *cb) +static int wt_write_messages(const data_set_t *ds, const value_list_t *vl, + struct wt_callback *cb) { char key[10*DATA_MAX_NAME_LEN]; char values[512]; int status, i; - if (0 != strcmp (ds->type, vl->type)) + if (0 != strcmp(ds->type, vl->type)) { - ERROR ("write_tsdb plugin: DS type does not match " - "value list type"); + ERROR("write_tsdb plugin: DS type does not match " + "value list type"); return -1; } @@ -492,153 +485,153 @@ static int wt_write_messages (const data_set_t *ds, const value_list_t *vl, if (cb->always_append_ds || (ds->ds_num > 1)) ds_name = ds->ds[i].name; - /* Copy the identifier to `key' and escape it. */ - status = wt_format_name (key, sizeof (key), vl, cb, ds_name); + /* Copy the identifier to 'key' and escape it. */ + status = wt_format_name(key, sizeof(key), vl, cb, ds_name); if (status != 0) { - ERROR ("write_tsdb plugin: error with format_name"); - return (status); + ERROR("write_tsdb plugin: error with format_name"); + return status; } - escape_string (key, sizeof (key)); + escape_string(key, sizeof(key)); /* Convert the values to an ASCII representation and put that into - * `values'. */ - status = wt_format_values (values, sizeof (values), i, ds, vl, - cb->store_rates); + * 'values'. */ + status = wt_format_values(values, sizeof(values), i, ds, vl, + cb->store_rates); if (status != 0) { - ERROR ("write_tsdb plugin: error with " - "wt_format_values"); - return (status); + ERROR("write_tsdb plugin: error with " + "wt_format_values"); + return status; } /* Send the message to tsdb */ - status = wt_send_message (key, values, vl->time, cb, vl->host); + status = wt_send_message(key, values, vl->time, cb, vl->host); if (status != 0) { - ERROR ("write_tsdb plugin: error with " - "wt_send_message"); - return (status); + ERROR("write_tsdb plugin: error with " + "wt_send_message"); + return status; } } - return (0); + return 0; } -static int wt_write (const data_set_t *ds, const value_list_t *vl, - user_data_t *user_data) +static int wt_write(const data_set_t *ds, const value_list_t *vl, + user_data_t *user_data) { struct wt_callback *cb; int status; if (user_data == NULL) - return (EINVAL); + return EINVAL; cb = user_data->data; - status = wt_write_messages (ds, vl, cb); + status = wt_write_messages(ds, vl, cb); - return (status); + return status; } -static int config_set_char (char *dest, - oconfig_item_t *ci) +static int config_set_char(char *dest, + oconfig_item_t *ci) { char buffer[4]; int status; - memset (buffer, 0, sizeof (buffer)); + memset(buffer, 0, sizeof(buffer)); - status = cf_util_get_string_buffer (ci, buffer, sizeof (buffer)); + status = cf_util_get_string_buffer(ci, buffer, sizeof(buffer)); if (status != 0) return (status); if (buffer[0] == 0) { - ERROR ("write_tsdb plugin: Cannot use an empty string for the " - "\"EscapeCharacter\" option."); - return (-1); + ERROR("write_tsdb plugin: Cannot use an empty string for the " + "\"EscapeCharacter\" option."); + return -1; } if (buffer[1] != 0) { - WARNING ("write_tsdb plugin: Only the first character of the " - "\"EscapeCharacter\" option ('%c') will be used.", - (int) buffer[0]); + WARNING("write_tsdb plugin: Only the first character of the " + "\"EscapeCharacter\" option ('%c') will be used.", + (int) buffer[0]); } *dest = buffer[0]; - return (0); + return 0; } -static int wt_config_tsd (oconfig_item_t *ci) +static int wt_config_tsd(oconfig_item_t *ci) { struct wt_callback *cb; user_data_t user_data; char callback_name[DATA_MAX_NAME_LEN]; int i; - cb = malloc (sizeof (*cb)); + cb = malloc(sizeof(*cb)); if (cb == NULL) { - ERROR ("write_tsdb plugin: malloc failed."); - return (-1); + ERROR("write_tsdb plugin: malloc failed."); + return -1; } - memset (cb, 0, sizeof (*cb)); + memset(cb, 0, sizeof(*cb)); cb->sock_fd = -1; cb->node = NULL; cb->service = NULL; cb->prefix = NULL; - cb->postfix = NULL; + cb->host_tags = NULL; cb->escape_char = WT_DEFAULT_ESCAPE; cb->store_rates = 1; - pthread_mutex_init (&cb->send_lock, /* attr = */ NULL); + pthread_mutex_init (&cb->send_lock, NULL); for (i = 0; i < ci->children_num; i++) { oconfig_item_t *child = ci->children + i; - if (strcasecmp ("Host", child->key) == 0) - cf_util_get_string (child, &cb->node); - else if (strcasecmp ("Port", child->key) == 0) - cf_util_get_service (child, &cb->service); - else if (strcasecmp ("Prefix", child->key) == 0) - cf_util_get_string (child, &cb->prefix); - else if (strcasecmp ("Postfix", child->key) == 0) - cf_util_get_string (child, &cb->postfix); - else if (strcasecmp ("StoreRates", child->key) == 0) - cf_util_get_boolean (child, &cb->store_rates); - else if (strcasecmp ("SeparateInstances", child->key) == 0) - cf_util_get_boolean (child, &cb->separate_instances); - else if (strcasecmp ("AlwaysAppendDS", child->key) == 0) - cf_util_get_boolean (child, &cb->always_append_ds); - else if (strcasecmp ("EscapeCharacter", child->key) == 0) - config_set_char (&cb->escape_char, child); + if (strcasecmp("Host", child->key) == 0) + cf_util_get_string(child, &cb->node); + else if (strcasecmp("Port", child->key) == 0) + cf_util_get_service(child, &cb->service); + else if (strcasecmp("Prefix", child->key) == 0) + cf_util_get_string(child, &cb->prefix); + else if (strcasecmp("HostTags", child->key) == 0) + cf_util_get_string(child, &cb->host_tags); + else if (strcasecmp("StoreRates", child->key) == 0) + cf_util_get_boolean(child, &cb->store_rates); + else if (strcasecmp("SeparateInstances", child->key) == 0) + cf_util_get_boolean(child, &cb->separate_instances); + else if (strcasecmp("AlwaysAppendDS", child->key) == 0) + cf_util_get_boolean(child, &cb->always_append_ds); + else if (strcasecmp("EscapeCharacter", child->key) == 0) + config_set_char(&cb->escape_char, child); else { - ERROR ("write_tsdb plugin: Invalid configuration " - "option: %s.", child->key); + ERROR("write_tsdb plugin: Invalid configuration " + "option: %s.", child->key); } } - ssnprintf (callback_name, sizeof (callback_name), "write_tsdb/%s/%s", - cb->node != NULL ? cb->node : WT_DEFAULT_NODE, - cb->service != NULL ? cb->service : WT_DEFAULT_SERVICE); + ssnprintf(callback_name, sizeof(callback_name), "write_tsdb/%s/%s", + cb->node != NULL ? cb->node : WT_DEFAULT_NODE, + cb->service != NULL ? cb->service : WT_DEFAULT_SERVICE); - memset (&user_data, 0, sizeof (user_data)); + memset(&user_data, 0, sizeof(user_data)); user_data.data = cb; user_data.free_func = wt_callback_free; - plugin_register_write (callback_name, wt_write, &user_data); + plugin_register_write(callback_name, wt_write, &user_data); user_data.free_func = NULL; - plugin_register_flush (callback_name, wt_flush, &user_data); + plugin_register_flush(callback_name, wt_flush, &user_data); - return (0); + return 0; } -static int wt_config (oconfig_item_t *ci) +static int wt_config(oconfig_item_t *ci) { int i; @@ -646,21 +639,21 @@ static int wt_config (oconfig_item_t *ci) { oconfig_item_t *child = ci->children + i; - if (strcasecmp ("Node", child->key) == 0) - wt_config_tsd (child); + if (strcasecmp("Node", child->key) == 0) + wt_config_tsd(child); else { - ERROR ("write_tsdb plugin: Invalid configuration " - "option: %s.", child->key); + ERROR("write_tsdb plugin: Invalid configuration " + "option: %s.", child->key); } } - return (0); + return 0; } -void module_register (void) +void module_register(void) { - plugin_register_complex_config ("write_tsdb", wt_config); + plugin_register_complex_config("write_tsdb", wt_config); } /* vim: set sw=4 ts=4 sts=4 tw=78 et : */