#include "collectd.h"
#include "common.h" /* auxiliary functions */
#include "plugin.h" /* plugin_register_*, plugin_dispatch_values */
+#include "utils_cache.h"
+#include "network.h"
#include <pthread.h>
#include <zmq.h>
static value_list_t send_buffer_vl = VALUE_LIST_STATIC;
+static uint64_t stats_values_not_dispatched = 0;
+static uint64_t stats_values_dispatched = 0;
+
+/* 1 1 1 1 1 1 1 1 1 1 2 2 2 2 2 2 2 2 2 2 3 3
+ * 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+ * +-------+-----------------------+-------------------------------+
+ * ! Ver. ! ! Length !
+ * +-------+-----------------------+-------------------------------+
+ */
+struct part_header_s
+{
+ uint16_t type;
+ uint16_t length;
+};
+typedef struct part_header_s part_header_t;
+
// we do not want to crypt here
#undef HAVE_LIBGCRYPT
+static _Bool check_receive_okay (const value_list_t *vl) /* {{{ */
+{
+ uint64_t time_sent = 0;
+ int status;
+
+ status = uc_meta_data_get_unsigned_int (vl,
+ "network:time_sent", &time_sent);
+
+ /* This is a value we already sent. Don't allow it to be received again in
+ * order to avoid looping. */
+ if ((status == 0) && (time_sent >= ((uint64_t) vl->time)))
+ return (false);
+
+ return (true);
+} /* }}} _Bool check_receive_okay */
+
static int network_dispatch_values (value_list_t *vl, /* {{{ */
const char *username)
{
int status;
+
+ // DEBUG("host: %s", vl->host);
+ // DEBUG("plugin: %s", vl->plugin);
+ // DEBUG("plugin_instance: %s", vl->plugin_instance);
+ // DEBUG("type: %s", vl->type);
+ // DEBUG("type_instance: %s", vl->type_instance);
if ((vl->time <= 0)
|| (strlen (vl->host) <= 0)
|| (strlen (vl->plugin) <= 0)
|| (strlen (vl->type) <= 0))
return (-EINVAL);
-
+
if (!check_receive_okay (vl))
{
#if COLLECT_DEBUG
return (status);
}
}
-
+
+ // DEBUG("dispatching %d values", vl->values_len);
plugin_dispatch_values (vl);
stats_values_dispatched++;
const data_set_t *ds, const value_list_t *vl)
{
char *buffer_orig = buffer;
-
- if (strcmp (vl_def->host, vl->host) != 0)
- {
- if (write_part_string (&buffer, &buffer_size, TYPE_HOST,
- vl->host, strlen (vl->host)) != 0)
- return (-1);
- sstrncpy (vl_def->host, vl->host, sizeof (vl_def->host));
- }
-
- if (vl_def->time != vl->time)
- {
- if (write_part_number (&buffer, &buffer_size, TYPE_TIME,
- (uint64_t) vl->time))
- return (-1);
- vl_def->time = vl->time;
- }
-
- if (vl_def->interval != vl->interval)
- {
- if (write_part_number (&buffer, &buffer_size, TYPE_INTERVAL,
- (uint64_t) vl->interval))
- return (-1);
- vl_def->interval = vl->interval;
- }
-
- if (strcmp (vl_def->plugin, vl->plugin) != 0)
- {
- if (write_part_string (&buffer, &buffer_size, TYPE_PLUGIN,
- vl->plugin, strlen (vl->plugin)) != 0)
- return (-1);
- sstrncpy (vl_def->plugin, vl->plugin, sizeof (vl_def->plugin));
- }
-
- if (strcmp (vl_def->plugin_instance, vl->plugin_instance) != 0)
- {
- if (write_part_string (&buffer, &buffer_size, TYPE_PLUGIN_INSTANCE,
- vl->plugin_instance,
- strlen (vl->plugin_instance)) != 0)
- return (-1);
- sstrncpy (vl_def->plugin_instance, vl->plugin_instance, sizeof (vl_def->plugin_instance));
- }
-
- if (strcmp (vl_def->type, vl->type) != 0)
- {
- if (write_part_string (&buffer, &buffer_size, TYPE_TYPE,
- vl->type, strlen (vl->type)) != 0)
- return (-1);
- sstrncpy (vl_def->type, ds->type, sizeof (vl_def->type));
- }
-
- if (strcmp (vl_def->type_instance, vl->type_instance) != 0)
- {
- if (write_part_string (&buffer, &buffer_size, TYPE_TYPE_INSTANCE,
- vl->type_instance,
- strlen (vl->type_instance)) != 0)
- return (-1);
- sstrncpy (vl_def->type_instance, vl->type_instance, sizeof (vl_def->type_instance));
- }
+
+ if (write_part_string (&buffer, &buffer_size, TYPE_HOST, vl->host, strlen (vl->host)) != 0)
+ return (-1);
+
+ if (write_part_number (&buffer, &buffer_size, TYPE_TIME, (uint64_t) vl->time))
+ return (-1);
+
+ if (write_part_number (&buffer, &buffer_size, TYPE_INTERVAL, (uint64_t) vl->interval))
+ return (-1);
+
+ if (write_part_string (&buffer, &buffer_size, TYPE_PLUGIN, vl->plugin, strlen (vl->plugin)) != 0)
+ return (-1);
+
+ if (write_part_string (&buffer, &buffer_size, TYPE_PLUGIN_INSTANCE, vl->plugin_instance, strlen (vl->plugin_instance)) != 0)
+ return (-1);
+
+ if (write_part_string (&buffer, &buffer_size, TYPE_TYPE, vl->type, strlen (vl->type)) != 0)
+ return (-1);
+
+ if (write_part_string (&buffer, &buffer_size, TYPE_TYPE_INSTANCE, vl->type_instance, strlen (vl->type_instance)) != 0)
+ return (-1);
if (write_part_values (&buffer, &buffer_size, ds, vl) != 0)
return (-1);
} /* int parse_part_string */
-static int parse_packet (sockent_t *se, /* {{{ */
+static int parse_packet (void *se, /* {{{ */
void *buffer, size_t buffer_size, int flags,
const char *username)
{
if (pkg_type == TYPE_ENCR_AES256)
{
- status = parse_part_encr_aes256 (se,
- &buffer, &buffer_size, flags);
- if (status != 0)
- {
+ // status = parse_part_encr_aes256 (se,
+ // &buffer, &buffer_size, flags);
+ // if (status != 0)
+ // {
ERROR ("network plugin: Decrypting AES256 "
"part failed "
"with status %i.", status);
break;
- }
+ // }
}
#if HAVE_LIBGCRYPT
else if ((se->data.server.security_level == SECURITY_LEVEL_ENCRYPT)
#endif /* HAVE_LIBGCRYPT */
else if (pkg_type == TYPE_SIGN_SHA256)
{
- status = parse_part_sign_sha256 (se,
- &buffer, &buffer_size, flags);
- if (status != 0)
- {
+ // status = parse_part_sign_sha256 (se,
+ // &buffer, &buffer_size, flags);
+ // if (status != 0)
+ // {
ERROR ("network plugin: Verifying HMAC-SHA-256 "
"signature failed "
"with status %i.", status);
break;
- }
+ // }
}
#if HAVE_LIBGCRYPT
else if ((se->data.server.security_level == SECURITY_LEVEL_SIGN)
zmq_listen_on = NULL;
}
- if( zmq_listen_on = strdup(value)) == NULL ) {
+ if( (zmq_listen_on = strdup(value)) == NULL ) {
return 1;
}
}
zmq_send_to = NULL;
}
- if( zmq_send_to = strdup(value)) == NULL ) {
+ if( (zmq_send_to = strdup(value)) == NULL ) {
return 1;
}
}
while( thread_running ) {
status = zmq_msg_init(&msg);
assert( status == 0 );
- if( zmq_recv(&pull_socket, &msg, 0) != 0 ) {
+ if( zmq_recv(pull_socket, &msg, 0) != 0 ) {
WARNING("zmq_recv : %s", zmq_strerror(errno));
continue;
}
/* flags = */ 0,
/* username = */ NULL);
+ DEBUG("ZeroMQ: received data, parse returned %d", status);
+
+ if( zmq_msg_close(&msg) != 0 ) {
+ ERROR("zmq_msg_close : %s", zmq_strerror(errno));
+ }
}
+
+ return NULL;
}
static int plugin_init (void)
static int write_notification (const notification_t *n, user_data_t __attribute__((unused)) *user_data)
{
+ DEBUG("ZeroMQ: received notification, not implemented yet");
return 0;
}
char *send_buffer;
int send_buffer_size = PACKET_SIZE, real_size;
+ // do nothing if no socket open
+ if( zmq_send_to == NULL )
+ return 0;
+
send_buffer = malloc(PACKET_SIZE);
if( send_buffer == NULL ) {
ERROR("Unable to allocate memory for send_buffer, aborting write");
}
}
+ DEBUG("ZeroMQ: data sent");
+
return 0;
}
if( zmq_context ) {
thread_running = 0;
- pthread_join(listen_thread_id, NULL);
+
+ DEBUG("ZeroMQ: shutting down");
if( zmq_term(zmq_context) != 0 ) {
ERROR("zmq_term : %s", zmq_strerror(errno));
return 1;
}
+
+ pthread_join(listen_thread_id, NULL);
}
return 0;