network plugin: Make the receive thread even faster.
[collectd.git] / src / network.c
index 39069aa..34f89d9 100644 (file)
@@ -178,7 +178,6 @@ static char         send_buffer[BUFF_SIZE];
 static char        *send_buffer_ptr;
 static int          send_buffer_fill;
 static value_list_t send_buffer_vl = VALUE_LIST_STATIC;
-static char         send_buffer_type[DATA_MAX_NAME_LEN];
 static pthread_mutex_t send_buffer_lock = PTHREAD_MUTEX_INITIALIZER;
 
 static c_avl_tree_t      *cache_tree = NULL;
@@ -1280,6 +1279,9 @@ static int network_receive (void)
        int i;
        int status;
 
+       receive_list_entry_t *private_list_head;
+       receive_list_entry_t *private_list_tail;
+
        if (listen_sockets_num == 0)
                network_add_listen_socket (NULL, NULL);
 
@@ -1289,6 +1291,9 @@ static int network_receive (void)
                return (-1);
        }
 
+       private_list_head = NULL;
+       private_list_tail = NULL;
+
        while (listen_loop == 0)
        {
                status = poll (listen_sockets, listen_sockets_num, -1);
@@ -1329,7 +1334,8 @@ static int network_receive (void)
                                ERROR ("network plugin: malloc failed.");
                                return (-1);
                        }
-                       memset (ent, '\0', sizeof (receive_list_entry_t));
+                       memset (ent, 0, sizeof (receive_list_entry_t));
+                       ent->next = NULL;
 
                        /* Hopefully this be optimized out by the compiler. It
                         * might help prevent stupid bugs in the future though.
@@ -1339,22 +1345,49 @@ static int network_receive (void)
                        memcpy (ent->data, buffer, buffer_len);
                        ent->data_len = buffer_len;
 
-                       pthread_mutex_lock (&receive_list_lock);
-                       if (receive_list_head == NULL)
-                       {
-                               receive_list_head = ent;
-                               receive_list_tail = ent;
-                       }
+                       if (private_list_head == NULL)
+                               private_list_head = ent;
                        else
+                               private_list_tail->next = ent;
+                       private_list_tail = ent;
+
+                       /* Do not block here. Blocking here has led to
+                        * insufficient performance in the past. */
+                       if (pthread_mutex_trylock (&receive_list_lock) == 0)
                        {
-                               receive_list_tail->next = ent;
-                               receive_list_tail = ent;
+                               if (receive_list_head == NULL)
+                                       receive_list_head = private_list_head;
+                               else
+                                       receive_list_tail->next = private_list_head;
+                               receive_list_tail = private_list_tail;
+
+                               private_list_head = NULL;
+                               private_list_tail = NULL;
+
+                               pthread_cond_signal (&receive_list_cond);
+                               pthread_mutex_unlock (&receive_list_lock);
                        }
-                       pthread_cond_signal (&receive_list_cond);
-                       pthread_mutex_unlock (&receive_list_lock);
                } /* for (listen_sockets) */
        } /* while (listen_loop == 0) */
 
+       /* Make sure everything is dispatched before exiting. */
+       if (private_list_head != NULL)
+       {
+               pthread_mutex_lock (&receive_list_lock);
+
+               if (receive_list_head == NULL)
+                       receive_list_head = private_list_head;
+               else
+                       receive_list_tail->next = private_list_head;
+               receive_list_tail = private_list_tail;
+
+               private_list_head = NULL;
+               private_list_tail = NULL;
+
+               pthread_cond_signal (&receive_list_cond);
+               pthread_mutex_unlock (&receive_list_lock);
+       }
+
        return (0);
 }
 
@@ -1393,7 +1426,7 @@ static void network_send_buffer (const char *buffer, int buffer_len)
 } /* void network_send_buffer */
 
 static int add_to_buffer (char *buffer, int buffer_size,
-               value_list_t *vl_def, char *type_def,
+               value_list_t *vl_def,
                const data_set_t *ds, const value_list_t *vl)
 {
        char *buffer_orig = buffer;
@@ -1439,12 +1472,12 @@ static int add_to_buffer (char *buffer, int buffer_size,
                sstrncpy (vl_def->plugin_instance, vl->plugin_instance, sizeof (vl_def->plugin_instance));
        }
 
-       if (strcmp (type_def, vl->type) != 0)
+       if (strcmp (vl_def->type, vl->type) != 0)
        {
                if (write_part_string (&buffer, &buffer_size, TYPE_TYPE,
                                        vl->type, strlen (vl->type)) != 0)
                        return (-1);
-               sstrncpy (type_def, vl->type, sizeof (type_def));
+               sstrncpy (vl_def->type, ds->type, sizeof (vl_def->type));
        }
 
        if (strcmp (vl_def->type_instance, vl->type_instance) != 0)
@@ -1470,8 +1503,7 @@ static void flush_buffer (void)
        network_send_buffer (send_buffer, send_buffer_fill);
        send_buffer_ptr  = send_buffer;
        send_buffer_fill = 0;
-       memset (&send_buffer_vl, '\0', sizeof (send_buffer_vl));
-       memset (send_buffer_type, '\0', sizeof (send_buffer_type));
+       memset (&send_buffer_vl, 0, sizeof (send_buffer_vl));
 }
 
 static int network_write (const data_set_t *ds, const value_list_t *vl)
@@ -1490,7 +1522,7 @@ static int network_write (const data_set_t *ds, const value_list_t *vl)
 
        status = add_to_buffer (send_buffer_ptr,
                        sizeof (send_buffer) - send_buffer_fill,
-                       &send_buffer_vl, send_buffer_type,
+                       &send_buffer_vl,
                        ds, vl);
        if (status >= 0)
        {
@@ -1504,7 +1536,7 @@ static int network_write (const data_set_t *ds, const value_list_t *vl)
 
                status = add_to_buffer (send_buffer_ptr,
                                sizeof (send_buffer) - send_buffer_fill,
-                               &send_buffer_vl, send_buffer_type,
+                               &send_buffer_vl,
                                ds, vl);
 
                if (status >= 0)
@@ -1548,7 +1580,10 @@ static int network_config (const char *key, const char *val)
                fields_num = strsplit (val_cpy, fields, 3);
                if ((fields_num != 1)
                                && (fields_num != 2))
+               {
+                       sfree (val_cpy);
                        return (1);
+               }
                else if (fields_num == 2)
                {
                        if ((service = strchr (fields[1], '.')) != NULL)
@@ -1561,6 +1596,8 @@ static int network_config (const char *key, const char *val)
                        network_add_listen_socket (node, service);
                else
                        network_add_sending_socket (node, service);
+
+               sfree (val_cpy);
        }
        else if (strcasecmp ("TimeToLive", key) == 0)
        {
@@ -1721,8 +1758,7 @@ static int network_init (void)
 
        send_buffer_ptr  = send_buffer;
        send_buffer_fill = 0;
-       memset (&send_buffer_vl, '\0', sizeof (send_buffer_vl));
-       memset (send_buffer_type, '\0', sizeof (send_buffer_type));
+       memset (&send_buffer_vl, 0, sizeof (send_buffer_vl));
 
        cache_tree = c_avl_create ((int (*) (const void *, const void *)) strcmp);
        cache_flush_last = time (NULL);