VES-enabled sysvent plugin
authorAndrew Bays <abays@redhat.com>
Mon, 26 Feb 2018 20:54:56 +0000 (15:54 -0500)
committerAndrew Bays <abays@redhat.com>
Mon, 26 Feb 2018 20:54:56 +0000 (15:54 -0500)
Makefile.am
configure.ac
src/collectd.conf.pod
src/sysevent.c

index b96ec53..0279510 100644 (file)
@@ -1582,7 +1582,7 @@ pkglib_LTLIBRARIES += sysevent.la
 sysevent_la_SOURCES = src/sysevent.c
 sysevent_la_CPPFLAGS = $(AM_CPPFLAGS) $(BUILD_WITH_LIBYAJL_CPPFLAGS)
 sysevent_la_LDFLAGS = $(PLUGIN_LDFLAGS) $(BUILD_WITH_LIBYAJL_LDFLAGS)
-sysevent_la_LIBADD = $(BUILD_WITH_LIBYAJL_LIBS)
+sysevent_la_LIBADD = $(BUILD_WITH_LIBYAJL_LIBS) libignorelist.la
 endif
 
 if BUILD_PLUGIN_SYSLOG
index 6035e33..fdfe30a 100644 (file)
@@ -6224,7 +6224,6 @@ if test "x$ac_system" = "xLinux"; then
   if test "x$with_libyajl" = "xyes" && test "x$with_libyajl2" = "xyes"; then
     plugin_ovs_events="yes"
     plugin_ovs_stats="yes"
-    plugin_sysevent="yes"
   fi
 fi
 
@@ -6329,6 +6328,7 @@ fi
 
 if test "x$with_libyajl" = "xyes"; then
   plugin_ceph="yes"
+  plugin_sysevent="yes"
 fi
 
 if test "x$have_processor_info" = "xyes"; then
@@ -6625,7 +6625,7 @@ AC_PLUGIN([snmp_agent],          [$with_libnetsnmpagent],   [SNMP agent plugin])
 AC_PLUGIN([statsd],              [yes],                     [StatsD plugin])
 AC_PLUGIN([swap],                [$plugin_swap],            [Swap usage statistics])
 AC_PLUGIN([synproxy],            [$plugin_synproxy],        [Synproxy stats plugin])
-AC_PLUGIN([sysevent],            [$plugin_sysevent],        [Syslog event statistics])
+AC_PLUGIN([sysevent],            [$plugin_sysevent],        [rsyslog events])
 AC_PLUGIN([syslog],              [$have_syslog],            [Syslog logging plugin])
 AC_PLUGIN([table],               [yes],                     [Parsing of tabular data])
 AC_PLUGIN([tail],                [yes],                     [Parsing of logfiles])
index 5126587..3845fa6 100644 (file)
@@ -7770,35 +7770,55 @@ The I<sysevent> plugin monitors rsyslog messages.
 B<Synopsis:>
  
   <Plugin sysevent>
-    Listen "127.0.0.1" "6666"
+    Listen "192.168.0.2" "6666"
     BufferSize 1024
     BufferLength 10
     RegexFilter "regex"
   </Plugin>
+
+  rsyslog should be configured such that it sends data to the IP and port you
+  include in the plugin configuration.  For example, given the configuration
+  above, something like this would be set in /etc/rsyslog.conf:
+
+    if $programname != 'collectd' then
+    *.* @192.168.0.2:6666
+
+  This plugin is designed to consume JSON rsyslog data, so a more complete
+  rsyslog configuration would look like so (where we define a JSON template
+  and use it when sending data to our IP and port):
+
+    $template ls_json,"{%timestamp:::date-rfc3339,jsonf:@timestamp%, \
+    %source:::jsonf:@source_host%,\"@source\":\"syslog://%fromhost-ip:::json%\", \
+    \"@message\":\"%timestamp% %app-name%:%msg:::json%\",\"@fields\": \
+    {%syslogfacility-text:::jsonf:facility%,%syslogseverity:::jsonf:severity-num%, \
+    %syslogseverity-text:::jsonf:severity%,%programname:::jsonf:program%, \
+    %procid:::jsonf:processid%}}"
+
+    if $programname != 'collectd' then
+    *.* @192.168.0.2:6666;ls_json
+
+  Please note that these rsyslog.conf examples are *not* complete, as rsyslog
+  requires more than these options in the configuration file.  These examples 
+  are meant to demonstration the proper remote logging and JSON format syntax.
+
 B<Options:>
  
 =over 4
  
 =item B<Listen> I<host> I<port>
  
-Listen to this host on this port for incoming rsyslog messages.
+Listen on this IP on this port for incoming rsyslog messages.
 
 =item B<BufferSize> I<length>
  
 Maximum allowed size for incoming rsyslog messages.  Messages that exceed 
-this number will be truncated to this size.  Default is 1024 bytes.
+this number will be truncated to this size.  Default is 4096 bytes.
 
 =item B<BufferLength> I<length>
  
 Maximum number of rsyslog events that can be stored in plugin's ring buffer.
 By default, this is set to 10.  Once an event has been read, its location
 becomes available for storing a new event.
-=item B<Process> I<name>
-Enumerate a process name to monitor.  All processes that match this exact
-name will be monitored for EXECs and EXITs.
 
 =item B<RegexFilter> I<regex>
  
index 7ba2e87..94e99e9 100644 (file)
@@ -29,6 +29,7 @@
 #include "common.h"
 #include "plugin.h"
 #include "utils_complain.h"
+#include "utils_ignorelist.h"
 
 #include <asm/types.h>
 #include <errno.h>
 #include <string.h>
 #include <sys/socket.h>
 #include <unistd.h>
-#include <yajl/yajl_tree.h>
 
-#define SYSEVENT_REGEX_MATCHES 1
+#include <yajl/yajl_common.h>
+#include <yajl/yajl_gen.h>
+
+#if HAVE_YAJL_YAJL_VERSION_H
+#include <yajl/yajl_version.h>
+#endif
+#if defined(YAJL_MAJOR) && (YAJL_MAJOR > 1)
+#include <yajl/yajl_tree.h>
+#define HAVE_YAJL_V2 1
+#endif
+
+#define SYSEVENT_DOMAIN_FIELD "domain"
+#define SYSEVENT_DOMAIN_VALUE "syslog"
+#define SYSEVENT_EVENT_ID_FIELD "eventId"
+#define SYSEVENT_EVENT_NAME_FIELD "eventName"
+#define SYSEVENT_EVENT_NAME_VALUE "syslog message"
+#define SYSEVENT_LAST_EPOCH_MICROSEC_FIELD "lastEpochMicrosec"
+#define SYSEVENT_PRIORITY_FIELD "priority"
+#define SYSEVENT_PRIORITY_VALUE_HIGH "high"
+#define SYSEVENT_PRIORITY_VALUE_LOW "low"
+#define SYSEVENT_PRIORITY_VALUE_MEDIUM "medium"
+#define SYSEVENT_PRIORITY_VALUE_NORMAL "normal"
+#define SYSEVENT_PRIORITY_VALUE_UNKNOWN "unknown"
+#define SYSEVENT_REPORTING_ENTITY_NAME_FIELD "reportingEntityName"
+#define SYSEVENT_REPORTING_ENTITY_NAME_VALUE "collectd sysevent plugin"
+#define SYSEVENT_SEQUENCE_FIELD "sequence"
+#define SYSEVENT_SEQUENCE_VALUE "0"
+#define SYSEVENT_SOURCE_NAME_FIELD "sourceName"
+#define SYSEVENT_SOURCE_NAME_VALUE "syslog"
+#define SYSEVENT_START_EPOCH_MICROSEC_FIELD "startEpochMicrosec"
+#define SYSEVENT_VERSION_FIELD "version"
+#define SYSEVENT_VERSION_VALUE "1.0"
+
+#define SYSEVENT_EVENT_SOURCE_HOST_FIELD "eventSourceHost"
+#define SYSEVENT_EVENT_SOURCE_TYPE_FIELD "eventSourceType"
+#define SYSEVENT_EVENT_SOURCE_TYPE_VALUE "host"
+#define SYSEVENT_SYSLOG_FIELDS_FIELD "syslogFields"
+#define SYSEVENT_SYSLOG_FIELDS_VERSION_FIELD "syslogFieldsVersion"
+#define SYSEVENT_SYSLOG_FIELDS_VERSION_VALUE "1.0"
+#define SYSEVENT_SYSLOG_MSG_FIELD "syslogMsg"
+#define SYSEVENT_SYSLOG_PROC_FIELD "syslogProc"
+#define SYSEVENT_SYSLOG_SEV_FIELD "syslogSev"
+#define SYSEVENT_SYSLOG_TAG_FIELD "syslogTag"
+#define SYSEVENT_SYSLOG_TAG_VALUE "NILVALUE"
 
 /*
  * Private data types
@@ -53,42 +96,321 @@ typedef struct {
   int tail;
   int maxLen;
   char **buffer;
+  long long unsigned int *timestamp;
 } circbuf_t;
 
-struct regexfilterlist_s {
-  char *regex_filter;
-  regex_t regex_filter_obj;
-
-  struct regexfilterlist_s *next;
-};
-typedef struct regexfilterlist_s regexfilterlist_t;
-
 /*
  * Private variables
  */
+static ignorelist_t *ignorelist = NULL;
 
 static int sysevent_thread_loop = 0;
 static int sysevent_thread_error = 0;
 static pthread_t sysevent_thread_id;
 static pthread_mutex_t sysevent_lock = PTHREAD_MUTEX_INITIALIZER;
 static int sock = -1;
+static int event_id = 0;
 static circbuf_t ring;
 
 static char *listen_ip;
 static char *listen_port;
-static int listen_buffer_size = 1024;
+static int listen_buffer_size = 4096;
 static int buffer_length = 10;
 
-static regexfilterlist_t *regexfilterlist_head = NULL;
+static int monitor_all_messages = 1;
 
 static const char *rsyslog_keys[3] = {"@timestamp", "@source_host", "@message"};
-static const char *rsyslog_field_keys[4] = {"facility", "severity", "program",
-                                            "processid"};
+static const char *rsyslog_field_keys[5] = {
+    "facility", "severity", "severity-num", "program", "processid"};
 
 /*
  * Private functions
  */
 
+static int gen_message_payload(const char *msg, char *sev, int sev_num,
+                               char *process, char *host,
+                               long long unsigned int timestamp, char **buf) {
+  const unsigned char *buf2;
+  yajl_gen g;
+  char json_str[DATA_MAX_NAME_LEN];
+
+#if !defined(HAVE_YAJL_V2)
+  yajl_gen_config conf = {};
+
+  conf.beautify = 0;
+#endif
+
+#if HAVE_YAJL_V2
+  size_t len;
+  g = yajl_gen_alloc(NULL);
+  yajl_gen_config(g, yajl_gen_beautify, 0);
+#else
+  unsigned int len;
+  g = yajl_gen_alloc(&conf, NULL);
+#endif
+
+  yajl_gen_clear(g);
+
+  // *** BEGIN common event header ***
+
+  if (yajl_gen_map_open(g) != yajl_gen_status_ok)
+    goto err;
+
+  // domain
+  if (yajl_gen_string(g, (u_char *)SYSEVENT_DOMAIN_FIELD,
+                      strlen(SYSEVENT_DOMAIN_FIELD)) != yajl_gen_status_ok)
+    goto err;
+
+  if (yajl_gen_string(g, (u_char *)SYSEVENT_DOMAIN_VALUE,
+                      strlen(SYSEVENT_DOMAIN_VALUE)) != yajl_gen_status_ok)
+    goto err;
+
+  // eventId
+  if (yajl_gen_string(g, (u_char *)SYSEVENT_EVENT_ID_FIELD,
+                      strlen(SYSEVENT_EVENT_ID_FIELD)) != yajl_gen_status_ok)
+    goto err;
+
+  event_id = event_id + 1;
+  int event_id_len = sizeof(char) * sizeof(int) * 4 + 1;
+  memset(json_str, '\0', DATA_MAX_NAME_LEN);
+  snprintf(json_str, event_id_len, "%d", event_id);
+
+  if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) {
+    goto err;
+  }
+
+  // eventName
+  if (yajl_gen_string(g, (u_char *)SYSEVENT_EVENT_NAME_FIELD,
+                      strlen(SYSEVENT_EVENT_NAME_FIELD)) != yajl_gen_status_ok)
+    goto err;
+
+  int event_name_len = 0;
+  event_name_len = event_name_len + strlen(host); // host name
+  event_name_len =
+      event_name_len +
+      22; // "host", "rsyslog", "message", 3 spaces and null-terminator
+  memset(json_str, '\0', DATA_MAX_NAME_LEN);
+  snprintf(json_str, event_name_len, "host %s rsyslog message", host);
+
+  if (yajl_gen_string(g, (u_char *)json_str, strlen(json_str)) !=
+      yajl_gen_status_ok) {
+    goto err;
+  }
+
+  // lastEpochMicrosec
+  if (yajl_gen_string(g, (u_char *)SYSEVENT_LAST_EPOCH_MICROSEC_FIELD,
+                      strlen(SYSEVENT_LAST_EPOCH_MICROSEC_FIELD)) !=
+      yajl_gen_status_ok)
+    goto err;
+
+  int last_epoch_microsec_len =
+      sizeof(char) * sizeof(long long unsigned int) * 4 + 1;
+  memset(json_str, '\0', DATA_MAX_NAME_LEN);
+  snprintf(json_str, last_epoch_microsec_len, "%llu",
+           (long long unsigned int)CDTIME_T_TO_US(cdtime()));
+
+  if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) {
+    goto err;
+  }
+
+  // priority
+  if (yajl_gen_string(g, (u_char *)SYSEVENT_PRIORITY_FIELD,
+                      strlen(SYSEVENT_PRIORITY_FIELD)) != yajl_gen_status_ok)
+    goto err;
+
+  switch (sev_num) {
+  case 4:
+    if (yajl_gen_string(g, (u_char *)SYSEVENT_PRIORITY_VALUE_MEDIUM,
+                        strlen(SYSEVENT_PRIORITY_VALUE_MEDIUM)) !=
+        yajl_gen_status_ok)
+      goto err;
+    break;
+  case 5:
+    if (yajl_gen_string(g, (u_char *)SYSEVENT_PRIORITY_VALUE_NORMAL,
+                        strlen(SYSEVENT_PRIORITY_VALUE_NORMAL)) !=
+        yajl_gen_status_ok)
+      goto err;
+    break;
+  case 6:
+  case 7:
+    if (yajl_gen_string(g, (u_char *)SYSEVENT_PRIORITY_VALUE_LOW,
+                        strlen(SYSEVENT_PRIORITY_VALUE_LOW)) !=
+        yajl_gen_status_ok)
+      goto err;
+    break;
+  default:
+    if (yajl_gen_string(g, (u_char *)SYSEVENT_PRIORITY_VALUE_UNKNOWN,
+                        strlen(SYSEVENT_PRIORITY_VALUE_UNKNOWN)) !=
+        yajl_gen_status_ok)
+      goto err;
+    break;
+  }
+
+  // reportingEntityName
+  if (yajl_gen_string(g, (u_char *)SYSEVENT_REPORTING_ENTITY_NAME_FIELD,
+                      strlen(SYSEVENT_REPORTING_ENTITY_NAME_FIELD)) !=
+      yajl_gen_status_ok)
+    goto err;
+
+  if (yajl_gen_string(g, (u_char *)SYSEVENT_REPORTING_ENTITY_NAME_VALUE,
+                      strlen(SYSEVENT_REPORTING_ENTITY_NAME_VALUE)) !=
+      yajl_gen_status_ok)
+    goto err;
+
+  // sequence
+  if (yajl_gen_string(g, (u_char *)SYSEVENT_SEQUENCE_FIELD,
+                      strlen(SYSEVENT_SEQUENCE_FIELD)) != yajl_gen_status_ok)
+    goto err;
+
+  if (yajl_gen_number(g, SYSEVENT_SEQUENCE_VALUE,
+                      strlen(SYSEVENT_SEQUENCE_VALUE)) != yajl_gen_status_ok)
+    goto err;
+
+  // sourceName
+  if (yajl_gen_string(g, (u_char *)SYSEVENT_SOURCE_NAME_FIELD,
+                      strlen(SYSEVENT_SOURCE_NAME_FIELD)) != yajl_gen_status_ok)
+    goto err;
+
+  if (yajl_gen_string(g, (u_char *)SYSEVENT_SOURCE_NAME_VALUE,
+                      strlen(SYSEVENT_SOURCE_NAME_VALUE)) != yajl_gen_status_ok)
+    goto err;
+
+  // startEpochMicrosec
+  if (yajl_gen_string(g, (u_char *)SYSEVENT_START_EPOCH_MICROSEC_FIELD,
+                      strlen(SYSEVENT_START_EPOCH_MICROSEC_FIELD)) !=
+      yajl_gen_status_ok)
+    goto err;
+
+  int start_epoch_microsec_len =
+      sizeof(char) * sizeof(long long unsigned int) * 4 + 1;
+  memset(json_str, '\0', DATA_MAX_NAME_LEN);
+  snprintf(json_str, start_epoch_microsec_len, "%llu",
+           (long long unsigned int)timestamp);
+
+  if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) {
+    goto err;
+  }
+
+  // version
+  if (yajl_gen_string(g, (u_char *)SYSEVENT_VERSION_FIELD,
+                      strlen(SYSEVENT_VERSION_FIELD)) != yajl_gen_status_ok)
+    goto err;
+
+  if (yajl_gen_number(g, SYSEVENT_VERSION_VALUE,
+                      strlen(SYSEVENT_VERSION_VALUE)) != yajl_gen_status_ok)
+    goto err;
+
+  // *** END common event header ***
+
+  // *** BEGIN syslog fields ***
+
+  if (yajl_gen_string(g, (u_char *)SYSEVENT_SYSLOG_FIELDS_FIELD,
+                      strlen(SYSEVENT_SYSLOG_FIELDS_FIELD)) !=
+      yajl_gen_status_ok)
+    goto err;
+
+  if (yajl_gen_map_open(g) != yajl_gen_status_ok)
+    goto err;
+
+  // eventSourceHost
+  if (yajl_gen_string(g, (u_char *)SYSEVENT_EVENT_SOURCE_HOST_FIELD,
+                      strlen(SYSEVENT_EVENT_SOURCE_HOST_FIELD)) !=
+      yajl_gen_status_ok)
+    goto err;
+
+  if (yajl_gen_string(g, (u_char *)host, strlen(host)) != yajl_gen_status_ok)
+    goto err;
+
+  // eventSourceType
+  if (yajl_gen_string(g, (u_char *)SYSEVENT_EVENT_SOURCE_TYPE_FIELD,
+                      strlen(SYSEVENT_EVENT_SOURCE_TYPE_FIELD)) !=
+      yajl_gen_status_ok)
+    goto err;
+
+  if (yajl_gen_string(g, (u_char *)SYSEVENT_EVENT_SOURCE_TYPE_VALUE,
+                      strlen(SYSEVENT_EVENT_SOURCE_TYPE_VALUE)) !=
+      yajl_gen_status_ok)
+    goto err;
+
+  // syslogFieldsVersion
+  if (yajl_gen_string(g, (u_char *)SYSEVENT_SYSLOG_FIELDS_VERSION_FIELD,
+                      strlen(SYSEVENT_SYSLOG_FIELDS_VERSION_FIELD)) !=
+      yajl_gen_status_ok)
+    goto err;
+
+  if (yajl_gen_number(g, SYSEVENT_SYSLOG_FIELDS_VERSION_VALUE,
+                      strlen(SYSEVENT_SYSLOG_FIELDS_VERSION_VALUE)) !=
+      yajl_gen_status_ok)
+    goto err;
+
+  if (yajl_gen_map_close(g) != yajl_gen_status_ok)
+    goto err;
+
+  // syslogMsg
+  if (msg != NULL) {
+    if (yajl_gen_string(g, (u_char *)SYSEVENT_SYSLOG_MSG_FIELD,
+                        strlen(SYSEVENT_SYSLOG_MSG_FIELD)) !=
+        yajl_gen_status_ok)
+      goto err;
+
+    if (yajl_gen_string(g, (u_char *)msg, strlen(msg)) != yajl_gen_status_ok)
+      goto err;
+  }
+
+  // syslogProc
+  if (process != NULL) {
+    if (yajl_gen_string(g, (u_char *)SYSEVENT_SYSLOG_PROC_FIELD,
+                        strlen(SYSEVENT_SYSLOG_PROC_FIELD)) !=
+        yajl_gen_status_ok)
+      goto err;
+
+    if (yajl_gen_string(g, (u_char *)process, strlen(process)) !=
+        yajl_gen_status_ok)
+      goto err;
+  }
+
+  // syslogSev
+  if (sev != NULL) {
+    if (yajl_gen_string(g, (u_char *)SYSEVENT_SYSLOG_SEV_FIELD,
+                        strlen(SYSEVENT_SYSLOG_SEV_FIELD)) !=
+        yajl_gen_status_ok)
+      goto err;
+
+    if (yajl_gen_string(g, (u_char *)sev, strlen(sev)) != yajl_gen_status_ok)
+      goto err;
+  }
+
+  // syslogTag
+  if (yajl_gen_string(g, (u_char *)SYSEVENT_SYSLOG_TAG_FIELD,
+                      strlen(SYSEVENT_SYSLOG_TAG_FIELD)) != yajl_gen_status_ok)
+    goto err;
+
+  if (yajl_gen_string(g, (u_char *)SYSEVENT_SYSLOG_TAG_VALUE,
+                      strlen(SYSEVENT_SYSLOG_TAG_VALUE)) != yajl_gen_status_ok)
+    goto err;
+
+  // *** END syslog fields ***
+
+  if (yajl_gen_map_close(g) != yajl_gen_status_ok)
+    goto err;
+
+  if (yajl_gen_get_buf(g, &buf2, &len) != yajl_gen_status_ok)
+    goto err;
+
+  *buf = malloc(strlen((char *)buf2) + 1);
+
+  sstrncpy(*buf, (char *)buf2, strlen((char *)buf2) + 1);
+
+  yajl_gen_free(g);
+
+  return 0;
+
+err:
+  yajl_gen_free(g);
+  ERROR("sysevent plugin: gen_message_payload failed to generate JSON");
+  return -1;
+}
+
 static void *sysevent_thread(void *arg) /* {{{ */
 {
   pthread_mutex_lock(&sysevent_lock);
@@ -131,6 +453,8 @@ static void *sysevent_thread(void *arg) /* {{{ */
         DEBUG("sysevent plugin: writing %s", buffer);
 
         strncpy(ring.buffer[ring.head], buffer, sizeof(buffer));
+        ring.timestamp[ring.head] =
+            (long long unsigned int)CDTIME_T_TO_US(cdtime());
         ring.head = next;
       }
 
@@ -153,7 +477,7 @@ static void *sysevent_thread(void *arg) /* {{{ */
 
   pthread_mutex_unlock(&sysevent_lock);
 
-  // pthread_exit instead of return
+  // pthread_exit instead of return?
   return ((void *)0);
 } /* }}} void *sysevent_thread */
 
@@ -251,6 +575,9 @@ static int sysevent_init(void) /* {{{ */
     ring.buffer[i] = malloc(listen_buffer_size);
   }
 
+  ring.timestamp = (long long unsigned int *)malloc(
+      buffer_length * sizeof(long long unsigned int));
+
   if (sock == -1) {
     const char *hostname = listen_ip;
     const char *portname = listen_port;
@@ -331,10 +658,10 @@ static int sysevent_config_add_buffer_length(const oconfig_item_t *ci) /* {{{ */
 
   if (cf_util_get_int(ci, &tmp) != 0)
     return (-1);
-  else if ((tmp >= 3) && (tmp <= 1024))
+  else if ((tmp >= 3) && (tmp <= 4096))
     buffer_length = tmp;
   else {
-    WARNING("sysevent plugin: The `Bufferlength' must be between 3 and 1024.");
+    WARNING("sysevent plugin: The `Bufferlength' must be between 3 and 4096.");
     return (-1);
   }
 
@@ -350,34 +677,23 @@ static int sysevent_config_add_regex_filter(const oconfig_item_t *ci) /* {{{ */
     return (-1);
   }
 
-  regexfilterlist_t *rl;
-  char *regexp_str;
-  regex_t regexp;
-  int status;
-
-  regexp_str = strdup(ci->values[0].value.string);
+#if HAVE_REGEX_H
+  if (ignorelist == NULL)
+    ignorelist = ignorelist_create(/* invert = */ 1);
 
-  status = regcomp(&regexp, regexp_str, REG_EXTENDED);
+  int status = ignorelist_add(ignorelist, ci->values[0].value.string);
 
   if (status != 0) {
-    ERROR("sysevent plugin: 'RegexFilter' invalid regular expression: %s",
-          regexp_str);
-    return (-1);
-  }
-
-  rl = malloc(sizeof(*rl));
-  if (rl == NULL) {
-    char errbuf[1024];
-    ERROR("sysevent plugin: malloc failed during "
-          "sysevent_config_add_regex_filter: %s",
-          sstrerror(errno, errbuf, sizeof(errbuf)));
-    return (-1);
+    ERROR("sysevent plugin: invalid regular expression: %s",
+          ci->values[0].value.string);
+    return (1);
   }
 
-  rl->regex_filter = regexp_str;
-  rl->regex_filter_obj = regexp;
-  rl->next = regexfilterlist_head;
-  regexfilterlist_head = rl;
+  monitor_all_messages = 0;
+#else
+  WARNING("sysevent plugin: The plugin has been compiled without support "
+          "for the \"RegexFilter\" option.");
+#endif
 
   return (0);
 }
@@ -403,66 +719,131 @@ static int sysevent_config(oconfig_item_t *ci) /* {{{ */
   return (0);
 } /* }}} int sysevent_config */
 
-// TODO
-static void submit(const char *message, yajl_val *node,
-                   const char *type, /* {{{ */
-                   gauge_t value) {
-  value_list_t vl = VALUE_LIST_INIT;
-
-  vl.values = &(value_t){.gauge = value};
-  vl.values_len = 1;
-  sstrncpy(vl.plugin, "sysevent", sizeof(vl.plugin));
-  sstrncpy(vl.type, type, sizeof(vl.type));
-
-  // Create metadata to store JSON key-values
-  meta_data_t *meta = meta_data_create();
+static void sysevent_dispatch_notification(const char *message,
+#if HAVE_YAJL_V2
+                                           yajl_val *node,
+#endif
+                                           long long unsigned int timestamp) {
+  char *buf = NULL;
+  notification_t n = {NOTIF_OKAY, cdtime(), "", "",  "sysevent",
+                      "",         "",       "", NULL};
 
+#if HAVE_YAJL_V2
   if (node != NULL) {
     // If we have a parsed-JSON node to work with, use that
-    size_t i = 0;
 
-    for (i = 0; i < sizeof(rsyslog_keys) / sizeof(*rsyslog_keys); i++) {
-      char json_val[listen_buffer_size];
-      const char *key = (const char *)rsyslog_keys[i];
-      const char *path[] = {key, (const char *)0};
-      yajl_val v = yajl_tree_get(*node, path, yajl_t_string);
+    char process[listen_buffer_size];
+    char severity[listen_buffer_size];
+    char sev_num_str[listen_buffer_size];
+    char msg[listen_buffer_size];
+    char hostname_str[listen_buffer_size];
+    int sev_num = -1;
 
-      memset(json_val, '\0', listen_buffer_size);
+    // msg
+    const char *msg_path[] = {rsyslog_keys[2], (const char *)0};
+    yajl_val msg_v = yajl_tree_get(*node, msg_path, yajl_t_string);
 
-      snprintf(json_val, listen_buffer_size, "%s%c", YAJL_GET_STRING(v), '\0');
+    if (msg_v != NULL) {
+      memset(msg, '\0', listen_buffer_size);
+      snprintf(msg, listen_buffer_size, "%s%c", YAJL_GET_STRING(msg_v), '\0');
+    }
 
-      DEBUG("sysevent plugin: adding jsonval: %s", json_val);
+    // severity
+    const char *severity_path[] = {"@fields", rsyslog_field_keys[1],
+                                   (const char *)0};
+    yajl_val severity_v = yajl_tree_get(*node, severity_path, yajl_t_string);
 
-      meta_data_add_string(meta, rsyslog_keys[i], json_val);
+    if (severity_v != NULL) {
+      memset(severity, '\0', listen_buffer_size);
+      snprintf(severity, listen_buffer_size, "%s%c",
+               YAJL_GET_STRING(severity_v), '\0');
     }
 
-    for (i = 0; i < sizeof(rsyslog_field_keys) / sizeof(*rsyslog_field_keys);
-         i++) {
-      char json_val[listen_buffer_size];
-      const char *key = (const char *)rsyslog_field_keys[i];
-      const char *path[] = {"@fields", key, (const char *)0};
-      yajl_val v = yajl_tree_get(*node, path, yajl_t_string);
+    // sev_num
+    const char *sev_num_str_path[] = {"@fields", rsyslog_field_keys[2],
+                                      (const char *)0};
+    yajl_val sev_num_str_v =
+        yajl_tree_get(*node, sev_num_str_path, yajl_t_string);
 
-      memset(json_val, '\0', listen_buffer_size);
+    if (sev_num_str_v != NULL) {
+      memset(sev_num_str, '\0', listen_buffer_size);
+      snprintf(sev_num_str, listen_buffer_size, "%s%c",
+               YAJL_GET_STRING(sev_num_str_v), '\0');
 
-      snprintf(json_val, listen_buffer_size, "%s%c", YAJL_GET_STRING(v), '\0');
+      sev_num = atoi(sev_num_str);
 
-      DEBUG("sysevent plugin: adding jsonval: %s", json_val);
+      if (sev_num < 4)
+        n.severity = NOTIF_FAILURE;
+    }
+
+    // process
+    const char *process_path[] = {"@fields", rsyslog_field_keys[3],
+                                  (const char *)0};
+    yajl_val process_v = yajl_tree_get(*node, process_path, yajl_t_string);
 
-      meta_data_add_string(meta, rsyslog_field_keys[i], json_val);
+    if (process_v != NULL) {
+      memset(process, '\0', listen_buffer_size);
+      snprintf(process, listen_buffer_size, "%s%c", YAJL_GET_STRING(process_v),
+               '\0');
     }
+
+    // hostname
+    const char *hostname_path[] = {rsyslog_keys[1], (const char *)0};
+    yajl_val hostname_v = yajl_tree_get(*node, hostname_path, yajl_t_string);
+
+    if (hostname_v != NULL) {
+      memset(hostname_str, '\0', listen_buffer_size);
+      snprintf(hostname_str, listen_buffer_size, "%s%c",
+               YAJL_GET_STRING(hostname_v), '\0');
+    }
+
+    gen_message_payload(
+        (msg_v != NULL ? msg : NULL), (severity_v != NULL ? severity : NULL),
+        (sev_num_str_v != NULL ? sev_num : -1),
+        (process_v != NULL ? process : NULL),
+        (hostname_v != NULL ? hostname_str : hostname_g), timestamp, &buf);
   } else {
     // Data was not sent in JSON format, so just treat the whole log entry
-    // as the message
-    meta_data_add_string(meta, "@message", strdup(message));
+    // as the message (and we'll be unable to acquire certain data, so the
+    // payload
+    // generated below will be less informative)
+
+    gen_message_payload(message, NULL, -1, NULL, hostname_g, timestamp, &buf);
+  }
+#else
+  gen_message_payload(message, NULL, -1, NULL, hostname_g, timestamp, &buf);
+#endif
+
+  sstrncpy(n.host, hostname_g, sizeof(n.host));
+  sstrncpy(n.type, "gauge", sizeof(n.type));
+
+  notification_meta_t *m = calloc(1, sizeof(*m));
+
+  if (m == NULL) {
+    char errbuf[1024];
+    sfree(buf);
+    ERROR("sysevent plugin: unable to allocate metadata: %s",
+          sstrerror(errno, errbuf, sizeof(errbuf)));
+    return;
   }
 
-  vl.meta = meta;
+  sstrncpy(m->name, "ves", sizeof(m->name));
+  m->nm_value.nm_string = sstrdup(buf);
+  m->type = NM_TYPE_STRING;
+  n.meta = m;
+
+  DEBUG("sysevent plugin: notification message: %s",
+        n.meta->nm_value.nm_string);
 
   DEBUG("sysevent plugin: dispatching message");
 
-  plugin_dispatch_values(&vl);
-} /* }}} void sysevent_submit */
+  plugin_dispatch_notification(&n);
+  plugin_notification_meta_free(n.meta);
+
+  // malloc'd in gen_message_payload
+  if (buf != NULL)
+    sfree(buf);
+}
 
 static int sysevent_read(void) /* {{{ */
 {
@@ -481,22 +862,24 @@ static int sysevent_read(void) /* {{{ */
   pthread_mutex_lock(&sysevent_lock);
 
   while (ring.head != ring.tail) {
+    long long unsigned int timestamp;
     int is_match = 1;
     char *match_str = NULL;
-    regexfilterlist_t *rl = regexfilterlist_head;
     int next = ring.tail + 1;
 
     if (next >= ring.maxLen)
       next = 0;
 
-    DEBUG("sysevent plugin: reading %s", ring.buffer[ring.tail]);
+    DEBUG("sysevent plugin: reading from ring buffer: %s",
+          ring.buffer[ring.tail]);
+
+    timestamp = ring.timestamp[ring.tail];
 
+#if HAVE_YAJL_V2
     // Try to parse JSON, and if it fails, fall back to plain string
-    yajl_val node;
+    yajl_val node = NULL;
     char errbuf[1024];
-
     errbuf[0] = 0;
-
     node = yajl_tree_parse((const char *)ring.buffer[ring.tail], errbuf,
                            sizeof(errbuf));
 
@@ -505,7 +888,7 @@ static int sysevent_read(void) /* {{{ */
 
       // If we have any regex filters, we need to see if the message portion of
       // the data matches any of them (otherwise we're not interested)
-      if (regexfilterlist_head != NULL) {
+      if (monitor_all_messages == 0) {
         char json_val[listen_buffer_size];
         const char *path[] = {"@message", (const char *)0};
         yajl_val v = yajl_tree_get(node, path, yajl_t_string);
@@ -522,38 +905,36 @@ static int sysevent_read(void) /* {{{ */
 
       // If we have any regex filters, we need to see if the message data
       // matches any of them (otherwise we're not interested)
-      if (regexfilterlist_head != NULL)
+      if (monitor_all_messages == 0)
         match_str = ring.buffer[ring.tail];
     }
+#else
+    // If we have any regex filters, we need to see if the message data
+    // matches any of them (otherwise we're not interested)
+    if (monitor_all_messages == 0)
+      match_str = ring.buffer[ring.tail];
+#endif
 
     // If we care about matching, do that comparison here
     if (match_str != NULL) {
-      is_match = 0;
+      is_match = 1;
 
-      while (rl != NULL) {
-        regmatch_t matches[SYSEVENT_REGEX_MATCHES];
-
-        is_match = (regexec(&rl->regex_filter_obj, match_str,
-                            SYSEVENT_REGEX_MATCHES, matches, 0) == 0
-                        ? 1
-                        : 0);
-
-        if (is_match == 1) {
-          DEBUG("sysevent plugin: regex filter match: %s", rl->regex_filter);
-          break;
-        }
-
-        rl = rl->next;
-      }
+      if (ignorelist_match(ignorelist, match_str) != 0)
+        is_match = 0;
+      else
+        DEBUG("sysevent plugin: regex filter match");
     }
 
-    if (is_match == 1 && node != NULL)
-      submit(NULL, &node, "gauge", 1);
-    else if (is_match == 1)
-      submit(ring.buffer[ring.tail], NULL, "gauge", 1);
-
-    if (node != NULL)
+#if HAVE_YAJL_V2
+    if (is_match == 1 && node != NULL) {
+      sysevent_dispatch_notification(NULL, &node, timestamp);
       yajl_tree_free(node);
+    } else if (is_match == 1)
+      sysevent_dispatch_notification(ring.buffer[ring.tail], NULL, timestamp);
+#else
+    if (is_match == 1)
+      sysevent_dispatch_notification(ring.buffer[ring.tail], timestamp);
+#endif
 
     ring.tail = next;
   }
@@ -566,7 +947,6 @@ static int sysevent_read(void) /* {{{ */
 static int sysevent_shutdown(void) /* {{{ */
 {
   int status;
-  regexfilterlist_t *rl;
 
   DEBUG("sysevent plugin: Shutting down thread.");
   if (stop_thread(1) < 0)
@@ -590,20 +970,7 @@ static int sysevent_shutdown(void) /* {{{ */
   }
 
   free(ring.buffer);
-
-  rl = regexfilterlist_head;
-  while (rl != NULL) {
-    regexfilterlist_t *rl_next;
-
-    rl_next = rl->next;
-
-    free(rl->regex_filter);
-    regfree(&rl->regex_filter_obj);
-
-    sfree(rl);
-
-    rl = rl_next;
-  }
+  free(ring.timestamp);
 
   return (0);
 } /* }}} int sysevent_shutdown */
@@ -613,4 +980,4 @@ void module_register(void) {
   plugin_register_init("sysevent", sysevent_init);
   plugin_register_read("sysevent", sysevent_read);
   plugin_register_shutdown("sysevent", sysevent_shutdown);
-} /* void module_register */
\ No newline at end of file
+} /* void module_register */