From: Pavel Rochnyack Date: Sun, 19 May 2019 10:27:23 +0000 (+0700) Subject: check_uptime: New plugin, based on new cache_event callback. X-Git-Url: https://git.verplant.org/?a=commitdiff_plain;h=64b9342a6c0ac420f3a01096fc319ce1d1ae8746;p=collectd.git check_uptime: New plugin, based on new cache_event callback. Plugin checks for cache events for uptime metric, and send notifications when metric state changed. --- diff --git a/Makefile.am b/Makefile.am index 85f8da8a..b9a57a8d 100644 --- a/Makefile.am +++ b/Makefile.am @@ -774,6 +774,12 @@ chrony_la_LDFLAGS = $(PLUGIN_LDFLAGS) chrony_la_LIBADD = -lm endif +if BUILD_PLUGIN_CHECK_UPTIME +pkglib_LTLIBRARIES += check_uptime.la +check_uptime_la_SOURCES = src/check_uptime.c +check_uptime_la_LDFLAGS = $(PLUGIN_LDFLAGS) +endif + if BUILD_PLUGIN_CONNTRACK pkglib_LTLIBRARIES += conntrack.la conntrack_la_SOURCES = src/conntrack.c diff --git a/configure.ac b/configure.ac index c95422f4..18633890 100644 --- a/configure.ac +++ b/configure.ac @@ -6793,6 +6793,7 @@ AC_PLUGIN([bind], [$plugin_bind], [ISC Bind nameserv AC_PLUGIN([ceph], [$plugin_ceph], [Ceph daemon statistics]) AC_PLUGIN([cgroups], [$plugin_cgroups], [CGroups CPU usage accounting]) AC_PLUGIN([chrony], [yes], [Chrony statistics]) +AC_PLUGIN([check_uptime], [yes], [Notify about uptime reset]) AC_PLUGIN([conntrack], [$plugin_conntrack], [nf_conntrack statistics]) AC_PLUGIN([contextswitch], [$plugin_contextswitch], [context switch statistics]) AC_PLUGIN([cpu], [$plugin_cpu], [CPU usage statistics]) @@ -7220,6 +7221,7 @@ AC_MSG_RESULT([ bind . . . . . . . . $enable_bind]) AC_MSG_RESULT([ ceph . . . . . . . . $enable_ceph]) AC_MSG_RESULT([ cgroups . . . . . . . $enable_cgroups]) AC_MSG_RESULT([ chrony. . . . . . . . $enable_chrony]) +AC_MSG_RESULT([ check_uptime. . . . . $enable_check_uptime]) AC_MSG_RESULT([ conntrack . . . . . . $enable_conntrack]) AC_MSG_RESULT([ contextswitch . . . . $enable_contextswitch]) AC_MSG_RESULT([ cpu . . . . . . . . . $enable_cpu]) diff --git a/src/check_uptime.c b/src/check_uptime.c new file mode 100644 index 00000000..33363b54 --- /dev/null +++ b/src/check_uptime.c @@ -0,0 +1,273 @@ +/** + * collectd - src/check_uptime.c + * Copyright (C) 2007-2019 Florian Forster + * Copyright (C) 2019 Pavel V. Rochnyack + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; only version 2 of the License is applicable. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + * + * Author: + * Florian octo Forster + * Pavel Rochnyak + **/ + +#include "collectd.h" +#include "plugin.h" +#include "utils/avltree/avltree.h" +#include "utils/common/common.h" +#include "utils_cache.h" + +/* Types are registered only in `config` phase, so access is not protected by + * locks */ +c_avl_tree_t *types_tree = NULL; + +static int format_uptime(unsigned long uptime_sec, char *buf, size_t bufsize) { + + unsigned int uptime_days = uptime_sec / 24 / 3600; + uptime_sec -= uptime_days * 24 * 3600; + unsigned int uptime_hours = uptime_sec / 3600; + uptime_sec -= uptime_hours * 3600; + unsigned int uptime_mins = uptime_sec / 60; + uptime_sec -= uptime_mins * 60; + + int ret = 0; + if (uptime_days) { + ret += snprintf(buf + ret, bufsize - ret, " %u day(s)", uptime_days); + } + if (uptime_days || uptime_hours) { + ret += snprintf(buf + ret, bufsize - ret, " %u hour(s)", uptime_hours); + } + if (uptime_days || uptime_hours || uptime_mins) { + ret += snprintf(buf + ret, bufsize - ret, " %u min", uptime_mins); + } + ret += snprintf(buf + ret, bufsize - ret, " %lu sec.", uptime_sec); + return ret; +} + +static int cu_notify(enum cache_event_type_e event_type, const value_list_t *vl, + gauge_t old_uptime, gauge_t new_uptime) { + notification_t n; + NOTIFICATION_INIT_VL(&n, vl); + + int status; + char *buf = n.message; + size_t bufsize = sizeof(n.message); + + n.time = vl->time; + + const char *service = "Service"; + if (strcmp(vl->plugin, "uptime") == 0) + service = "Host"; + + switch (event_type) { + case CE_VALUE_NEW: + n.severity = NOTIF_OKAY; + status = snprintf(buf, bufsize, "%s is running.", service); + buf += status; + bufsize -= status; + break; + case CE_VALUE_UPDATE: + n.severity = NOTIF_WARNING; + status = snprintf(buf, bufsize, "%s just restarted.", service); + buf += status; + bufsize -= status; + break; + case CE_VALUE_EXPIRED: + n.severity = NOTIF_FAILURE; + status = snprintf(buf, bufsize, "%s is unreachable.", service); + buf += status; + bufsize -= status; + break; + } + + if (!isnan(old_uptime)) { + status = snprintf(buf, bufsize, " Uptime was:"); + buf += status; + bufsize -= status; + + status = format_uptime(old_uptime, buf, bufsize); + buf += status; + bufsize -= status; + + plugin_notification_meta_add_double(&n, "LastValue", old_uptime); + } + + if (!isnan(new_uptime)) { + status = snprintf(buf, bufsize, " Uptime now:"); + buf += status; + bufsize -= status; + + status = format_uptime(new_uptime, buf, bufsize); + buf += status; + bufsize -= status; + + plugin_notification_meta_add_double(&n, "CurrentValue", new_uptime); + } + + plugin_dispatch_notification(&n); + + plugin_notification_meta_free(n.meta); + return 0; +} + +static int cu_cache_event(cache_event_t *event, + __attribute__((unused)) user_data_t *ud) { + gauge_t values_history[2]; + + /* For CE_VALUE_EXPIRED */ + int ret; + value_t *values; + size_t values_num; + gauge_t old_uptime = NAN; + + switch (event->type) { + case CE_VALUE_NEW: + DEBUG("check_uptime: CE_VALUE_NEW, %s", event->value_list_name); + if (c_avl_get(types_tree, event->value_list->type, NULL) == 0) { + event->ret = 1; + assert(event->value_list->values_len > 0); + cu_notify(CE_VALUE_NEW, event->value_list, NAN /* old */, + event->value_list->values[0].gauge /* new */); + } + break; + case CE_VALUE_UPDATE: + DEBUG("check_uptime: CE_VALUE_UPDATE, %s", event->value_list_name); + if (uc_get_history_by_name(event->value_list_name, values_history, 2, 1)) { + ERROR("check_uptime plugin: Failed to get value history for %s.", + event->value_list_name); + } else { + if (!isnan(values_history[0]) && !isnan(values_history[1]) && + values_history[0] < values_history[1]) { + cu_notify(CE_VALUE_UPDATE, event->value_list, + values_history[1] /* old */, values_history[0] /* new */); + } + } + break; + case CE_VALUE_EXPIRED: + DEBUG("check_uptime: CE_VALUE_EXPIRED, %s", event->value_list_name); + ret = uc_get_value_by_name(event->value_list_name, &values, &values_num); + if (ret == 0) { + old_uptime = values[0].gauge; + sfree(values); + } + + cu_notify(CE_VALUE_EXPIRED, event->value_list, old_uptime, NAN /* new */); + break; + } + return 0; +} + +static int cu_config(oconfig_item_t *ci) { + if (types_tree == NULL) { + types_tree = c_avl_create((int (*)(const void *, const void *))strcmp); + if (types_tree == NULL) { + ERROR("check_uptime plugin: c_avl_create failed."); + return -1; + } + } + + for (int i = 0; i < ci->children_num; ++i) { + oconfig_item_t *child = ci->children + i; + if (strcasecmp("Type", child->key) == 0) { + if ((child->values_num != 1) || + (child->values[0].type != OCONFIG_TYPE_STRING)) { + WARNING("check_uptime plugin: The `Type' option needs exactly one " + "string argument."); + return -1; + } + char *type = child->values[0].value.string; + + if (c_avl_get(types_tree, type, NULL) == 0) { + ERROR("check_uptime plugin: Type `%s' already added.", type); + return -1; + } + + char *type_copy = strdup(type); + if (type_copy == NULL) { + ERROR("check_uptime plugin: strdup failed."); + return -1; + } + + int status = c_avl_insert(types_tree, type_copy, NULL); + if (status != 0) { + ERROR("check_uptime plugin: c_avl_insert failed."); + sfree(type_copy); + return -1; + } + } else + WARNING("check_uptime plugin: Ignore unknown config option `%s'.", + child->key); + } + + return 0; +} + +static int cu_init(void) { + if (types_tree == NULL) { + types_tree = c_avl_create((int (*)(const void *, const void *))strcmp); + if (types_tree == NULL) { + ERROR("check_uptime plugin: c_avl_create failed."); + return -1; + } + /* Default configuration */ + char *type = strdup("uptime"); + if (type == NULL) { + ERROR("check_uptime plugin: strdup failed."); + return -1; + } + int status = c_avl_insert(types_tree, type, NULL); + if (status != 0) { + ERROR("check_uptime plugin: c_avl_insert failed."); + sfree(type); + return -1; + } + } + + int ret = 0; + char *type; + void *val; + c_avl_iterator_t *iter = c_avl_get_iterator(types_tree); + while (c_avl_iterator_next(iter, (void *)&type, (void *)&val) == 0) { + data_set_t const *ds = plugin_get_ds(type); + if (ds == NULL) { + ERROR("check_uptime plugin: Failed to look up type \"%s\".", type); + ret = -1; + continue; + } + if (ds->ds_num != 1) { + ERROR("check_uptime plugin: The type \"%s\" has %" PRIsz " data sources. " + "Only types with a single GAUGE data source are supported.", + ds->type, ds->ds_num); + ret = -1; + continue; + } + if (ds->ds[0].type != DS_TYPE_GAUGE) { + ERROR("check_uptime plugin: The type \"%s\" has wrong data source type. " + "Only types with a single GAUGE data source are supported.", + ds->type); + ret = -1; + continue; + } + } + c_avl_iterator_destroy(iter); + + if (ret == 0) + plugin_register_cache_event("check_uptime", cu_cache_event, NULL); + + return ret; +} + +void module_register(void) { + plugin_register_complex_config("check_uptime", cu_config); + plugin_register_init("check_uptime", cu_init); +} diff --git a/src/collectd.conf.in b/src/collectd.conf.in index 7f09c5cc..40111553 100644 --- a/src/collectd.conf.in +++ b/src/collectd.conf.in @@ -102,6 +102,7 @@ #@BUILD_PLUGIN_CEPH_TRUE@LoadPlugin ceph #@BUILD_PLUGIN_CGROUPS_TRUE@LoadPlugin cgroups #@BUILD_PLUGIN_CHRONY_TRUE@LoadPlugin chrony +#@BUILD_PLUGIN_CHECK_UPTIME_TRUE@LoadPlugin check_uptime #@BUILD_PLUGIN_CONNTRACK_TRUE@LoadPlugin conntrack #@BUILD_PLUGIN_CONTEXTSWITCH_TRUE@LoadPlugin contextswitch @BUILD_PLUGIN_CPU_TRUE@@BUILD_PLUGIN_CPU_TRUE@LoadPlugin cpu diff --git a/src/collectd.conf.pod b/src/collectd.conf.pod index 71931c2b..1190895c 100644 --- a/src/collectd.conf.pod +++ b/src/collectd.conf.pod @@ -1548,6 +1548,35 @@ at all, B cgroups are selected. =back +=head2 Plugin C + +The I designed to check and notify about host or service +status based on I metric. + +When new metric of I type appears in cache, OK notification is sent. +When new value for metric is less than previous value, WARNING notification is +sent about host/service restart. +When no new updates comes for metric and cache entry expires, then FAILURE +notification is sent about unreachable host or service. + +By default (when no explicit configuration), plugin checks for I metric. + +B + + + Type "uptime" + Type "my_uptime_type" + + +=over 4 + +=item B I + +Metric type to check for status/values. The type should consist single GAUGE +data source. + +=back + =head2 Plugin C The C plugin collects ntp data from a B server, such as clock diff --git a/src/daemon/plugin.c b/src/daemon/plugin.c index b4e5ae72..10a20648 100644 --- a/src/daemon/plugin.c +++ b/src/daemon/plugin.c @@ -85,6 +85,14 @@ struct read_func_s { }; typedef struct read_func_s read_func_t; +struct cache_event_func_s { + plugin_cache_event_cb callback; + char *name; + user_data_t user_data; + plugin_ctx_t plugin_ctx; +}; +typedef struct cache_event_func_s cache_event_func_t; + struct write_queue_s; typedef struct write_queue_s write_queue_t; struct write_queue_s { @@ -112,6 +120,9 @@ static llist_t *list_shutdown; static llist_t *list_log; static llist_t *list_notification; +static size_t list_cache_event_num; +static cache_event_func_t list_cache_event[32]; + static fc_chain_t *pre_cache_chain; static fc_chain_t *post_cache_chain; @@ -263,8 +274,6 @@ static void destroy_read_heap(void) /* {{{ */ static int register_callback(llist_t **list, /* {{{ */ const char *name, callback_func_t *cf) { - llentry_t *le; - char *key; if (*list == NULL) { *list = llist_create(); @@ -276,14 +285,14 @@ static int register_callback(llist_t **list, /* {{{ */ } } - key = strdup(name); + char *key = strdup(name); if (key == NULL) { ERROR("plugin: register_callback: strdup failed."); destroy_callback(cf); return -1; } - le = llist_search(*list, name); + llentry_t *le = llist_search(*list, name); if (le == NULL) { le = llentry_create(key, cf); if (le == NULL) { @@ -296,9 +305,7 @@ static int register_callback(llist_t **list, /* {{{ */ llist_append(*list, le); } else { - callback_func_t *old_cf; - - old_cf = le->value; + callback_func_t *old_cf = le->value; le->value = cf; P_WARNING("register_callback: " @@ -1310,6 +1317,60 @@ EXPORT int plugin_register_missing(const char *name, plugin_missing_cb callback, return create_register_callback(&list_missing, name, (void *)callback, ud); } /* int plugin_register_missing */ +EXPORT int plugin_register_cache_event(const char *name, + plugin_cache_event_cb callback, + user_data_t const *ud) { + + if (name == NULL || callback == NULL) + return EINVAL; + + char *name_copy = strdup(name); + if (name_copy == NULL) { + P_ERROR("plugin_register_cache_event: strdup failed."); + free_userdata(ud); + return ENOMEM; + } + + if (list_cache_event_num >= 32) { + P_ERROR("plugin_register_cache_event: Too much cache event callbacks tried " + "to be registered."); + free_userdata(ud); + return ENOMEM; + } + + for (size_t i = 0; i < list_cache_event_num; i++) { + cache_event_func_t *cef = &list_cache_event[i]; + if (!cef->callback) + continue; + + if (strcmp(name, cef->name) == 0) { + P_ERROR("plugin_register_cache_event: a callback named `%s' already " + "registered!", + name); + free_userdata(ud); + return -1; + } + } + + user_data_t user_data; + if (ud == NULL) { + user_data = (user_data_t){ + .data = NULL, .free_func = NULL, + }; + } else { + user_data = *ud; + } + + list_cache_event[list_cache_event_num] = + (cache_event_func_t){.callback = callback, + .name = name_copy, + .user_data = user_data, + .plugin_ctx = plugin_get_ctx()}; + list_cache_event_num++; + + return 0; +} /* int plugin_register_cache_event */ + EXPORT int plugin_register_shutdown(const char *name, int (*callback)(void)) { return create_register_callback(&list_shutdown, name, (void *)callback, NULL); } /* int plugin_register_shutdown */ @@ -1511,6 +1572,32 @@ EXPORT int plugin_unregister_missing(const char *name) { return plugin_unregister(list_missing, name); } +EXPORT int plugin_unregister_cache_event(const char *name) { + for (size_t i = 0; i < list_cache_event_num; i++) { + cache_event_func_t *cef = &list_cache_event[i]; + if (!cef->callback) + continue; + if (strcmp(name, cef->name) == 0) { + /* Mark callback as inactive, so mask in cache entries remains actual */ + cef->callback = NULL; + sfree(cef->name); + free_userdata(&cef->user_data); + } + } + return 0; +} + +static void destroy_cache_event_callbacks() { + for (size_t i = 0; i < list_cache_event_num; i++) { + cache_event_func_t *cef = &list_cache_event[i]; + if (!cef->callback) + continue; + cef->callback = NULL; + sfree(cef->name); + free_userdata(&cef->user_data); + } +} + EXPORT int plugin_unregister_shutdown(const char *name) { return plugin_unregister(list_shutdown, name); } @@ -1855,6 +1942,7 @@ EXPORT int plugin_shutdown_all(void) { * the data isn't freed twice. */ destroy_all_callbacks(&list_flush); destroy_all_callbacks(&list_missing); + destroy_cache_event_callbacks(); destroy_all_callbacks(&list_write); destroy_all_callbacks(&list_notification); @@ -1895,6 +1983,82 @@ EXPORT int plugin_dispatch_missing(const value_list_t *vl) /* {{{ */ return 0; } /* int }}} plugin_dispatch_missing */ +void plugin_dispatch_cache_event(enum cache_event_type_e event_type, + unsigned long callbacks_mask, const char *name, + const value_list_t *vl) { + switch (event_type) { + case CE_VALUE_NEW: + callbacks_mask = 0; + for (size_t i = 0; i < list_cache_event_num; i++) { + cache_event_func_t *cef = &list_cache_event[i]; + plugin_cache_event_cb callback = cef->callback; + + if (!callback) + continue; + + cache_event_t event = (cache_event_t){.type = event_type, + .value_list = vl, + .value_list_name = name, + .ret = 0}; + + plugin_ctx_t old_ctx = plugin_set_ctx(cef->plugin_ctx); + int status = (*callback)(&event, &cef->user_data); + plugin_set_ctx(old_ctx); + + if (status != 0) { + ERROR("plugin_dispatch_cache_event: Callback \"%s\" failed with status " + "%i for event NEW.", + cef->name, status); + } else { + if (event.ret) { + DEBUG( + "plugin_dispatch_cache_event: Callback \"%s\" subscribed to %s.", + cef->name, name); + callbacks_mask |= (1 << (i)); + } else { + DEBUG("plugin_dispatch_cache_event: Callback \"%s\" ignores %s.", + cef->name, name); + } + } + } + + if (callbacks_mask) + uc_set_callbacks_mask(name, callbacks_mask); + + break; + case CE_VALUE_UPDATE: + case CE_VALUE_EXPIRED: + for (size_t i = 0; i < list_cache_event_num; i++) { + cache_event_func_t *cef = &list_cache_event[i]; + plugin_cache_event_cb callback = cef->callback; + + if (!callback) + continue; + + if (callbacks_mask && (1 << (i)) == 0) + continue; + + cache_event_t event = (cache_event_t){.type = event_type, + .value_list = vl, + .value_list_name = name, + .ret = 0}; + + plugin_ctx_t old_ctx = plugin_set_ctx(cef->plugin_ctx); + int status = (*callback)(&event, &cef->user_data); + plugin_set_ctx(old_ctx); + + if (status != 0) { + ERROR("plugin_dispatch_cache_event: Callback \"%s\" failed with status " + "%i for event %s.", + cef->name, status, + ((event_type == CE_VALUE_UPDATE) ? "UPDATE" : "EXPIRED")); + } + } + break; + } + return; +} + static int plugin_dispatch_values_internal(value_list_t *vl) { int status; static c_complain_t no_write_complaint = C_COMPLAIN_INIT_STATIC; diff --git a/src/daemon/plugin.h b/src/daemon/plugin.h index 6b3a030c..c3534e85 100644 --- a/src/daemon/plugin.h +++ b/src/daemon/plugin.h @@ -171,6 +171,15 @@ struct user_data_s { }; typedef struct user_data_s user_data_t; +enum cache_event_type_e { CE_VALUE_NEW, CE_VALUE_UPDATE, CE_VALUE_EXPIRED }; + +typedef struct cache_event_s { + enum cache_event_type_e type; + const value_list_t *value_list; + const char *value_list_name; + int ret; +} cache_event_t; + struct plugin_ctx_s { char *name; cdtime_t interval; @@ -192,6 +201,11 @@ typedef int (*plugin_flush_cb)(cdtime_t timeout, const char *identifier, * callbacks should be called, greater than zero if no more callbacks should be * called. */ typedef int (*plugin_missing_cb)(const value_list_t *, user_data_t *); +/* "cache event" callback. CE_VALUE_NEW events are sent to all registered + * callbacks. Callback should check if it interested in further CE_VALUE_UPDATE + * and CE_VALUE_EXPIRED events for metric and set event->ret = 1 if so. + */ +typedef int (*plugin_cache_event_cb)(cache_event_t *, user_data_t *); typedef void (*plugin_log_cb)(int severity, const char *message, user_data_t *); typedef int (*plugin_shutdown_cb)(void); typedef int (*plugin_notification_cb)(const notification_t *, user_data_t *); @@ -294,6 +308,9 @@ int plugin_register_flush(const char *name, plugin_flush_cb callback, user_data_t const *user_data); int plugin_register_missing(const char *name, plugin_missing_cb callback, user_data_t const *user_data); +int plugin_register_cache_event(const char *name, + plugin_cache_event_cb callback, + user_data_t const *ud); int plugin_register_shutdown(const char *name, plugin_shutdown_cb callback); int plugin_register_data_set(const data_set_t *ds); int plugin_register_log(const char *name, plugin_log_cb callback, @@ -310,6 +327,7 @@ int plugin_unregister_read_group(const char *group); int plugin_unregister_write(const char *name); int plugin_unregister_flush(const char *name); int plugin_unregister_missing(const char *name); +int plugin_unregister_cache_event(const char *name); int plugin_unregister_shutdown(const char *name); int plugin_unregister_data_set(const char *name); int plugin_unregister_log(const char *name); @@ -380,6 +398,9 @@ __attribute__((sentinel)) int plugin_dispatch_multivalue(value_list_t const *vl, int store_type, ...); int plugin_dispatch_missing(const value_list_t *vl); +void plugin_dispatch_cache_event(enum cache_event_type_e event_type, + unsigned long callbacks_mask, const char *name, + const value_list_t *vl); int plugin_dispatch_notification(const notification_t *notif); diff --git a/src/daemon/utils_cache.c b/src/daemon/utils_cache.c index c53e5d14..722fa2da 100644 --- a/src/daemon/utils_cache.c +++ b/src/daemon/utils_cache.c @@ -67,6 +67,7 @@ typedef struct cache_entry_s { size_t history_length; meta_data_t *meta; + unsigned long callbacks_mask; } cache_entry_t; struct uc_iter_s { @@ -140,18 +141,15 @@ static void uc_check_range(const data_set_t *ds, cache_entry_t *ce) { static int uc_insert(const data_set_t *ds, const value_list_t *vl, const char *key) { - char *key_copy; - cache_entry_t *ce; - /* `cache_lock' has been locked by `uc_update' */ - key_copy = strdup(key); + char *key_copy = strdup(key); if (key_copy == NULL) { ERROR("uc_insert: strdup failed."); return -1; } - ce = cache_alloc(ds->ds_num); + cache_entry_t *ce = cache_alloc(ds->ds_num); if (ce == NULL) { sfree(key_copy); ERROR("uc_insert: cache_alloc (%" PRIsz ") failed.", ds->ds_num); @@ -226,6 +224,7 @@ int uc_check_timeout(void) { char *key; cdtime_t time; cdtime_t interval; + unsigned long callbacks_mask; } *expired = NULL; size_t expired_num = 0; @@ -251,6 +250,7 @@ int uc_check_timeout(void) { expired[expired_num].key = strdup(key); expired[expired_num].time = ce->last_time; expired[expired_num].interval = ce->interval; + expired[expired_num].callbacks_mask = ce->callbacks_mask; if (expired[expired_num].key == NULL) { ERROR("uc_check_timeout: strdup failed."); @@ -285,6 +285,10 @@ int uc_check_timeout(void) { } plugin_dispatch_missing(&vl); + + if (expired[i].callbacks_mask) + plugin_dispatch_cache_event(CE_VALUE_EXPIRED, expired[i].callbacks_mask, + expired[i].key, &vl); } /* for (i = 0; i < expired_num; i++) */ /* Now actually remove all the values from the cache. We don't re-evaluate @@ -314,8 +318,6 @@ int uc_check_timeout(void) { int uc_update(const data_set_t *ds, const value_list_t *vl) { char name[6 * DATA_MAX_NAME_LEN]; - cache_entry_t *ce = NULL; - int status; if (FORMAT_VL(name, sizeof(name), vl) != 0) { ERROR("uc_update: FORMAT_VL failed."); @@ -324,11 +326,16 @@ int uc_update(const data_set_t *ds, const value_list_t *vl) { pthread_mutex_lock(&cache_lock); - status = c_avl_get(cache_tree, name, (void *)&ce); + cache_entry_t *ce = NULL; + int status = c_avl_get(cache_tree, name, (void *)&ce); if (status != 0) /* entry does not yet exist */ { status = uc_insert(ds, vl, name); pthread_mutex_unlock(&cache_lock); + + if (status == 0) + plugin_dispatch_cache_event(CE_VALUE_NEW, 0 /* mask */, name, vl); + return status; } @@ -403,11 +410,32 @@ int uc_update(const data_set_t *ds, const value_list_t *vl) { ce->last_update = cdtime(); ce->interval = vl->interval; + /* Check if cache entry has registered callbacks */ + unsigned long callbacks_mask = ce->callbacks_mask; + pthread_mutex_unlock(&cache_lock); + if (callbacks_mask) + plugin_dispatch_cache_event(CE_VALUE_UPDATE, callbacks_mask, name, vl); + return 0; } /* int uc_update */ +int uc_set_callbacks_mask(const char *name, unsigned long mask) { + pthread_mutex_lock(&cache_lock); + cache_entry_t *ce = NULL; + int status = c_avl_get(cache_tree, name, (void *)&ce); + if (status != 0) { /* Ouch, just created entry disappeared ?! */ + ERROR("uc_set_callbacks_mask: Couldn't find %s entry!", name); + pthread_mutex_unlock(&cache_lock); + return -1; + } + DEBUG("uc_set_callbacks_mask: set mask for \"%s\" to %lu.", name, mask); + ce->callbacks_mask = mask; + pthread_mutex_unlock(&cache_lock); + return 0; +} + int uc_get_rate_by_name(const char *name, gauge_t **ret_values, size_t *ret_values_num) { gauge_t *ret = NULL; diff --git a/src/daemon/utils_cache.h b/src/daemon/utils_cache.h index d3ea9362..a0692216 100644 --- a/src/daemon/utils_cache.h +++ b/src/daemon/utils_cache.h @@ -56,6 +56,8 @@ int uc_get_hits(const data_set_t *ds, const value_list_t *vl); int uc_set_hits(const data_set_t *ds, const value_list_t *vl, int hits); int uc_inc_hits(const data_set_t *ds, const value_list_t *vl, int step); +int uc_set_callbacks_mask(const char *name, unsigned long callbacks_mask); + int uc_get_history(const data_set_t *ds, const value_list_t *vl, gauge_t *ret_history, size_t num_steps, size_t num_ds); int uc_get_history_by_name(const char *name, gauge_t *ret_history,