The original names are a bit long.
#ReadThreads 5
#WriteThreads 5
+# Limit the size of the write queue. Default is no limit. Setting up a limit is
+# recommended for servers handling a high volume of traffic.
+#WriteQueueLimitHigh 1000000
+#WriteQueueLimitLow 800000
+
##############################################################################
# Logging #
#----------------------------------------------------------------------------#
default value is B<5>, but you may want to increase this if you have more than
five plugins that may take relatively long to write to.
-=item B<WriteQueueLengthLimitHigh> I<Num>
-
-=item B<WriteQueueLengthLimitLow> I<Num>
-
-Default value for high limit is 0 (no limit).
-Default value for low limit is 50% of high limit.
-
-When the write queue size becomes bigger than the high limit, values I<will> be dropped.
-When the write queue size is between low and high, values I<may> be dropped (depending
-on the queue size)
-
-If high limit is set to 0, there is no limit. This is the default.
-If high limit is set, but not low limit, low will be computed as 50% of high.
-
-If you do not want to randomly drop values when the queue size is between low
-and high value, set the same value for low and high. When low=high and when the
-queue size is bigger, values are just dropped until the queue size becomes smaller.
+=item B<WriteQueueLimitHigh> I<HighNum>
+
+=item B<WriteQueueLimitLow> I<LowNum>
+
+Metrics are read by the I<read threads> and then put into a queue to be handled
+by the I<write threads>. If one of the I<write plugins> is slow (e.g. network
+timeouts, I/O saturation of the disk) this queue will grow. In order to avoid
+running into memory issues in such a case, you can limit the size of this
+queue.
+
+By default, there is no limit and memory may grow indefinitely. This is most
+likely not an issue for clients, i.e. instances that only handle the local
+metrics. For servers it is recommended to set this to a non-zero value, though.
+
+You can set the limits using B<WriteQueueLimitHigh> and B<WriteQueueLimitLow>.
+Each of them takes a numerical argument which is the number of metrics in the
+queue. If there are I<HighNum> metrics in the queue, any new metrics I<will> be
+dropped. If there are fewer than I<LowNum> metrics in the queue, all new metrics
+I<will> be enqueued. If the number of metrics currently in the queue is between
+I<LowNum> and I<HighNum>, the metric is dropped with a probability that is
+proportional to the number of metrics in the queue (i.e. it increases linearly
+until it reaches 100%).
+
+If B<WriteQueueLimitHigh> is set to non-zero and B<WriteQueueLimitLow> is
+unset, the latter will default to half of B<WriteQueueLimitHigh>.
+
+If you do not want to randomly drop values when the queue size is between
+I<LowNum> and I<HighNum>, set B<WriteQueueLimitHigh> and
+B<WriteQueueLimitLow> to the same value.
=item B<Hostname> I<Name>
{"Interval", NULL, NULL},
{"ReadThreads", NULL, "5"},
{"WriteThreads", NULL, "5"},
- {"WriteQueueLengthLimitHigh", NULL, NULL},
- {"WriteQueueLengthLimitLow", NULL, NULL},
+ {"WriteQueueLimitHigh", NULL, NULL},
+ {"WriteQueueLimitLow", NULL, NULL},
{"Timeout", NULL, "2"},
{"AutoLoadPlugin", NULL, "false"},
{"PreCacheChain", NULL, "PreCache"},
const char *str;
long value;
- str = global_option_get(option);
- if(NULL == str) return(default_value);
+ str = global_option_get (option);
+ if (NULL == str)
+ return (default_value);
errno = 0;
- value = strtol(str, NULL, 10);
- if (errno == ERANGE && (value == LONG_MAX || value == LONG_MIN)) return(default_value);
- if (errno != 0 && value == 0) return(default_value);
- return(value);
-} /* char *global_option_get_long */
-
-long global_option_get_long_in_range (const char *option, long default_value, long min, long max)
-{
- long value;
-
- assert(min <= max);
- value = global_option_get_long(option, default_value);
- if(value < min) return(default_value);
- if(value > max) return(default_value);
- return(value);
+ value = strtol (str, /* endptr = */ NULL, /* base = */ 0);
+ if (errno != 0)
+ return (default_value);
-} /* char *global_option_get_long_in_range */
+ return (value);
+} /* long global_option_get_long */
cdtime_t cf_get_default_interval (void)
{
void plugin_init_all (void)
{
- const char *chain_name;
+ char const *chain_name;
+ long write_threads_num;
llentry_t *le;
int status;
chain_name = global_option_get ("PostCacheChain");
post_cache_chain = fc_chain_get_by_name (chain_name);
- write_limit_high = global_option_get_long_in_range("WriteQueueLengthLimitHigh",0, 0, LONG_MAX);
- write_limit_low = global_option_get_long_in_range("WriteQueueLengthLimitLow", (write_limit_high+1)/2, 0, (write_limit_high == 0) ? 0 : write_limit_high-1 );
-
+ write_limit_high = global_option_get_long ("WriteQueueLimitHigh",
+ /* default = */ 0);
+ if (write_limit_high < 0)
{
- char const *tmp = global_option_get ("WriteThreads");
- int num = atoi (tmp);
+ ERROR ("WriteQueueLimitHigh must be positive or zero.");
+ write_limit_high = 0;
+ }
- if (num < 1)
- num = 5;
+ write_limit_low = global_option_get_long ("WriteQueueLimitLow",
+ /* default = */ write_limit_high / 2);
+ if (write_limit_low < 0)
+ {
+ ERROR ("WriteQueueLimitLow must be positive or zero.");
+ write_limit_low = write_limit_high / 2;
+ }
+ else if (write_limit_low > write_limit_high)
+ {
+ ERROR ("WriteQueueLimitLow must not be larger than "
+ "WriteQueueLimitHigh.");
+ write_limit_low = write_limit_high;
+ }
- start_write_threads ((size_t) num);
+ write_threads_num = global_option_get_long ("WriteThreads",
+ /* default = */ 5);
+ if (write_threads_num < 1)
+ {
+ ERROR ("WriteThreads must be positive.");
+ write_threads_num = 5;
}
+ start_write_threads ((size_t) write_threads_num);
+
if ((list_init == NULL) && (read_heap == NULL))
return;