int tail;
int maxLen;
char **buffer;
- long long unsigned int *timestamp;
+ cdtime_t *timestamp;
} circbuf_t;
/*
* Private variables
*/
+
static ignorelist_t *ignorelist = NULL;
static int sysevent_socket_thread_loop = 0;
#endif
/*
- * Prototypes
- */
-
-static void sysevent_dispatch_notification(const char *message,
-#if HAVE_YAJL_V2
- yajl_val *node,
-#endif
- long long unsigned int timestamp);
-
-/*
* Private functions
*/
static int gen_message_payload(const char *msg, char *sev, int sev_num,
- char *process, char *host,
- long long unsigned int timestamp, char **buf) {
+ char *process, char *host, cdtime_t timestamp,
+ char **buf) {
const unsigned char *buf2;
yajl_gen g;
char json_str[DATA_MAX_NAME_LEN];
#if !defined(HAVE_YAJL_V2)
- yajl_gen_config conf = {};
-
- conf.beautify = 0;
+ yajl_gen_config conf = {0};
#endif
#if HAVE_YAJL_V2
goto err;
event_id = event_id + 1;
- int event_id_len = sizeof(char) * sizeof(int) * 4 + 1;
- memset(json_str, '\0', DATA_MAX_NAME_LEN);
- snprintf(json_str, event_id_len, "%d", event_id);
+ snprintf(json_str, sizeof(json_str), "%d", event_id);
if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) {
goto err;
strlen(SYSEVENT_EVENT_NAME_FIELD)) != yajl_gen_status_ok)
goto err;
- int event_name_len = 0;
- event_name_len = event_name_len + strlen(host); // host name
- event_name_len =
- event_name_len +
- 22; // "host", "rsyslog", "message", 3 spaces and null-terminator
- memset(json_str, '\0', DATA_MAX_NAME_LEN);
- snprintf(json_str, event_name_len, "host %s rsyslog message", host);
+ snprintf(json_str, sizeof(json_str), "host %s rsyslog message", host);
if (yajl_gen_string(g, (u_char *)json_str, strlen(json_str)) !=
yajl_gen_status_ok) {
yajl_gen_status_ok)
goto err;
- int last_epoch_microsec_len =
- sizeof(char) * sizeof(long long unsigned int) * 4 + 1;
- memset(json_str, '\0', DATA_MAX_NAME_LEN);
- snprintf(json_str, last_epoch_microsec_len, "%llu",
- (long long unsigned int)CDTIME_T_TO_US(cdtime()));
+ snprintf(json_str, sizeof(json_str), "%" PRIu64, CDTIME_T_TO_US(cdtime()));
if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) {
goto err;
yajl_gen_status_ok)
goto err;
- int start_epoch_microsec_len =
- sizeof(char) * sizeof(long long unsigned int) * 4 + 1;
- memset(json_str, '\0', DATA_MAX_NAME_LEN);
- snprintf(json_str, start_epoch_microsec_len, "%llu",
- (long long unsigned int)timestamp);
+ snprintf(json_str, sizeof(json_str), "%" PRIu64, CDTIME_T_TO_US(timestamp));
if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) {
goto err;
strlen(SYSEVENT_SYSLOG_TAG_VALUE)) != yajl_gen_status_ok)
goto err;
- if (yajl_gen_map_close(g) != yajl_gen_status_ok)
- goto err;
-
// *** END syslog fields ***
- if (yajl_gen_map_close(g) != yajl_gen_status_ok)
+ // close syslog and header fields
+ if (yajl_gen_map_close(g) != yajl_gen_status_ok ||
+ yajl_gen_map_close(g) != yajl_gen_status_ok)
goto err;
if (yajl_gen_get_buf(g, &buf2, &len) != yajl_gen_status_ok)
ERROR("sysevent plugin: failed to receive data: %s", STRERRNO);
return -1;
} else {
- // Interrupt, so just return
- return 0;
+ // Interrupt, so continue and try again
+ continue;
}
}
// We successfully received a message, so don't block on the next
// read in case there are more (and if there aren't, it will be
- // handled above in the error-checking)
+ // handled above in the EWOULDBLOCK error-checking)
recv_flags = MSG_DONTWAIT;
// 1. Acquire data lock
DEBUG("sysevent plugin: writing %s", buffer);
strncpy(ring.buffer[ring.head], buffer, sizeof(buffer));
- ring.timestamp[ring.head] =
- (long long unsigned int)CDTIME_T_TO_US(cdtime());
+ ring.timestamp[ring.head] = cdtime();
ring.head = next;
}
}
}
+static void sysevent_dispatch_notification(const char *message,
+#if HAVE_YAJL_V2
+ yajl_val *node,
+#endif
+ cdtime_t timestamp) {
+ char *buf = NULL;
+
+ notification_t n = {
+ .severity = NOTIF_OKAY,
+ .time = cdtime(),
+ .plugin = "sysevent",
+ .type = "gauge",
+ };
+
+#if HAVE_YAJL_V2
+ if (node != NULL) {
+ // If we have a parsed-JSON node to work with, use that
+ // msg
+ const char *msg_path[] = {rsyslog_keys[2], (const char *)0};
+ yajl_val msg_v = yajl_tree_get(*node, msg_path, yajl_t_string);
+
+ char msg[listen_buffer_size];
+
+ if (msg_v != NULL) {
+ memset(msg, '\0', listen_buffer_size);
+ snprintf(msg, listen_buffer_size, "%s%c", YAJL_GET_STRING(msg_v), '\0');
+ }
+
+ // severity
+ const char *severity_path[] = {"@fields", rsyslog_field_keys[1],
+ (const char *)0};
+ yajl_val severity_v = yajl_tree_get(*node, severity_path, yajl_t_string);
+
+ char severity[listen_buffer_size];
+
+ if (severity_v != NULL) {
+ memset(severity, '\0', listen_buffer_size);
+ snprintf(severity, listen_buffer_size, "%s%c",
+ YAJL_GET_STRING(severity_v), '\0');
+ }
+
+ // sev_num
+ const char *sev_num_str_path[] = {"@fields", rsyslog_field_keys[2],
+ (const char *)0};
+ yajl_val sev_num_str_v =
+ yajl_tree_get(*node, sev_num_str_path, yajl_t_string);
+
+ char sev_num_str[listen_buffer_size];
+ int sev_num = -1;
+
+ if (sev_num_str_v != NULL) {
+ memset(sev_num_str, '\0', listen_buffer_size);
+ snprintf(sev_num_str, listen_buffer_size, "%s%c",
+ YAJL_GET_STRING(sev_num_str_v), '\0');
+
+ sev_num = atoi(sev_num_str);
+
+ if (sev_num < 4)
+ n.severity = NOTIF_FAILURE;
+ }
+
+ // process
+ const char *process_path[] = {"@fields", rsyslog_field_keys[3],
+ (const char *)0};
+ yajl_val process_v = yajl_tree_get(*node, process_path, yajl_t_string);
+
+ char process[listen_buffer_size];
+
+ if (process_v != NULL) {
+ memset(process, '\0', listen_buffer_size);
+ snprintf(process, listen_buffer_size, "%s%c", YAJL_GET_STRING(process_v),
+ '\0');
+ }
+
+ // hostname
+ const char *hostname_path[] = {rsyslog_keys[1], (const char *)0};
+ yajl_val hostname_v = yajl_tree_get(*node, hostname_path, yajl_t_string);
+
+ char hostname_str[listen_buffer_size];
+
+ if (hostname_v != NULL) {
+ memset(hostname_str, '\0', listen_buffer_size);
+ snprintf(hostname_str, listen_buffer_size, "%s%c",
+ YAJL_GET_STRING(hostname_v), '\0');
+ }
+
+ gen_message_payload(
+ (msg_v != NULL ? msg : NULL), (severity_v != NULL ? severity : NULL),
+ (sev_num_str_v != NULL ? sev_num : -1),
+ (process_v != NULL ? process : NULL),
+ (hostname_v != NULL ? hostname_str : hostname_g), timestamp, &buf);
+ } else {
+ // Data was not sent in JSON format, so just treat the whole log entry
+ // as the message (and we'll be unable to acquire certain data, so the
+ // payload
+ // generated below will be less informative)
+
+ gen_message_payload(message, NULL, -1, NULL, hostname_g, timestamp, &buf);
+ }
+#else
+ gen_message_payload(message, NULL, -1, NULL, hostname_g, timestamp, &buf);
+#endif
+
+ sstrncpy(n.host, hostname_g, sizeof(n.host));
+
+ int status = plugin_notification_meta_add_string(&n, "ves", buf);
+
+ if (status < 0) {
+ sfree(buf);
+ ERROR("sysevent plugin: unable to set notification VES metadata: %s",
+ STRERRNO);
+ return;
+ }
+
+ DEBUG("sysevent plugin: notification VES metadata: %s",
+ n.meta->nm_value.nm_string);
+
+ DEBUG("sysevent plugin: dispatching message");
+
+ plugin_dispatch_notification(&n);
+ plugin_notification_meta_free(n.meta);
+
+ // strdup'd in gen_message_payload
+ if (buf != NULL)
+ sfree(buf);
+}
+
static void read_ring_buffer() {
pthread_mutex_lock(&sysevent_data_lock);
DEBUG("sysevent plugin: reading from ring buffer: %s",
ring.buffer[ring.tail]);
- long long unsigned int timestamp = ring.timestamp[ring.tail];
+ cdtime_t timestamp = ring.timestamp[ring.tail];
char *match_str = NULL;
#if HAVE_YAJL_V2
// If we care about matching, do that comparison here
if (match_str != NULL) {
- is_match = 1;
-
if (ignorelist_match(ignorelist, match_str) != 0)
is_match = 0;
else
pthread_mutex_lock(&sysevent_thread_lock);
if (status < 0) {
- WARNING("sysevent plugin: problem with thread status: %d", status);
+ WARNING("sysevent plugin: problem with socket thread (status: %d)",
+ status);
sysevent_socket_thread_error = 1;
break;
}
return status;
} /* }}} int stop_socket_thread */
-static int stop_dequeue_thread(int shutdown) /* {{{ */
+static int stop_dequeue_thread() /* {{{ */
{
pthread_mutex_lock(&sysevent_thread_lock);
pthread_cond_broadcast(&sysevent_cond);
pthread_mutex_unlock(&sysevent_thread_lock);
- int status;
+ // Since the thread is blocking, calling pthread_join
+ // doesn't actually succeed in stopping it. It will stick around
+ // until a message is received on the socket (at which
+ // it will realize that "sysevent_dequeue_thread_loop" is 0 and will
+ // break out of the read loop and be allowed to die). Since this
+ // function is called when the processing is exiting, we don't want to
+ // have an idle thread hanging around. Calling pthread_cancel here
+ // just assures that the thread is gone and that the process has been
+ // fully terminated.
- if (shutdown == 1) {
- // Since the thread is blocking, calling pthread_join
- // doesn't actually succeed in stopping it. It will stick around
- // until a message is received on the socket (at which
- // it will realize that "sysevent_dequeue_thread_loop" is 0 and will
- // break out of the read loop and be allowed to die). This is
- // fine when the process isn't supposed to be exiting, but in
- // the case of a process shutdown, we don't want to have an
- // idle thread hanging around. Calling pthread_cancel here in
- // the case of a shutdown is just assures that the thread is
- // gone and that the process has been fully terminated.
+ DEBUG("sysevent plugin: Canceling dequeue thread for process shutdown");
- DEBUG("sysevent plugin: Canceling dequeue thread for process shutdown");
+ int status = pthread_cancel(sysevent_dequeue_thread_id);
- status = pthread_cancel(sysevent_dequeue_thread_id);
-
- if (status != 0 && status != ESRCH) {
- ERROR("sysevent plugin: Unable to cancel dequeue thread: %d (%s)", status,
- STRERRNO);
- status = -1;
- } else
- status = 0;
- } else {
- status = pthread_join(sysevent_dequeue_thread_id, /* return = */ NULL);
- if (status != 0 && status != ESRCH) {
- ERROR("sysevent plugin: Stopping dequeue thread failed.");
- status = -1;
- } else
- status = 0;
- }
+ if (status != 0 && status != ESRCH) {
+ ERROR("sysevent plugin: Unable to cancel dequeue thread: %d (%s)", status,
+ STRERRNO);
+ status = -1;
+ } else
+ status = 0;
pthread_mutex_lock(&sysevent_thread_lock);
memset(&sysevent_dequeue_thread_id, 0, sizeof(sysevent_dequeue_thread_id));
return status;
} /* }}} int stop_dequeue_thread */
-static int stop_threads(int shutdown) /* {{{ */
+static int stop_threads() /* {{{ */
{
- int status = stop_socket_thread(shutdown);
- int status2 = stop_dequeue_thread(shutdown);
+ int status = stop_socket_thread(1);
+ int status2 = stop_dequeue_thread();
if (status != 0)
return status;
ring.buffer[i] = calloc(1, listen_buffer_size);
}
- ring.timestamp = (long long unsigned int *)calloc(
- buffer_length, sizeof(long long unsigned int));
+ ring.timestamp = (cdtime_t *)calloc(buffer_length, sizeof(cdtime_t));
if (sock == -1) {
- const char *hostname = listen_ip;
- const char *portname = listen_port;
- struct addrinfo hints;
- memset(&hints, 0, sizeof(hints));
- hints.ai_family = AF_UNSPEC;
- hints.ai_socktype = SOCK_DGRAM;
- hints.ai_protocol = 0;
- hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG;
+ struct addrinfo hints = {
+ .ai_family = AF_UNSPEC,
+ .ai_socktype = SOCK_DGRAM,
+ .ai_protocol = 0,
+ .ai_flags = AI_PASSIVE | AI_ADDRCONFIG,
+ };
struct addrinfo *res = 0;
- int err = getaddrinfo(hostname, portname, &hints, &res);
+ int err = getaddrinfo(listen_ip, listen_port, &hints, &res);
if (err != 0) {
ERROR("sysevent plugin: failed to resolve local socket address (err=%d)",
if (bind(sock, res->ai_addr, res->ai_addrlen) == -1) {
ERROR("sysevent plugin: failed to bind socket: %s", STRERRNO);
freeaddrinfo(res);
+ sock = -1;
return -1;
}
return 0;
} /* }}} int sysevent_config */
-static void sysevent_dispatch_notification(const char *message,
-#if HAVE_YAJL_V2
- yajl_val *node,
-#endif
- long long unsigned int timestamp) {
- char *buf = NULL;
- notification_t n = {NOTIF_OKAY, cdtime(), "", "", "sysevent",
- "", "", "", NULL};
-
-#if HAVE_YAJL_V2
- if (node != NULL) {
- // If we have a parsed-JSON node to work with, use that
-
- // msg
- const char *msg_path[] = {rsyslog_keys[2], (const char *)0};
- yajl_val msg_v = yajl_tree_get(*node, msg_path, yajl_t_string);
-
- char msg[listen_buffer_size];
-
- if (msg_v != NULL) {
- memset(msg, '\0', listen_buffer_size);
- snprintf(msg, listen_buffer_size, "%s%c", YAJL_GET_STRING(msg_v), '\0');
- }
-
- // severity
- const char *severity_path[] = {"@fields", rsyslog_field_keys[1],
- (const char *)0};
- yajl_val severity_v = yajl_tree_get(*node, severity_path, yajl_t_string);
-
- char severity[listen_buffer_size];
-
- if (severity_v != NULL) {
- memset(severity, '\0', listen_buffer_size);
- snprintf(severity, listen_buffer_size, "%s%c",
- YAJL_GET_STRING(severity_v), '\0');
- }
-
- // sev_num
- const char *sev_num_str_path[] = {"@fields", rsyslog_field_keys[2],
- (const char *)0};
- yajl_val sev_num_str_v =
- yajl_tree_get(*node, sev_num_str_path, yajl_t_string);
-
- char sev_num_str[listen_buffer_size];
- int sev_num = -1;
-
- if (sev_num_str_v != NULL) {
- memset(sev_num_str, '\0', listen_buffer_size);
- snprintf(sev_num_str, listen_buffer_size, "%s%c",
- YAJL_GET_STRING(sev_num_str_v), '\0');
-
- sev_num = atoi(sev_num_str);
-
- if (sev_num < 4)
- n.severity = NOTIF_FAILURE;
- }
-
- // process
- const char *process_path[] = {"@fields", rsyslog_field_keys[3],
- (const char *)0};
- yajl_val process_v = yajl_tree_get(*node, process_path, yajl_t_string);
-
- char process[listen_buffer_size];
-
- if (process_v != NULL) {
- memset(process, '\0', listen_buffer_size);
- snprintf(process, listen_buffer_size, "%s%c", YAJL_GET_STRING(process_v),
- '\0');
- }
-
- // hostname
- const char *hostname_path[] = {rsyslog_keys[1], (const char *)0};
- yajl_val hostname_v = yajl_tree_get(*node, hostname_path, yajl_t_string);
-
- char hostname_str[listen_buffer_size];
-
- if (hostname_v != NULL) {
- memset(hostname_str, '\0', listen_buffer_size);
- snprintf(hostname_str, listen_buffer_size, "%s%c",
- YAJL_GET_STRING(hostname_v), '\0');
- }
-
- gen_message_payload(
- (msg_v != NULL ? msg : NULL), (severity_v != NULL ? severity : NULL),
- (sev_num_str_v != NULL ? sev_num : -1),
- (process_v != NULL ? process : NULL),
- (hostname_v != NULL ? hostname_str : hostname_g), timestamp, &buf);
- } else {
- // Data was not sent in JSON format, so just treat the whole log entry
- // as the message (and we'll be unable to acquire certain data, so the
- // payload
- // generated below will be less informative)
-
- gen_message_payload(message, NULL, -1, NULL, hostname_g, timestamp, &buf);
- }
-#else
- gen_message_payload(message, NULL, -1, NULL, hostname_g, timestamp, &buf);
-#endif
-
- sstrncpy(n.host, hostname_g, sizeof(n.host));
- sstrncpy(n.type, "gauge", sizeof(n.type));
-
- notification_meta_t *m = calloc(1, sizeof(*m));
-
- if (m == NULL) {
- sfree(buf);
- ERROR("sysevent plugin: unable to allocate metadata: %s", STRERRNO);
- return;
- }
-
- sstrncpy(m->name, "ves", sizeof(m->name));
- m->nm_value.nm_string = sstrdup(buf);
- m->type = NM_TYPE_STRING;
- n.meta = m;
-
- DEBUG("sysevent plugin: notification message: %s",
- n.meta->nm_value.nm_string);
-
- DEBUG("sysevent plugin: dispatching message");
-
- plugin_dispatch_notification(&n);
- plugin_notification_meta_free(n.meta);
-
- // strdup'd in gen_message_payload
- if (buf != NULL)
- sfree(buf);
-}
-
static int sysevent_read(void) /* {{{ */
{
pthread_mutex_lock(&sysevent_thread_lock);
pthread_mutex_unlock(&sysevent_thread_lock);
ERROR("sysevent plugin: The sysevent socket thread had a problem (%d). "
- "Restarting "
- "it.",
+ "Restarting it.",
sysevent_socket_thread_error);
stop_threads(0);
if (status2 != 0) {
ERROR("sysevent plugin: failed to close socket %d: %d (%s)", sock, status,
STRERRNO);
- } else
- sock = -1;
+ }
+
+ sock = -1;
}
free(listen_ip);