From: Andrew Bays Date: Wed, 13 Dec 2017 14:05:30 +0000 (-0500) Subject: sysevent plugin initial commit X-Git-Url: https://git.verplant.org/?a=commitdiff_plain;h=6021d68d4156d460bfd0e126cbd74cd10e0c9c25;p=collectd.git sysevent plugin initial commit --- diff --git a/AUTHORS b/AUTHORS index 4df743c5..d9e075f7 100644 --- a/AUTHORS +++ b/AUTHORS @@ -56,6 +56,9 @@ Amit Gupta Andreas Henriksson - libmnl support in the netlink plugin. +Andrew Bays + - sysevent plugin. + Andy Parkins - battery plugin: sysfs code. diff --git a/Makefile.am b/Makefile.am index e37716f6..b96ec53a 100644 --- a/Makefile.am +++ b/Makefile.am @@ -1577,6 +1577,14 @@ synproxy_la_SOURCES = src/synproxy.c synproxy_la_LDFLAGS = $(PLUGIN_LDFLAGS) endif +if BUILD_PLUGIN_SYSEVENT +pkglib_LTLIBRARIES += sysevent.la +sysevent_la_SOURCES = src/sysevent.c +sysevent_la_CPPFLAGS = $(AM_CPPFLAGS) $(BUILD_WITH_LIBYAJL_CPPFLAGS) +sysevent_la_LDFLAGS = $(PLUGIN_LDFLAGS) $(BUILD_WITH_LIBYAJL_LDFLAGS) +sysevent_la_LIBADD = $(BUILD_WITH_LIBYAJL_LIBS) +endif + if BUILD_PLUGIN_SYSLOG pkglib_LTLIBRARIES += syslog.la syslog_la_SOURCES = src/syslog.c diff --git a/README b/README index ca86c84d..eeee04a1 100644 --- a/README +++ b/README @@ -378,6 +378,9 @@ Features Acts as a StatsD server, reading values sent over the network from StatsD clients and calculating rates and other aggregates out of these values. + - sysevent + Listens to rsyslog events and submits matched values. + - swap Pages swapped out onto hard disk or whatever is called `swap' by the OS.. diff --git a/configure.ac b/configure.ac index 352a7b66..017a621b 100644 --- a/configure.ac +++ b/configure.ac @@ -6155,6 +6155,7 @@ plugin_serial="no" plugin_smart="no" plugin_swap="no" plugin_synproxy="no" +plugin_sysevent="no" plugin_tape="no" plugin_tcpconns="no" plugin_ted="no" @@ -6327,6 +6328,7 @@ fi if test "x$with_libyajl" = "xyes"; then plugin_ceph="yes" + plugin_sysevent="yes" fi if test "x$have_processor_info" = "xyes"; then @@ -6623,6 +6625,7 @@ AC_PLUGIN([snmp_agent], [$with_libnetsnmpagent], [SNMP agent plugin]) AC_PLUGIN([statsd], [yes], [StatsD plugin]) AC_PLUGIN([swap], [$plugin_swap], [Swap usage statistics]) AC_PLUGIN([synproxy], [$plugin_synproxy], [Synproxy stats plugin]) +AC_PLUGIN([sysevent], [$with_libyajl], [Syslog event statistics]) AC_PLUGIN([syslog], [$have_syslog], [Syslog logging plugin]) AC_PLUGIN([table], [yes], [Parsing of tabular data]) AC_PLUGIN([tail], [yes], [Parsing of logfiles]) @@ -7041,6 +7044,7 @@ AC_MSG_RESULT([ snmp_agent . . . . . $enable_snmp_agent]) AC_MSG_RESULT([ statsd . . . . . . . $enable_statsd]) AC_MSG_RESULT([ swap . . . . . . . . $enable_swap]) AC_MSG_RESULT([ synproxy . . . . . . $enable_synproxy]) +AC_MSG_RESULT([ sysevent. . . . . . . $enable_sysevent]) AC_MSG_RESULT([ syslog . . . . . . . $enable_syslog]) AC_MSG_RESULT([ table . . . . . . . . $enable_table]) AC_MSG_RESULT([ tail_csv . . . . . . $enable_tail_csv]) diff --git a/src/collectd.conf.in b/src/collectd.conf.in index 6ec61f32..08d204c8 100644 --- a/src/collectd.conf.in +++ b/src/collectd.conf.in @@ -192,6 +192,7 @@ #@BUILD_PLUGIN_SNMP_AGENT_TRUE@LoadPlugin snmp_agent #@BUILD_PLUGIN_STATSD_TRUE@LoadPlugin statsd #@BUILD_PLUGIN_SWAP_TRUE@LoadPlugin swap +#@BUILD_PLUGIN_SYSEVENT_TRUE@LoadPlugin sysevent #@BUILD_PLUGIN_TABLE_TRUE@LoadPlugin table #@BUILD_PLUGIN_TAIL_TRUE@LoadPlugin tail #@BUILD_PLUGIN_TAIL_CSV_TRUE@LoadPlugin tail_csv @@ -1391,6 +1392,13 @@ # ReportIO true # +# +# Listen "127.0.0.1" "6666" +# BufferSize 1024 +# BufferLength 10 +# RegexFilter "regex" +# + # # # #Plugin "table" diff --git a/src/collectd.conf.pod b/src/collectd.conf.pod index dfd785a2..5126587f 100644 --- a/src/collectd.conf.pod +++ b/src/collectd.conf.pod @@ -7763,6 +7763,50 @@ or is not reliable. =back +=head2 Plugin C + +The I plugin monitors rsyslog messages. + +B + + + Listen "127.0.0.1" "6666" + BufferSize 1024 + BufferLength 10 + RegexFilter "regex" + + +B + +=over 4 + +=item B I I + +Listen to this host on this port for incoming rsyslog messages. + +=item B I + +Maximum allowed size for incoming rsyslog messages. Messages that exceed +this number will be truncated to this size. Default is 1024 bytes. + +=item B I + +Maximum number of rsyslog events that can be stored in plugin's ring buffer. +By default, this is set to 10. Once an event has been read, its location +becomes available for storing a new event. + +=item B I + +Enumerate a process name to monitor. All processes that match this exact +name will be monitored for EXECs and EXITs. + +=item B I + +Enumerate a regex filter to apply to all incoming rsyslog messages. If a +message matches this filter, it will be published. + +=back + =head2 Plugin C =over 4 diff --git a/src/sysevent.c b/src/sysevent.c new file mode 100644 index 00000000..4da0fd76 --- /dev/null +++ b/src/sysevent.c @@ -0,0 +1,615 @@ +/** + * collectd - src/sysevent.c + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + * + * Authors: + * Red Hat NFVPE + * Andrew Bays + **/ + +#include "collectd.h" + +#include "common.h" +#include "plugin.h" +#include "utils_complain.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#define SYSEVENT_REGEX_MATCHES 1 + +/* + * Private data types + */ + +typedef struct { + int head; + int tail; + int maxLen; + char **buffer; +} circbuf_t; + +struct regexfilterlist_s { + char *regex_filter; + regex_t regex_filter_obj; + + struct regexfilterlist_s *next; +}; +typedef struct regexfilterlist_s regexfilterlist_t; + +/* + * Private variables + */ + +static int sysevent_thread_loop = 0; +static int sysevent_thread_error = 0; +static pthread_t sysevent_thread_id; +static pthread_mutex_t sysevent_lock = PTHREAD_MUTEX_INITIALIZER; +static int sock = -1; +static circbuf_t ring; + +static char *listen_ip; +static char *listen_port; +static int listen_buffer_size = 1024; +static int buffer_length = 10; + +static regexfilterlist_t *regexfilterlist_head = NULL; + +static const char *rsyslog_keys[3] = {"@timestamp", "@source_host", "@message"}; +static const char *rsyslog_field_keys[4] = {"facility", "severity", "program", + "processid"}; + +/* + * Private functions + */ + +static void *sysevent_thread(void *arg) /* {{{ */ +{ + pthread_mutex_lock(&sysevent_lock); + + while (sysevent_thread_loop > 0) { + int status = 0; + + pthread_mutex_unlock(&sysevent_lock); + + if (sock == -1) + return ((void *)0); + + char buffer[listen_buffer_size]; + struct sockaddr_storage src_addr; + socklen_t src_addr_len = sizeof(src_addr); + + memset(buffer, '\0', listen_buffer_size); + + ssize_t count = recvfrom(sock, buffer, sizeof(buffer), 0, + (struct sockaddr *)&src_addr, &src_addr_len); + + if (count == -1) { + ERROR("sysevent plugin: failed to receive data: %s", strerror(errno)); + status = -1; + } else if (count >= sizeof(buffer)) { + WARNING("sysevent plugin: datagram too large for buffer: truncated"); + } else { + // 1. Acquire lock + // 2. Push to buffer if there is room, otherwise raise warning + + pthread_mutex_lock(&sysevent_lock); + + int next = ring.head + 1; + if (next >= ring.maxLen) + next = 0; + + if (next == ring.tail) { + WARNING("sysevent plugin: ring buffer full"); + } else { + DEBUG("sysevent plugin: writing %s", buffer); + + strncpy(ring.buffer[ring.head], buffer, sizeof(buffer)); + ring.head = next; + } + + pthread_mutex_unlock(&sysevent_lock); + } + + usleep(1000); + + pthread_mutex_lock(&sysevent_lock); + + if (status < 0) { + WARNING("sysevent plugin: problem with thread status: %d", status); + sysevent_thread_error = 1; + break; + } + + if (sysevent_thread_loop <= 0) + break; + } /* while (sysevent_thread_loop > 0) */ + + pthread_mutex_unlock(&sysevent_lock); + + // pthread_exit instead of return + return ((void *)0); +} /* }}} void *sysevent_thread */ + +static int start_thread(void) /* {{{ */ +{ + int status; + + pthread_mutex_lock(&sysevent_lock); + + if (sysevent_thread_loop != 0) { + pthread_mutex_unlock(&sysevent_lock); + return (0); + } + + sysevent_thread_loop = 1; + sysevent_thread_error = 0; + + DEBUG("sysevent plugin: starting thread"); + + status = plugin_thread_create(&sysevent_thread_id, /* attr = */ NULL, + sysevent_thread, + /* arg = */ (void *)0, "sysevent"); + if (status != 0) { + sysevent_thread_loop = 0; + ERROR("sysevent plugin: starting thread failed."); + pthread_mutex_unlock(&sysevent_lock); + return (-1); + } + + pthread_mutex_unlock(&sysevent_lock); + return (0); +} /* }}} int start_thread */ + +static int stop_thread(int shutdown) /* {{{ */ +{ + int status; + + pthread_mutex_lock(&sysevent_lock); + + if (sysevent_thread_loop == 0) { + pthread_mutex_unlock(&sysevent_lock); + return (-1); + } + + sysevent_thread_loop = 0; + pthread_mutex_unlock(&sysevent_lock); + + if (shutdown == 1) { + // Since the thread is blocking, calling pthread_join + // doesn't actually succeed in stopping it. It will stick around + // until a message is received on the socket (at which + // it will realize that "sysevent_thread_loop" is 0 and will + // break out of the read loop and be allowed to die). This is + // fine when the process isn't supposed to be exiting, but in + // the case of a process shutdown, we don't want to have an + // idle thread hanging around. Calling pthread_cancel here in + // the case of a shutdown is just assures that the thread is + // gone and that the process has been fully terminated. + + DEBUG("sysevent plugin: Canceling thread for process shutdown"); + + status = pthread_cancel(sysevent_thread_id); + + if (status != 0) { + ERROR("sysevent plugin: Unable to cancel thread: %d (%s)", status, + strerror(errno)); + status = -1; + } + } else { + status = pthread_join(sysevent_thread_id, /* return = */ NULL); + if (status != 0) { + ERROR("sysevent plugin: Stopping thread failed."); + status = -1; + } + } + + pthread_mutex_lock(&sysevent_lock); + memset(&sysevent_thread_id, 0, sizeof(sysevent_thread_id)); + sysevent_thread_error = 0; + pthread_mutex_unlock(&sysevent_lock); + + DEBUG("sysevent plugin: Finished requesting stop of thread"); + + return (status); +} /* }}} int stop_thread */ + +static int sysevent_init(void) /* {{{ */ +{ + ring.head = 0; + ring.tail = 0; + ring.maxLen = buffer_length; + ring.buffer = (char **)malloc(buffer_length * sizeof(char *)); + + for (int i = 0; i < buffer_length; i++) { + ring.buffer[i] = malloc(listen_buffer_size); + } + + if (sock == -1) { + const char *hostname = listen_ip; + const char *portname = listen_port; + struct addrinfo hints; + memset(&hints, 0, sizeof(hints)); + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_DGRAM; + hints.ai_protocol = 0; + hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG; + struct addrinfo *res = 0; + + int err = getaddrinfo(hostname, portname, &hints, &res); + + if (err != 0) { + ERROR("sysevent plugin: failed to resolve local socket address (err=%d)", + err); + freeaddrinfo(res); + return (-1); + } + + sock = socket(res->ai_family, res->ai_socktype, res->ai_protocol); + if (sock == -1) { + ERROR("sysevent plugin: failed to open socket: %s", strerror(errno)); + freeaddrinfo(res); + return (-1); + } + + if (bind(sock, res->ai_addr, res->ai_addrlen) == -1) { + ERROR("sysevent plugin: failed to bind socket: %s", strerror(errno)); + freeaddrinfo(res); + return (-1); + } + + freeaddrinfo(res); + } + + DEBUG("sysevent plugin: socket created and bound"); + + return (start_thread()); +} /* }}} int sysevent_init */ + +static int sysevent_config_add_listen(const oconfig_item_t *ci) /* {{{ */ +{ + if (ci->values_num != 2 || ci->values[0].type != OCONFIG_TYPE_STRING || + ci->values[1].type != OCONFIG_TYPE_STRING) { + ERROR("sysevent plugin: The `%s' config option needs " + "two string arguments (ip and port).", + ci->key); + return (-1); + } + + listen_ip = strdup(ci->values[0].value.string); + listen_port = strdup(ci->values[1].value.string); + + return (0); +} + +static int sysevent_config_add_buffer_size(const oconfig_item_t *ci) /* {{{ */ +{ + int tmp = 0; + + if (cf_util_get_int(ci, &tmp) != 0) + return (-1); + else if ((tmp >= 1024) && (tmp <= 65535)) + listen_buffer_size = tmp; + else { + WARNING( + "sysevent plugin: The `BufferSize' must be between 1024 and 65535."); + return (-1); + } + + return (0); +} + +static int sysevent_config_add_buffer_length(const oconfig_item_t *ci) /* {{{ */ +{ + int tmp = 0; + + if (cf_util_get_int(ci, &tmp) != 0) + return (-1); + else if ((tmp >= 3) && (tmp <= 1024)) + buffer_length = tmp; + else { + WARNING("sysevent plugin: The `Bufferlength' must be between 3 and 1024."); + return (-1); + } + + return (0); +} + +static int sysevent_config_add_regex_filter(const oconfig_item_t *ci) /* {{{ */ +{ + if (ci->values_num != 1 || ci->values[0].type != OCONFIG_TYPE_STRING) { + ERROR("sysevent plugin: The `%s' config option needs " + "one string argument, a regular expression.", + ci->key); + return (-1); + } + + regexfilterlist_t *rl; + char *regexp_str; + regex_t regexp; + int status; + + regexp_str = strdup(ci->values[0].value.string); + + status = regcomp(®exp, regexp_str, REG_EXTENDED); + + if (status != 0) { + ERROR("sysevent plugin: 'RegexFilter' invalid regular expression: %s", + regexp_str); + return (-1); + } + + rl = malloc(sizeof(*rl)); + if (rl == NULL) { + char errbuf[1024]; + ERROR("sysevent plugin: malloc failed during " + "sysevent_config_add_regex_filter: %s", + sstrerror(errno, errbuf, sizeof(errbuf))); + return (-1); + } + + rl->regex_filter = regexp_str; + rl->regex_filter_obj = regexp; + rl->next = regexfilterlist_head; + regexfilterlist_head = rl; + + return (0); +} + +static int sysevent_config(oconfig_item_t *ci) /* {{{ */ +{ + for (int i = 0; i < ci->children_num; i++) { + oconfig_item_t *child = ci->children + i; + + if (strcasecmp("Listen", child->key) == 0) + sysevent_config_add_listen(child); + else if (strcasecmp("BufferSize", child->key) == 0) + sysevent_config_add_buffer_size(child); + else if (strcasecmp("BufferLength", child->key) == 0) + sysevent_config_add_buffer_length(child); + else if (strcasecmp("RegexFilter", child->key) == 0) + sysevent_config_add_regex_filter(child); + else { + WARNING("sysevent plugin: Option `%s' is not allowed here.", child->key); + } + } + + return (0); +} /* }}} int sysevent_config */ + +// TODO +static void submit(const char *message, yajl_val *node, + const char *type, /* {{{ */ + gauge_t value) { + value_list_t vl = VALUE_LIST_INIT; + + vl.values = &(value_t){.gauge = value}; + vl.values_len = 1; + sstrncpy(vl.plugin, "sysevent", sizeof(vl.plugin)); + sstrncpy(vl.type, type, sizeof(vl.type)); + + // Create metadata to store JSON key-values + meta_data_t *meta = meta_data_create(); + + if (node != NULL) { + // If we have a parsed-JSON node to work with, use that + size_t i = 0; + + for (i = 0; i < sizeof(rsyslog_keys) / sizeof(*rsyslog_keys); i++) { + char json_val[listen_buffer_size]; + const char *key = (const char *)rsyslog_keys[i]; + const char *path[] = {key, (const char *)0}; + yajl_val v = yajl_tree_get(*node, path, yajl_t_string); + + memset(json_val, '\0', listen_buffer_size); + + sprintf(json_val, "%s%c", YAJL_GET_STRING(v), '\0'); + + DEBUG("sysevent plugin: adding jsonval: %s", json_val); + + meta_data_add_string(meta, rsyslog_keys[i], json_val); + } + + for (i = 0; i < sizeof(rsyslog_field_keys) / sizeof(*rsyslog_field_keys); + i++) { + char json_val[listen_buffer_size]; + const char *key = (const char *)rsyslog_field_keys[i]; + const char *path[] = {"@fields", key, (const char *)0}; + yajl_val v = yajl_tree_get(*node, path, yajl_t_string); + + memset(json_val, '\0', listen_buffer_size); + + sprintf(json_val, "%s%c", YAJL_GET_STRING(v), '\0'); + + DEBUG("sysevent plugin: adding jsonval: %s", json_val); + + meta_data_add_string(meta, rsyslog_field_keys[i], json_val); + } + } else { + // Data was not sent in JSON format, so just treat the whole log entry + // as the message + meta_data_add_string(meta, "@message", strdup(message)); + } + + vl.meta = meta; + + DEBUG("sysevent plugin: dispatching message"); + + plugin_dispatch_values(&vl); +} /* }}} void sysevent_submit */ + +static int sysevent_read(void) /* {{{ */ +{ + if (sysevent_thread_error != 0) { + ERROR("sysevent plugin: The sysevent thread had a problem (%d). Restarting " + "it.", + sysevent_thread_error); + + stop_thread(0); + + start_thread(); + + return (-1); + } /* if (sysevent_thread_error != 0) */ + + pthread_mutex_lock(&sysevent_lock); + + while (ring.head != ring.tail) { + int is_match = 1; + char *match_str = NULL; + regexfilterlist_t *rl = regexfilterlist_head; + int next = ring.tail + 1; + + if (next >= ring.maxLen) + next = 0; + + DEBUG("sysevent plugin: reading %s", ring.buffer[ring.tail]); + + // Try to parse JSON, and if it fails, fall back to plain string + yajl_val node; + char errbuf[1024]; + + errbuf[0] = 0; + + node = yajl_tree_parse((const char *)ring.buffer[ring.tail], errbuf, + sizeof(errbuf)); + + if (node != NULL) { + // JSON rsyslog data + + // If we have any regex filters, we need to see if the message portion of + // the data matches any of them (otherwise we're not interested) + if (regexfilterlist_head != NULL) { + char json_val[listen_buffer_size]; + const char *path[] = {"@message", (const char *)0}; + yajl_val v = yajl_tree_get(node, path, yajl_t_string); + + memset(json_val, '\0', listen_buffer_size); + + sprintf(json_val, "%s%c", YAJL_GET_STRING(v), '\0'); + + match_str = (char *)&json_val; + } + } else { + // non-JSON rsyslog data + + // If we have any regex filters, we need to see if the message data + // matches any of them (otherwise we're not interested) + if (regexfilterlist_head != NULL) + match_str = ring.buffer[ring.tail]; + } + + // If we care about matching, do that comparison here + if (match_str != NULL) { + is_match = 0; + + while (rl != NULL) { + regmatch_t matches[SYSEVENT_REGEX_MATCHES]; + + is_match = (regexec(&rl->regex_filter_obj, match_str, + SYSEVENT_REGEX_MATCHES, matches, 0) == 0 + ? 1 + : 0); + + if (is_match == 1) { + DEBUG("sysevent plugin: regex filter match: %s", rl->regex_filter); + break; + } + + rl = rl->next; + } + } + + if (is_match == 1 && node != NULL) + submit(NULL, &node, "gauge", 1); + else if (is_match == 1) + submit(ring.buffer[ring.tail], NULL, "gauge", 1); + + if (node != NULL) + yajl_tree_free(node); + + ring.tail = next; + } + + pthread_mutex_unlock(&sysevent_lock); + + return (0); +} /* }}} int sysevent_read */ + +static int sysevent_shutdown(void) /* {{{ */ +{ + int status; + regexfilterlist_t *rl; + + DEBUG("sysevent plugin: Shutting down thread."); + if (stop_thread(1) < 0) + return (-1); + + if (sock != -1) { + status = close(sock); + if (status != 0) { + ERROR("sysevent plugin: failed to close socket %d: %d (%s)", sock, status, + strerror(errno)); + return (-1); + } else + sock = -1; + } + + free(listen_ip); + free(listen_port); + + for (int i = 0; i < buffer_length; i++) { + free(ring.buffer[i]); + } + + free(ring.buffer); + + rl = regexfilterlist_head; + while (rl != NULL) { + regexfilterlist_t *rl_next; + + rl_next = rl->next; + + free(rl->regex_filter); + regfree(&rl->regex_filter_obj); + + sfree(rl); + + rl = rl_next; + } + + return (0); +} /* }}} int sysevent_shutdown */ + +void module_register(void) { + plugin_register_complex_config("sysevent", sysevent_config); + plugin_register_init("sysevent", sysevent_init); + plugin_register_read("sysevent", sysevent_read); + plugin_register_shutdown("sysevent", sysevent_shutdown); +} /* void module_register */ \ No newline at end of file diff --git a/src/types.db b/src/types.db index f27ba465..a48f073f 100644 --- a/src/types.db +++ b/src/types.db @@ -235,6 +235,7 @@ spam_score value:GAUGE:U:U spl value:GAUGE:U:U swap value:GAUGE:0:1099511627776 swap_io value:DERIVE:0:U +sysevent value:GAUGE:0:1 tcp_connections value:GAUGE:0:4294967295 temperature value:GAUGE:U:U threads value:GAUGE:0:U