first working version
authorJulien Ammous <j.ammous@gmail.com>
Wed, 27 Oct 2010 19:30:33 +0000 (21:30 +0200)
committerJulien Ammous <j.ammous@gmail.com>
Wed, 27 Oct 2010 19:30:33 +0000 (21:30 +0200)
src/zeromq.c

index 070c250..70fa306 100644 (file)
@@ -26,6 +26,8 @@
 #include "collectd.h"
 #include "common.h" /* auxiliary functions */
 #include "plugin.h" /* plugin_register_*, plugin_dispatch_values */
+#include "utils_cache.h"
+#include "network.h"
 
 #include <pthread.h>
 #include <zmq.h>
 
 static value_list_t     send_buffer_vl = VALUE_LIST_STATIC;
 
+static uint64_t stats_values_not_dispatched = 0;
+static uint64_t stats_values_dispatched = 0;
+
+/*                      1 1 1 1 1 1 1 1 1 1 2 2 2 2 2 2 2 2 2 2 3 3
+ *  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+ * +-------+-----------------------+-------------------------------+
+ * ! Ver.  !                       ! Length                        !
+ * +-------+-----------------------+-------------------------------+
+ */
+struct part_header_s
+{
+       uint16_t type;
+       uint16_t length;
+};
+typedef struct part_header_s part_header_t;
+
 // we do not want to crypt here
 #undef HAVE_LIBGCRYPT
 
+static _Bool check_receive_okay (const value_list_t *vl) /* {{{ */
+{
+  uint64_t time_sent = 0;
+  int status;
+
+  status = uc_meta_data_get_unsigned_int (vl,
+      "network:time_sent", &time_sent);
+
+  /* This is a value we already sent. Don't allow it to be received again in
+   * order to avoid looping. */
+  if ((status == 0) && (time_sent >= ((uint64_t) vl->time)))
+    return (false);
+
+  return (true);
+} /* }}} _Bool check_receive_okay */
+
 static int network_dispatch_values (value_list_t *vl, /* {{{ */
     const char *username)
 {
   int status;
+  
+  // DEBUG("host: %s", vl->host);
+  // DEBUG("plugin: %s", vl->plugin);
+  // DEBUG("plugin_instance: %s", vl->plugin_instance);
+  // DEBUG("type: %s", vl->type);
+  // DEBUG("type_instance: %s", vl->type_instance);
 
   if ((vl->time <= 0)
       || (strlen (vl->host) <= 0)
       || (strlen (vl->plugin) <= 0)
       || (strlen (vl->type) <= 0))
     return (-EINVAL);
-
+  
   if (!check_receive_okay (vl))
   {
 #if COLLECT_DEBUG
@@ -90,7 +130,8 @@ static int network_dispatch_values (value_list_t *vl, /* {{{ */
       return (status);
     }
   }
-
+  
+  // DEBUG("dispatching %d values", vl->values_len);
   plugin_dispatch_values (vl);
   stats_values_dispatched++;
 
@@ -390,64 +431,27 @@ static int add_to_buffer (char *buffer, int buffer_size, /* {{{ */
     const data_set_t *ds, const value_list_t *vl)
 {
   char *buffer_orig = buffer;
-
-  if (strcmp (vl_def->host, vl->host) != 0)
-  {
-    if (write_part_string (&buffer, &buffer_size, TYPE_HOST,
-          vl->host, strlen (vl->host)) != 0)
-      return (-1);
-    sstrncpy (vl_def->host, vl->host, sizeof (vl_def->host));
-  }
-
-  if (vl_def->time != vl->time)
-  {
-    if (write_part_number (&buffer, &buffer_size, TYPE_TIME,
-          (uint64_t) vl->time))
-      return (-1);
-    vl_def->time = vl->time;
-  }
-
-  if (vl_def->interval != vl->interval)
-  {
-    if (write_part_number (&buffer, &buffer_size, TYPE_INTERVAL,
-          (uint64_t) vl->interval))
-      return (-1);
-    vl_def->interval = vl->interval;
-  }
-
-  if (strcmp (vl_def->plugin, vl->plugin) != 0)
-  {
-    if (write_part_string (&buffer, &buffer_size, TYPE_PLUGIN,
-          vl->plugin, strlen (vl->plugin)) != 0)
-      return (-1);
-    sstrncpy (vl_def->plugin, vl->plugin, sizeof (vl_def->plugin));
-  }
-
-  if (strcmp (vl_def->plugin_instance, vl->plugin_instance) != 0)
-  {
-    if (write_part_string (&buffer, &buffer_size, TYPE_PLUGIN_INSTANCE,
-          vl->plugin_instance,
-          strlen (vl->plugin_instance)) != 0)
-      return (-1);
-    sstrncpy (vl_def->plugin_instance, vl->plugin_instance, sizeof (vl_def->plugin_instance));
-  }
-
-  if (strcmp (vl_def->type, vl->type) != 0)
-  {
-    if (write_part_string (&buffer, &buffer_size, TYPE_TYPE,
-          vl->type, strlen (vl->type)) != 0)
-      return (-1);
-    sstrncpy (vl_def->type, ds->type, sizeof (vl_def->type));
-  }
-
-  if (strcmp (vl_def->type_instance, vl->type_instance) != 0)
-  {
-    if (write_part_string (&buffer, &buffer_size, TYPE_TYPE_INSTANCE,
-          vl->type_instance,
-          strlen (vl->type_instance)) != 0)
-      return (-1);
-    sstrncpy (vl_def->type_instance, vl->type_instance, sizeof (vl_def->type_instance));
-  }
+  
+  if (write_part_string (&buffer, &buffer_size, TYPE_HOST, vl->host, strlen (vl->host)) != 0)
+    return (-1);
+  
+  if (write_part_number (&buffer, &buffer_size, TYPE_TIME, (uint64_t) vl->time))
+    return (-1);
+  
+  if (write_part_number (&buffer, &buffer_size, TYPE_INTERVAL, (uint64_t) vl->interval))
+    return (-1);
+    
+  if (write_part_string (&buffer, &buffer_size, TYPE_PLUGIN, vl->plugin, strlen (vl->plugin)) != 0)
+    return (-1);
+  
+  if (write_part_string (&buffer, &buffer_size, TYPE_PLUGIN_INSTANCE, vl->plugin_instance, strlen (vl->plugin_instance)) != 0)
+    return (-1);
+  
+  if (write_part_string (&buffer, &buffer_size, TYPE_TYPE, vl->type, strlen (vl->type)) != 0)
+    return (-1);
+  
+  if (write_part_string (&buffer, &buffer_size, TYPE_TYPE_INSTANCE, vl->type_instance, strlen (vl->type_instance)) != 0)
+    return (-1);
   
   if (write_part_values (&buffer, &buffer_size, ds, vl) != 0)
     return (-1);
@@ -580,7 +584,7 @@ static int parse_part_string (void **ret_buffer, size_t *ret_buffer_len,
 } /* int parse_part_string */
 
 
-static int parse_packet (sockent_t *se, /* {{{ */
+static int parse_packet (void *se, /* {{{ */
     void *buffer, size_t buffer_size, int flags,
     const char *username)
 {
@@ -624,15 +628,15 @@ static int parse_packet (sockent_t *se, /* {{{ */
 
     if (pkg_type == TYPE_ENCR_AES256)
     {
-      status = parse_part_encr_aes256 (se,
-          &buffer, &buffer_size, flags);
-      if (status != 0)
-      {
+      // status = parse_part_encr_aes256 (se,
+      //     &buffer, &buffer_size, flags);
+      // if (status != 0)
+      // {
         ERROR ("network plugin: Decrypting AES256 "
             "part failed "
             "with status %i.", status);
         break;
-      }
+      // }
     }
 #if HAVE_LIBGCRYPT
     else if ((se->data.server.security_level == SECURITY_LEVEL_ENCRYPT)
@@ -650,15 +654,15 @@ static int parse_packet (sockent_t *se, /* {{{ */
 #endif /* HAVE_LIBGCRYPT */
     else if (pkg_type == TYPE_SIGN_SHA256)
     {
-      status = parse_part_sign_sha256 (se,
-                                        &buffer, &buffer_size, flags);
-      if (status != 0)
-      {
+      // status = parse_part_sign_sha256 (se,
+      //                                   &buffer, &buffer_size, flags);
+      // if (status != 0)
+      // {
         ERROR ("network plugin: Verifying HMAC-SHA-256 "
             "signature failed "
             "with status %i.", status);
         break;
-      }
+      // }
     }
 #if HAVE_LIBGCRYPT
     else if ((se->data.server.security_level == SECURITY_LEVEL_SIGN)
@@ -849,7 +853,7 @@ static int my_config (const char *key, const char *value)
       zmq_listen_on = NULL;
     }
     
-    if( zmq_listen_on = strdup(value)) == NULL ) {
+    if( (zmq_listen_on = strdup(value)) == NULL ) {
       return 1;
     }
   }
@@ -859,7 +863,7 @@ static int my_config (const char *key, const char *value)
       zmq_send_to = NULL;
     }
     
-    if( zmq_send_to = strdup(value)) == NULL ) {
+    if( (zmq_send_to = strdup(value)) == NULL ) {
       return 1;
     }
   }
@@ -878,7 +882,7 @@ static void *receive_thread (void __attribute__((unused)) *arg)
   while( thread_running ) {
     status = zmq_msg_init(&msg);
     assert( status == 0 );
-    if( zmq_recv(&pull_socket, &msg, 0) != 0 ) {
+    if( zmq_recv(pull_socket, &msg, 0) != 0 ) {
       WARNING("zmq_recv : %s", zmq_strerror(errno));
       continue;
     }
@@ -890,8 +894,15 @@ static void *receive_thread (void __attribute__((unused)) *arg)
       /* flags = */ 0,
       /* username = */ NULL);
     
+    DEBUG("ZeroMQ: received data, parse returned %d", status);
+    
+    if( zmq_msg_close(&msg) != 0 ) {
+      ERROR("zmq_msg_close : %s", zmq_strerror(errno));
+    }
     
   }
+  
+  return NULL;
 }
 
 static int plugin_init (void)
@@ -963,6 +974,7 @@ static int plugin_init (void)
 
 static int write_notification (const notification_t *n, user_data_t __attribute__((unused)) *user_data)
 {
+  DEBUG("ZeroMQ: received notification, not implemented yet");
   return 0;
 }
 
@@ -980,6 +992,10 @@ static int write_value (const data_set_t *ds, const value_list_t *vl,
   char      *send_buffer;
   int       send_buffer_size = PACKET_SIZE, real_size;
   
+  // do nothing if no socket open
+  if( zmq_send_to == NULL )
+    return 0;
+  
   send_buffer = malloc(PACKET_SIZE);
   if( send_buffer == NULL ) {
     ERROR("Unable to allocate memory for send_buffer, aborting write");
@@ -1008,6 +1024,8 @@ static int write_value (const data_set_t *ds, const value_list_t *vl,
     }
   }
   
+  DEBUG("ZeroMQ: data sent");
+  
   return 0;
 }
 
@@ -1016,12 +1034,15 @@ static int my_shutdown (void)
   if( zmq_context ) {
     
     thread_running = 0;
-    pthread_join(listen_thread_id, NULL);
+    
+    DEBUG("ZeroMQ: shutting down");
     
     if( zmq_term(zmq_context) != 0 ) {
       ERROR("zmq_term : %s", zmq_strerror(errno));
       return 1;
     }
+    
+    pthread_join(listen_thread_id, NULL);
   }
   
   return 0;