#include <linux/netlink.h>
#include <linux/rtnetlink.h>
+#include <yajl/yajl_common.h>
+#include <yajl/yajl_gen.h>
+#if HAVE_YAJL_YAJL_VERSION_H
+#include <yajl/yajl_version.h>
+#endif
+#if defined(YAJL_MAJOR) && (YAJL_MAJOR > 1)
+#define HAVE_YAJL_V2 1
+#endif
+
#define MYPROTO NETLINK_ROUTE
+#define CONNECTIVITY_DOMAIN_FIELD "domain"
+#define CONNECTIVITY_DOMAIN_VALUE "stateChange"
+#define CONNECTIVITY_EVENT_ID_FIELD "eventId"
+#define CONNECTIVITY_EVENT_NAME_FIELD "eventName"
+#define CONNECTIVITY_EVENT_NAME_DOWN_VALUE "down"
+#define CONNECTIVITY_EVENT_NAME_UP_VALUE "up"
+#define CONNECTIVITY_LAST_EPOCH_MICROSEC_FIELD "lastEpochMicrosec"
+#define CONNECTIVITY_PRIORITY_FIELD "priority"
+#define CONNECTIVITY_PRIORITY_VALUE "high"
+#define CONNECTIVITY_REPORTING_ENTITY_NAME_FIELD "reportingEntityName"
+#define CONNECTIVITY_REPORTING_ENTITY_NAME_VALUE "collectd connectivity plugin"
+#define CONNECTIVITY_SEQUENCE_FIELD "sequence"
+#define CONNECTIVITY_SEQUENCE_VALUE "0"
+#define CONNECTIVITY_SOURCE_NAME_FIELD "sourceName"
+#define CONNECTIVITY_START_EPOCH_MICROSEC_FIELD "startEpochMicrosec"
+#define CONNECTIVITY_VERSION_FIELD "version"
+#define CONNECTIVITY_VERSION_VALUE "1.0"
+
+#define CONNECTIVITY_NEW_STATE_FIELD "newState"
+#define CONNECTIVITY_NEW_STATE_FIELD_DOWN_VALUE "outOfService"
+#define CONNECTIVITY_NEW_STATE_FIELD_UP_VALUE "inService"
+#define CONNECTIVITY_OLD_STATE_FIELD "oldState"
+#define CONNECTIVITY_OLD_STATE_FIELD_DOWN_VALUE "outOfService"
+#define CONNECTIVITY_OLD_STATE_FIELD_UP_VALUE "inService"
+#define CONNECTIVITY_STATE_CHANGE_FIELDS_FIELD "stateChangeFields"
+#define CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_FIELD \
+ "stateChangeFieldsVersion"
+#define CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_VALUE "1.0"
+#define CONNECTIVITY_STATE_INTERFACE_FIELD "stateInterface"
+
/*
* Private data types
*/
uint32_t status;
uint32_t prev_status;
uint32_t sent;
- uint32_t sec;
- uint32_t usec;
+ long long unsigned int timestamp;
struct interfacelist_s *next;
};
static pthread_mutex_t connectivity_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t connectivity_cond = PTHREAD_COND_INITIALIZER;
static struct mnl_socket *sock;
+static int event_id = 0;
static const char *config_keys[] = {"Interface"};
static int config_keys_num = STATIC_ARRAY_SIZE(config_keys);
* Private functions
*/
+static int gen_message_payload(int state, int old_state, const char *interface,
+ long long unsigned int timestamp, char **buf) {
+ const unsigned char *buf2;
+ yajl_gen g;
+
+#if !defined(HAVE_YAJL_V2)
+ yajl_gen_config conf = {};
+
+ conf.beautify = 0;
+#endif
+
+#if HAVE_YAJL_V2
+ size_t len;
+ g = yajl_gen_alloc(NULL);
+ yajl_gen_config(g, yajl_gen_beautify, 0);
+#else
+ unsigned int len;
+ g = yajl_gen_alloc(&conf, NULL);
+#endif
+
+ yajl_gen_clear(g);
+
+ // *** BEGIN common event header ***
+
+ if (yajl_gen_map_open(g) != yajl_gen_status_ok)
+ goto err;
+
+ // domain
+ if (yajl_gen_string(g, (u_char *)CONNECTIVITY_DOMAIN_FIELD,
+ strlen(CONNECTIVITY_DOMAIN_FIELD)) != yajl_gen_status_ok)
+ goto err;
+
+ if (yajl_gen_string(g, (u_char *)CONNECTIVITY_DOMAIN_VALUE,
+ strlen(CONNECTIVITY_DOMAIN_VALUE)) != yajl_gen_status_ok)
+ goto err;
+
+ // eventId
+ if (yajl_gen_string(g, (u_char *)CONNECTIVITY_EVENT_ID_FIELD,
+ strlen(CONNECTIVITY_EVENT_ID_FIELD)) !=
+ yajl_gen_status_ok)
+ goto err;
+
+ event_id = event_id + 1;
+ int event_id_len = sizeof(char) * sizeof(int) * 4 + 1;
+ char *event_id_str = malloc(event_id_len);
+ snprintf(event_id_str, event_id_len, "%d", event_id);
+
+ if (yajl_gen_number(g, event_id_str, strlen(event_id_str)) !=
+ yajl_gen_status_ok) {
+ sfree(event_id_str);
+ goto err;
+ }
+
+ sfree(event_id_str);
+
+ // eventName
+ if (yajl_gen_string(g, (u_char *)CONNECTIVITY_EVENT_NAME_FIELD,
+ strlen(CONNECTIVITY_EVENT_NAME_FIELD)) !=
+ yajl_gen_status_ok)
+ goto err;
+
+ int event_name_len = 0;
+ event_name_len = event_name_len + strlen(interface); // interface name
+ event_name_len = event_name_len + (state == 0 ? 4 : 2); // "down" or "up"
+ event_name_len =
+ event_name_len + 12; // "interface", 2 spaces and null-terminator
+ char *event_name_str = malloc(event_name_len);
+ memset(event_name_str, '\0', event_name_len);
+ snprintf(event_name_str, event_name_len, "interface %s %s", interface,
+ (state == 0 ? CONNECTIVITY_EVENT_NAME_DOWN_VALUE
+ : CONNECTIVITY_EVENT_NAME_UP_VALUE));
+
+ if (yajl_gen_string(g, (u_char *)event_name_str, strlen(event_name_str)) !=
+ yajl_gen_status_ok) {
+ sfree(event_name_str);
+ goto err;
+ }
+
+ sfree(event_name_str);
+
+ // lastEpochMicrosec
+ if (yajl_gen_string(g, (u_char *)CONNECTIVITY_LAST_EPOCH_MICROSEC_FIELD,
+ strlen(CONNECTIVITY_LAST_EPOCH_MICROSEC_FIELD)) !=
+ yajl_gen_status_ok)
+ goto err;
+
+ int last_epoch_microsec_len =
+ sizeof(char) * sizeof(long long unsigned int) * 4 + 1;
+ char *last_epoch_microsec_str = malloc(last_epoch_microsec_len);
+ snprintf(last_epoch_microsec_str, last_epoch_microsec_len, "%llu",
+ (long long unsigned int)CDTIME_T_TO_US(cdtime()));
+
+ if (yajl_gen_number(g, last_epoch_microsec_str,
+ strlen(last_epoch_microsec_str)) != yajl_gen_status_ok) {
+ sfree(last_epoch_microsec_str);
+ goto err;
+ }
+
+ sfree(last_epoch_microsec_str);
+
+ // priority
+ if (yajl_gen_string(g, (u_char *)CONNECTIVITY_PRIORITY_FIELD,
+ strlen(CONNECTIVITY_PRIORITY_FIELD)) !=
+ yajl_gen_status_ok)
+ goto err;
+
+ if (yajl_gen_string(g, (u_char *)CONNECTIVITY_PRIORITY_VALUE,
+ strlen(CONNECTIVITY_PRIORITY_VALUE)) !=
+ yajl_gen_status_ok)
+ goto err;
+
+ // reportingEntityName
+ if (yajl_gen_string(g, (u_char *)CONNECTIVITY_REPORTING_ENTITY_NAME_FIELD,
+ strlen(CONNECTIVITY_REPORTING_ENTITY_NAME_FIELD)) !=
+ yajl_gen_status_ok)
+ goto err;
+
+ if (yajl_gen_string(g, (u_char *)CONNECTIVITY_REPORTING_ENTITY_NAME_VALUE,
+ strlen(CONNECTIVITY_REPORTING_ENTITY_NAME_VALUE)) !=
+ yajl_gen_status_ok)
+ goto err;
+
+ // sequence
+ if (yajl_gen_string(g, (u_char *)CONNECTIVITY_SEQUENCE_FIELD,
+ strlen(CONNECTIVITY_SEQUENCE_FIELD)) !=
+ yajl_gen_status_ok)
+ goto err;
+
+ if (yajl_gen_number(g, CONNECTIVITY_SEQUENCE_VALUE,
+ strlen(CONNECTIVITY_SEQUENCE_VALUE)) !=
+ yajl_gen_status_ok)
+ goto err;
+
+ // sourceName
+ if (yajl_gen_string(g, (u_char *)CONNECTIVITY_SOURCE_NAME_FIELD,
+ strlen(CONNECTIVITY_SOURCE_NAME_FIELD)) !=
+ yajl_gen_status_ok)
+ goto err;
+
+ if (yajl_gen_string(g, (u_char *)interface, strlen(interface)) !=
+ yajl_gen_status_ok)
+ goto err;
+
+ // startEpochMicrosec
+ if (yajl_gen_string(g, (u_char *)CONNECTIVITY_START_EPOCH_MICROSEC_FIELD,
+ strlen(CONNECTIVITY_START_EPOCH_MICROSEC_FIELD)) !=
+ yajl_gen_status_ok)
+ goto err;
+
+ int start_epoch_microsec_len =
+ sizeof(char) * sizeof(long long unsigned int) * 4 + 1;
+ char *start_epoch_microsec_str = malloc(start_epoch_microsec_len);
+ snprintf(start_epoch_microsec_str, start_epoch_microsec_len, "%llu",
+ (long long unsigned int)timestamp);
+
+ if (yajl_gen_number(g, start_epoch_microsec_str,
+ strlen(start_epoch_microsec_str)) != yajl_gen_status_ok) {
+ sfree(start_epoch_microsec_str);
+ goto err;
+ }
+
+ sfree(start_epoch_microsec_str);
+
+ // version
+ if (yajl_gen_string(g, (u_char *)CONNECTIVITY_VERSION_FIELD,
+ strlen(CONNECTIVITY_VERSION_FIELD)) != yajl_gen_status_ok)
+ goto err;
+
+ if (yajl_gen_number(g, CONNECTIVITY_VERSION_VALUE,
+ strlen(CONNECTIVITY_VERSION_VALUE)) != yajl_gen_status_ok)
+ goto err;
+
+ // *** END common event header ***
+
+ // *** BEGIN state change fields ***
+
+ if (yajl_gen_string(g, (u_char *)CONNECTIVITY_STATE_CHANGE_FIELDS_FIELD,
+ strlen(CONNECTIVITY_STATE_CHANGE_FIELDS_FIELD)) !=
+ yajl_gen_status_ok)
+ goto err;
+
+ if (yajl_gen_map_open(g) != yajl_gen_status_ok)
+ goto err;
+
+ // newState
+ if (yajl_gen_string(g, (u_char *)CONNECTIVITY_NEW_STATE_FIELD,
+ strlen(CONNECTIVITY_NEW_STATE_FIELD)) !=
+ yajl_gen_status_ok)
+ goto err;
+
+ int new_state_len =
+ (state == 0 ? strlen(CONNECTIVITY_NEW_STATE_FIELD_DOWN_VALUE)
+ : strlen(CONNECTIVITY_NEW_STATE_FIELD_UP_VALUE));
+
+ if (yajl_gen_string(
+ g, (u_char *)(state == 0 ? CONNECTIVITY_NEW_STATE_FIELD_DOWN_VALUE
+ : CONNECTIVITY_NEW_STATE_FIELD_UP_VALUE),
+ new_state_len) != yajl_gen_status_ok)
+ goto err;
+
+ // oldState
+ if (yajl_gen_string(g, (u_char *)CONNECTIVITY_OLD_STATE_FIELD,
+ strlen(CONNECTIVITY_OLD_STATE_FIELD)) !=
+ yajl_gen_status_ok)
+ goto err;
+
+ int old_state_len =
+ (old_state == 0 ? strlen(CONNECTIVITY_OLD_STATE_FIELD_DOWN_VALUE)
+ : strlen(CONNECTIVITY_OLD_STATE_FIELD_UP_VALUE));
+
+ if (yajl_gen_string(
+ g, (u_char *)(old_state == 0 ? CONNECTIVITY_OLD_STATE_FIELD_DOWN_VALUE
+ : CONNECTIVITY_OLD_STATE_FIELD_UP_VALUE),
+ old_state_len) != yajl_gen_status_ok)
+ goto err;
+
+ // stateChangeFieldsVersion
+ if (yajl_gen_string(g,
+ (u_char *)CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_FIELD,
+ strlen(CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_FIELD)) !=
+ yajl_gen_status_ok)
+ goto err;
+
+ if (yajl_gen_number(g, CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_VALUE,
+ strlen(CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_VALUE)) !=
+ yajl_gen_status_ok)
+ goto err;
+
+ // stateInterface
+ if (yajl_gen_string(g, (u_char *)CONNECTIVITY_STATE_INTERFACE_FIELD,
+ strlen(CONNECTIVITY_STATE_INTERFACE_FIELD)) !=
+ yajl_gen_status_ok)
+ goto err;
+
+ if (yajl_gen_string(g, (u_char *)interface, strlen(interface)) !=
+ yajl_gen_status_ok)
+ goto err;
+
+ if (yajl_gen_map_close(g) != yajl_gen_status_ok)
+ goto err;
+
+ // *** END state change fields ***
+
+ if (yajl_gen_map_close(g) != yajl_gen_status_ok)
+ goto err;
+
+ if (yajl_gen_get_buf(g, &buf2, &len) != yajl_gen_status_ok)
+ goto err;
+
+ *buf = malloc(strlen((char *)buf2) + 1);
+
+ sstrncpy(*buf, (char *)buf2, strlen((char *)buf2) + 1);
+
+ yajl_gen_free(g);
+
+ return 0;
+
+err:
+ yajl_gen_free(g);
+ ERROR("connectivity plugin: gen_message_payload failed to generate JSON");
+ return -1;
+}
+
static int connectivity_link_state(struct nlmsghdr *msg) {
int retval = 0;
struct ifinfomsg *ifi = mnl_nlmsg_get_payload(msg);
break;
if (il == NULL) {
- INFO("connectivity plugin: Ignoring link state change for unmonitored "
- "interface: %s",
- dev);
+ DEBUG("connectivity plugin: Ignoring link state change for unmonitored "
+ "interface: %s",
+ dev);
} else {
uint32_t prev_status;
- struct timeval tv;
- gettimeofday(&tv, NULL);
-
- unsigned long long millisecondsSinceEpoch =
- (unsigned long long)(tv.tv_sec) * 1000 +
- (unsigned long long)(tv.tv_usec) / 1000;
-
- INFO("connectivity plugin (%llu): Interface %s status is now %s",
- millisecondsSinceEpoch, dev,
- ((ifi->ifi_flags & IFF_RUNNING) ? "UP" : "DOWN"));
prev_status = il->status;
il->status = ((ifi->ifi_flags & IFF_RUNNING) ? 1 : 0);
- il->sec = tv.tv_sec;
- il->usec = tv.tv_usec;
+ il->timestamp = (long long unsigned int)CDTIME_T_TO_US(cdtime());
+
// If the new status is different than the previous status,
// store the previous status and set sent to zero
if (il->status != prev_status) {
il->prev_status = prev_status;
il->sent = 0;
}
+
+ DEBUG("connectivity plugin (%llu): Interface %s status is now %s",
+ il->timestamp, dev,
+ ((ifi->ifi_flags & IFF_RUNNING) ? "UP" : "DOWN"));
}
- // no need to loop again, we found the interface name
+ // no need to loop again, we found the interface name attr
// (otherwise the first if-statement in the loop would
// have moved us on with 'continue')
break;
/* Message is some kind of error */
if (h->nlmsg_type == NLMSG_ERROR) {
- ERROR("connectivity plugin: read_event: Message is an error - decode "
- "TBD\n");
+ ERROR("connectivity plugin: read_event: Message is an error\n");
return -1; // Error
}
// the case of a shutdown is just assures that the thread is
// gone and that the process has been fully terminated.
- INFO("connectivity plugin: Canceling thread for process shutdown");
+ DEBUG("connectivity plugin: Canceling thread for process shutdown");
status = pthread_cancel(connectivity_thread_id);
connectivity_thread_error = 0;
pthread_mutex_unlock(&connectivity_lock);
- INFO("connectivity plugin: Finished requesting stop of thread");
+ DEBUG("connectivity plugin: Finished requesting stop of thread");
return (status);
} /* }}} int stop_thread */
il->interface = interface;
il->status = 2; // "unknown"
il->prev_status = 2;
+ il->timestamp = (long long unsigned int)CDTIME_T_TO_US(cdtime());
il->sent = 0;
il->next = interfacelist_head;
interfacelist_head = il;
return (0);
} /* }}} int connectivity_config */
-static void submit(const char *interface, const char *type, /* {{{ */
- gauge_t value, uint32_t sec, uint32_t usec) {
- value_list_t vl = VALUE_LIST_INIT;
+static void connectivity_dispatch_notification(
+ const char *interface, const char *type, /* {{{ */
+ gauge_t value, gauge_t old_value, long long unsigned int timestamp) {
+ char *buf = NULL;
+ notification_t n = {
+ NOTIF_FAILURE, cdtime(), "", "", "connectivity", "", "", "", NULL};
+
+ if (value == 1)
+ n.severity = NOTIF_OKAY;
+
char hostname[1024];
- vl.values = &(value_t){.gauge = value};
- vl.values_len = 1;
- sstrncpy(vl.plugin, "connectivity", sizeof(vl.plugin));
- sstrncpy(vl.type_instance, interface, sizeof(vl.type_instance));
- sstrncpy(vl.type, type, sizeof(vl.type));
-
- // Create metadata to store JSON key-values
- meta_data_t *meta = meta_data_create();
-
- vl.meta = meta;
- // For latency measurement
- struct timeval tv;
- gettimeofday(&tv, NULL);
gethostname(hostname, sizeof(hostname));
- char strSec[11];
- char struSec[11];
- snprintf(strSec, sizeof strSec, "%" PRIu32, sec);
- snprintf(struSec, sizeof struSec, "%" PRIu32, usec);
- if (value == 1) {
- meta_data_add_string(meta, "condition", "interface_up");
- meta_data_add_string(meta, "entity", interface);
- meta_data_add_string(meta, "source", hostname);
- meta_data_add_string(meta, "sec", strSec);
- meta_data_add_string(meta, "usec", struSec);
- meta_data_add_string(meta, "dest", "interface_down");
- } else {
- meta_data_add_string(meta, "condition", "interface_down");
- meta_data_add_string(meta, "entity", interface);
- meta_data_add_string(meta, "source", hostname);
- meta_data_add_string(meta, "sec", strSec);
- meta_data_add_string(meta, "usec", struSec);
- meta_data_add_string(meta, "dest", "interface_up");
+
+ sstrncpy(n.host, hostname, sizeof(n.host));
+ sstrncpy(n.plugin_instance, interface, sizeof(n.plugin_instance));
+ sstrncpy(n.type, "gauge", sizeof(n.type));
+ sstrncpy(n.type_instance, "interface_status", sizeof(n.type_instance));
+
+ gen_message_payload(value, old_value, interface, timestamp, &buf);
+
+ notification_meta_t *m = calloc(1, sizeof(*m));
+
+ if (m == NULL) {
+ char errbuf[1024];
+ sfree(buf);
+ ERROR("connectivity plugin: unable to allocate metadata: %s",
+ sstrerror(errno, errbuf, sizeof(errbuf)));
+ return;
}
- plugin_dispatch_values(&vl);
-} /* }}} void interface_submit */
+ sstrncpy(m->name, "ves", sizeof(m->name));
+ m->nm_value.nm_string = sstrdup(buf);
+ m->type = NM_TYPE_STRING;
+ n.meta = m;
+
+ DEBUG("connectivity plugin: notification message: %s",
+ n.meta->nm_value.nm_string);
+
+ DEBUG("connectivity plugin: dispatching state %d for interface %s",
+ (int)value, interface);
+
+ plugin_dispatch_notification(&n);
+ plugin_notification_meta_free(n.meta);
+
+ // malloc'd in gen_message_payload
+ if (buf != NULL)
+ sfree(buf);
+}
static int connectivity_read(void) /* {{{ */
{
uint32_t prev_status;
uint32_t sent;
- /* Locking here works, because the structure of the linked list is only
- * changed during configure and shutdown. */
pthread_mutex_lock(&connectivity_lock);
status = il->status;
sent = il->sent;
if (status != prev_status && sent == 0) {
- submit(il->interface, "gauge", status, il->sec, il->usec);
-
+ connectivity_dispatch_notification(il->interface, "gauge", status,
+ prev_status, il->timestamp);
il->sent = 1;
}
{
interfacelist_t *il;
- INFO("connectivity plugin: Shutting down thread.");
+ DEBUG("connectivity plugin: Shutting down thread.");
if (stop_thread(1) < 0)
return (-1);