static int send_buffer_fill;
static value_list_t send_buffer_vl = VALUE_LIST_INIT;
static char send_buffer_type[DATA_MAX_NAME_LEN];
+static pthread_mutex_t send_buffer_lock = PTHREAD_MUTEX_INITIALIZER;
/*
* Private functions
uint16_t h_length;
uint16_t h_type;
- DEBUG ("ret_buffer = %p; ret_buffer_len = %i; output = %p; output_len = %i;",
+ DEBUG ("network plugin: parse_part_string: ret_buffer = %p;"
+ " ret_buffer_len = %i; output = %p; output_len = %i;",
*ret_buffer, *ret_buffer_len,
(void *) output, output_len);
h_length = ntohs (ps.head->length);
h_type = ntohs (ps.head->type);
- DEBUG ("length = %hu; type = %hu;", h_length, h_type);
+ DEBUG ("network plugin: parse_part_string: length = %hu; type = %hu;",
+ h_length, h_type);
if (buffer_len < h_length)
{
}
strcpy (output, ps.value);
- DEBUG ("output = %s", output);
+ DEBUG ("network plugin: parse_part_string: output = %s", output);
*ret_buffer = (void *) (buffer + h_length);
*ret_buffer_len = buffer_len - h_length;
if (ntohs (header->length) > buffer_len)
break;
+ /* Assure that this loop terminates eventually */
+ if (ntohs (header->length) < 4)
+ break;
- if (header->type == htons (TYPE_VALUES))
+ if (ntohs (header->type) == TYPE_VALUES)
{
status = parse_part_values (&buffer, &buffer_len,
&vl.values, &vl.values_len);
DEBUG ("NOT dispatching values");
}
}
- else if (header->type == ntohs (TYPE_TIME))
+ else if (ntohs (header->type) == TYPE_TIME)
{
uint64_t tmp = 0;
status = parse_part_number (&buffer, &buffer_len, &tmp);
if (status == 0)
vl.time = (time_t) tmp;
}
- else if (header->type == ntohs (TYPE_HOST))
+ else if (ntohs (header->type) == TYPE_HOST)
{
status = parse_part_string (&buffer, &buffer_len,
vl.host, sizeof (vl.host));
+ DEBUG ("network plugin: parse_packet: vl.host = %s", vl.host);
}
- else if (header->type == ntohs (TYPE_PLUGIN))
+ else if (ntohs (header->type) == TYPE_PLUGIN)
{
status = parse_part_string (&buffer, &buffer_len,
vl.plugin, sizeof (vl.plugin));
+ DEBUG ("network plugin: parse_packet: vl.plugin = %s", vl.plugin);
}
- else if (header->type == ntohs (TYPE_PLUGIN_INSTANCE))
+ else if (ntohs (header->type) == TYPE_PLUGIN_INSTANCE)
{
status = parse_part_string (&buffer, &buffer_len,
vl.plugin_instance, sizeof (vl.plugin_instance));
+ DEBUG ("network plugin: parse_packet: vl.plugin_instance = %s", vl.plugin_instance);
}
- else if (header->type == ntohs (TYPE_TYPE))
+ else if (ntohs (header->type) == TYPE_TYPE)
{
status = parse_part_string (&buffer, &buffer_len,
type, sizeof (type));
+ DEBUG ("network plugin: parse_packet: type = %s", type);
}
- else if (header->type == ntohs (TYPE_TYPE_INSTANCE))
+ else if (ntohs (header->type) == TYPE_TYPE_INSTANCE)
{
status = parse_part_string (&buffer, &buffer_len,
vl.type_instance, sizeof (vl.type_instance));
+ DEBUG ("network type: parse_packet: vl.type_instance = %s", vl.type_instance);
}
else
{
- DEBUG ("Unknown part type: 0x%0hx", header->type);
- buffer = ((char *) buffer) + header->length;
+ DEBUG ("Unknown part type: 0x%0hx", ntohs (header->type));
+ buffer = ((char *) buffer) + ntohs (header->length);
}
} /* while (buffer_len > sizeof (part_header_t)) */
value_list_t *vl_def, char *type_def,
const data_set_t *ds, const value_list_t *vl)
{
+ char *buffer_orig = buffer;
+
if (strcmp (vl_def->host, vl->host) != 0)
{
if (write_part_string (&buffer, &buffer_size, TYPE_HOST,
if (strcmp (vl_def->type_instance, vl->type_instance) != 0)
{
- if (write_part_string (&buffer, &buffer_size, TYPE_PLUGIN_INSTANCE,
+ if (write_part_string (&buffer, &buffer_size, TYPE_TYPE_INSTANCE,
vl->type_instance,
strlen (vl->type_instance)) != 0)
return (-1);
if (write_part_values (&buffer, &buffer_size, ds, vl) != 0)
return (-1);
- return (buffer_size);
+ return (buffer - buffer_orig);
} /* int add_to_buffer */
static void flush_buffer (void)
static int network_write (const data_set_t *ds, const value_list_t *vl)
{
int status;
- /* TODO: lock buffer */
+
+ pthread_mutex_lock (&send_buffer_lock);
+
status = add_to_buffer (send_buffer_ptr,
sizeof (send_buffer) - send_buffer_fill,
&send_buffer_vl, send_buffer_type,
ds, vl);
if (status >= 0)
{
+ /* status == bytes added to the buffer */
send_buffer_fill += status;
send_buffer_ptr += status;
}
{
flush_buffer ();
}
- /* TODO: unlock buffer */
+
+ pthread_mutex_unlock (&send_buffer_lock);
return ((status < 0) ? -1 : 0);
} /* int network_write */