--- /dev/null
+-- collectd - contrib/postgresql/collectd_insert.sql
+-- Copyright (C) 2012 Sebastian 'tokkee' Harl
+-- All rights reserved.
+--
+-- Redistribution and use in source and binary forms, with or without
+-- modification, are permitted provided that the following conditions
+-- are met:
+--
+-- - Redistributions of source code must retain the above copyright
+-- notice, this list of conditions and the following disclaimer.
+--
+-- - Redistributions in binary form must reproduce the above copyright
+-- notice, this list of conditions and the following disclaimer in the
+-- documentation and/or other materials provided with the distribution.
+--
+-- THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+-- AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+-- IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+-- ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+-- LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+-- CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+-- SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+-- INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+-- CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+-- ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+-- POSSIBILITY OF SUCH DAMAGE.
+
+-- Description:
+-- ------------
+--
+-- This is a sample database setup that may be used to write data collected by
+-- collectd to a PostgreSQL database. We're using two tables, 'identifiers'
+-- and 'values' to store the value-list identifier and the actual values
+-- respectively.
+--
+-- The 'values' table is partitioned to improve performance and maintenance.
+-- Please note that additional maintenance scripts are required in order to
+-- keep the setup running -- see the comments below for details.
+--
+-- The function 'collectd_insert' may be used to actually insert values
+-- submitted by collectd into those tables.
+--
+-- Sample configuration:
+-- ---------------------
+--
+-- <Plugin postgresql>
+-- <Writer sqlstore>
+-- Statement "SELECT collectd_insert($1, $2, $3, $4, $5, $6, $7, $8, $9);"
+-- </Writer>
+-- <Database foo>
+-- # ...
+-- Writer sqlstore
+-- </Database>
+-- </Plugin>
+
+-- 'identifiers' stores one row per distinct value-list identifier; the
+-- surrogate key 'id' is referenced by the 'values' table.
+CREATE TABLE identifiers (
+ id integer NOT NULL,
+ host character varying(64) NOT NULL,
+ plugin character varying(64) NOT NULL,
+ plugin_inst character varying(64) DEFAULT NULL::character varying,
+ type character varying(64) NOT NULL,
+ type_inst character varying(64) DEFAULT NULL::character varying
+);
+-- sequence providing auto-incremented values for identifiers.id
+CREATE SEQUENCE identifiers_id_seq
+ START WITH 1
+ INCREMENT BY 1
+ NO MINVALUE
+ NO MAXVALUE
+ CACHE 1;
+ALTER SEQUENCE identifiers_id_seq OWNED BY identifiers.id;
+ALTER TABLE ONLY identifiers
+ ALTER COLUMN id SET DEFAULT nextval('identifiers_id_seq'::regclass);
+-- each (host, plugin, plugin_inst, type, type_inst) tuple may appear once
+ALTER TABLE ONLY identifiers
+ ADD CONSTRAINT identifiers_host_plugin_plugin_inst_type_type_inst_key
+ UNIQUE (host, plugin, plugin_inst, type, type_inst);
+ALTER TABLE ONLY identifiers
+ ADD CONSTRAINT identifiers_pkey PRIMARY KEY (id);
+
+-- optionally, create indexes for the identifier fields
+CREATE INDEX identifiers_host ON identifiers USING btree (host);
+CREATE INDEX identifiers_plugin ON identifiers USING btree (plugin);
+CREATE INDEX identifiers_plugin_inst ON identifiers USING btree (plugin_inst);
+CREATE INDEX identifiers_type ON identifiers USING btree (type);
+CREATE INDEX identifiers_type_inst ON identifiers USING btree (type_inst);
+
+-- 'values' stores the collected samples: one row per data source and
+-- timestamp; 'id' references identifiers.id (enforced on the child tables
+-- created by values_update_childs() below)
+CREATE TABLE "values" (
+ id integer NOT NULL,
+ tstamp timestamp with time zone NOT NULL,
+ name character varying(64) NOT NULL,
+ value double precision NOT NULL
+);
+
+-- Convenience view joining 'identifiers' and 'values'; also provides the
+-- full collectd identifier string (host/plugin[-inst]/type[-inst]).
+CREATE OR REPLACE VIEW collectd
+ AS SELECT host, plugin, plugin_inst, type, type_inst,
+ host
+ || '/' || plugin
+ || CASE
+ WHEN plugin_inst IS NOT NULL THEN '-'
+ ELSE ''
+ END
+ || coalesce(plugin_inst, '')
+ || '/' || type
+ || CASE
+ WHEN type_inst IS NOT NULL THEN '-'
+ ELSE ''
+ END
+ -- this has to be the type instance (it previously repeated
+ -- plugin_inst -- a copy/paste bug)
+ || coalesce(type_inst, '') AS identifier,
+ tstamp, name, value
+ FROM identifiers
+ JOIN values
+ ON values.id = identifiers.id;
+
+-- partition "values" by day (or week, month, ...)
+
+-- create the child tables for today and the next 'days' days:
+-- this may, for example, be used in a daily cron-job (or similar) to create
+-- the tables for the next couple of days
+-- Returns the name of each table that was newly created; tables which
+-- already exist are skipped silently.
+CREATE OR REPLACE FUNCTION values_update_childs(
+ integer
+ ) RETURNS SETOF text
+ LANGUAGE plpgsql
+ AS $_$
+DECLARE
+ days alias for $1;
+ cur_day date;
+ next_day date;
+ i integer;
+BEGIN
+ IF days < 1 THEN
+ -- zero is rejected as well, so report "less than one" rather than
+ -- "negative"
+ RAISE EXCEPTION 'Cannot have less than one day';
+ END IF;
+
+ i := 0;
+ LOOP
+ EXIT WHEN i > days;
+
+ SELECT CAST ('now'::date + i * '1day'::interval AS date) INTO cur_day;
+ SELECT CAST ('now'::date + (i + 1) * '1day'::interval AS date) INTO next_day;
+
+ i := i + 1;
+
+ BEGIN
+ -- the CHECK constraint lets the planner skip this child table
+ -- when constraint exclusion is enabled
+ EXECUTE 'CREATE TABLE "values$' || cur_day || '" (
+ CHECK (tstamp >= TIMESTAMP ''' || cur_day || ''' '
+ || 'AND tstamp < TIMESTAMP ''' || next_day || ''')
+ ) INHERITS (values)';
+ EXCEPTION WHEN duplicate_table THEN
+ -- the table for this day already exists; don't report it
+ CONTINUE;
+ END;
+
+ RETURN NEXT 'values$' || cur_day::text;
+
+ EXECUTE 'ALTER TABLE ONLY "values$' || cur_day || '"
+ ADD CONSTRAINT "values_' || cur_day || '_pkey"
+ PRIMARY KEY (id, tstamp, name, value)';
+ EXECUTE 'ALTER TABLE ONLY "values$' || cur_day || '"
+ ADD CONSTRAINT "values_' || cur_day || '_id_fkey"
+ FOREIGN KEY (id) REFERENCES identifiers(id)';
+ END LOOP;
+ RETURN;
+END;
+$_$;
+
+-- create initial child tables
+SELECT values_update_childs(2);
+
+-- Trigger function redirecting INSERTs on the parent "values" table into
+-- the per-day child table matching the row's timestamp.
+CREATE OR REPLACE FUNCTION values_insert_trigger()
+ RETURNS trigger
+ LANGUAGE plpgsql
+ AS $_$
+DECLARE
+ child_tbl character varying;
+BEGIN
+ SELECT 'values$' || CAST (NEW.tstamp AS DATE) INTO child_tbl;
+ -- Rather than using 'EXECUTE', some if-cascade checking the date may also
+ -- be used. However, this would require frequent updates of the trigger
+ -- function while this example works automatically.
+ EXECUTE 'INSERT INTO "' || child_tbl || '" VALUES ($1.*)' USING NEW;
+ -- returning NULL suppresses the insert into the parent table itself
+ RETURN NULL;
+END;
+$_$;
+
+CREATE TRIGGER insert_values_trigger
+ BEFORE INSERT ON values
+ FOR EACH ROW EXECUTE PROCEDURE values_insert_trigger();
+
+-- when querying values make sure to enable constraint exclusion
+-- SET constraint_exclusion = on;
+
+-- collectd_insert(time, host, plugin, plugin_instance, type, type_instance,
+-- value_names, type_names, values): look up (or create) the identifier for
+-- the submitted value-list and insert one row per data source into "values".
+CREATE OR REPLACE FUNCTION collectd_insert(
+ timestamp with time zone, character varying,
+ character varying, character varying,
+ character varying, character varying,
+ character varying[], character varying[], double precision[]
+ ) RETURNS void
+ LANGUAGE plpgsql
+ AS $_$
+DECLARE
+ p_time alias for $1;
+ p_host alias for $2;
+ p_plugin alias for $3;
+ p_plugin_instance alias for $4;
+ p_type alias for $5;
+ p_type_instance alias for $6;
+ p_value_names alias for $7;
+ -- don't use the type info; for 'StoreRates true' it's 'gauge' anyway
+ -- p_type_names alias for $8;
+ p_values alias for $9;
+ ds_id integer;
+ i integer;
+BEGIN
+ -- look up the identifier, creating it on first use; NULL instances are
+ -- compared as empty strings since 'NULL = NULL' is not true in SQL
+ SELECT id INTO ds_id
+ FROM identifiers
+ WHERE host = p_host
+ AND plugin = p_plugin
+ AND COALESCE(plugin_inst, '') = COALESCE(p_plugin_instance, '')
+ AND type = p_type
+ AND COALESCE(type_inst, '') = COALESCE(p_type_instance, '');
+ IF NOT FOUND THEN
+ INSERT INTO identifiers (host, plugin, plugin_inst, type, type_inst)
+ VALUES (p_host, p_plugin, p_plugin_instance, p_type, p_type_instance)
+ RETURNING id INTO ds_id;
+ END IF;
+ -- one row per data source; the name and value arrays have matching
+ -- dimensions
+ i := 1;
+ LOOP
+ EXIT WHEN i > array_upper(p_value_names, 1);
+ INSERT INTO values (id, tstamp, name, value)
+ VALUES (ds_id, p_time, p_value_names[i], p_values[i]);
+ i := i + 1;
+ END LOOP;
+END;
+$_$;
+
+-- vim: set expandtab :
statistics provided by PostgreSQL without the need to upgrade your collectd
installation.
+Starting with version 5.2, the C<postgresql> plugin supports writing data to
+PostgreSQL databases as well. This has been implemented in a generic way. You
+need to specify an SQL statement which will then be executed by collectd in
+order to write the data (see below for details). The benefit of that approach
+is that there is no fixed database layout. Rather, the layout may be optimized
+for the current setup.
+
The B<PostgreSQL Documentation> manual can be found at
L<http://www.postgresql.org/docs/manuals/>.
</Result>
</Query>
+ <Writer sqlstore>
+ Statement "SELECT collectd_insert($1, $2, $3, $4, $5, $6, $7, $8, $9);"
+ StoreRates true
+ </Writer>
+
<Database foo>
Host "hostname"
Port "5432"
Query backend # predefined
Query rt36_tickets
</Database>
+
+ <Database qux>
+ # ...
+ Writer sqlstore
+ CommitInterval 10
+ </Database>
</Plugin>
The B<Query> block defines one database query which may later be used by a
=back
+The B<Writer> block defines a PostgreSQL writer backend. It accepts a single
+mandatory argument specifying the name of the writer. This will then be used
+in the B<Database> specification in order to activate the writer instance. The
+names of all writers have to be unique. The following options may be
+specified:
+
+=over 4
+
+=item B<Statement> I<sql statement>
+
+This mandatory option specifies the SQL statement that will be executed for
+each submitted value. Only a single SQL statement is allowed. Anything after
+the first semicolon will be ignored.
+
+Nine parameters will be passed to the statement and should be specified as
+tokens B<$1>, B<$2>, through B<$9> in the statement string. The following
+values are made available through those parameters:
+
+=over 4
+
+=item B<$1>
+
+The timestamp of the queried value as a floating point number.
+
+=item B<$2>
+
+The hostname of the queried value.
+
+=item B<$3>
+
+The plugin name of the queried value.
+
+=item B<$4>
+
+The plugin instance of the queried value. This value may be B<NULL> if there
+is no plugin instance.
+
+=item B<$5>
+
+The type of the queried value (cf. L<types.db(5)>).
+
+=item B<$6>
+
+The type instance of the queried value. This value may be B<NULL> if there is
+no type instance.
+
+=item B<$7>
+
+An array of names for the submitted values (i.E<nbsp>e., the name of the data
+sources of the submitted value-list).
+
+=item B<$8>
+
+An array of types for the submitted values (i.E<nbsp>e., the type of the data
+sources of the submitted value-list; C<counter>, C<gauge>, ...). Note, that if
+B<StoreRates> is enabled (which is the default, see below), all types will be
+C<gauge>.
+
+=item B<$9>
+
+An array of the submitted values. The dimensions of the value name and value
+arrays match.
+
+=back
+
+In general, it is advisable to create and call a custom function in the
+PostgreSQL database for this purpose. Any procedural language supported by
+PostgreSQL will do (see chapter "Server Programming" in the PostgreSQL manual
+for details).
+
+=item B<StoreRates> B<false>|B<true>
+
+If set to B<true> (the default), convert counter values to rates. If set to
+B<false> counter values are stored as is, i.E<nbsp>e. as an increasing integer
+number.
+
+=back
+
The B<Database> block defines one PostgreSQL database for which to collect
statistics. It accepts a single mandatory argument which specifies the
database name. None of the other options are required. PostgreSQL will use
Specify the interval with which the database should be queried. The default is
to use the global B<Interval> setting.
+=item B<CommitInterval> I<seconds>
+
+This option may be used for database connections which have "writers" assigned
+(see above). If specified, it causes a writer to put several updates into a
+single transaction. This transaction will last for the specified amount of
+time. By default, each update will be executed in a separate transaction. Each
+transaction generates a fair amount of overhead which can, thus, be reduced by
+activating this option. The drawback is that data covering the specified
+amount of time will be lost, for example, if a single statement within the
+transaction fails or if the database server crashes.
+
=item B<Host> I<hostname>
Specify the hostname or IP of the PostgreSQL server to connect to. If the
=item B<Query> I<query>
-Specify a I<query> which should be executed for the database connection. This
-may be any of the predefined or user-defined queries. If no such option is
-given, it defaults to "backends", "transactions", "queries", "query_plans",
-"table_states", "disk_io" and "disk_usage". Else, the specified queries are
-used only.
+Specifies a I<query> which should be executed in the context of the database
+connection. This may be any of the predefined or user-defined queries. If no
+such option is given, it defaults to "backends", "transactions", "queries",
+"query_plans", "table_states", "disk_io" and "disk_usage" (unless a B<Writer>
+has been specified). Else, the specified queries are used only.
+
+=item B<Writer> I<writer>
+
+Assigns the specified I<writer> backend to the database connection. This
+causes all collected data to be sent to the database using the settings
+defined in the writer configuration (see the section "FILTER CONFIGURATION"
+below for details on how to selectively send data to certain plugins).
+
+Each writer will register a flush callback which may be used when having long
+transactions enabled (see the B<CommitInterval> option above). When issuing
+the B<FLUSH> command (see L<collectd-unixsock(5)> for details) the current
+transaction will be committed right away. Two different kinds of flush
+callbacks are available with the C<postgresql> plugin:
+
+=over 4
+
+=item B<postgresql>
+
+Flush all writer backends.
+
+=item B<postgresql->I<database>
+
+Flush all writers of the specified I<database> only.
+
+=back
=back
/**
* collectd - src/postgresql.c
- * Copyright (C) 2008, 2009 Sebastian Harl
- * Copyright (C) 2009 Florian Forster
+ * Copyright (C) 2008-2012 Sebastian Harl
+ * Copyright (C) 2009 Florian Forster
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
#include "configfile.h"
#include "plugin.h"
+#include "utils_cache.h"
#include "utils_db_query.h"
#include "utils_complain.h"
+#if HAVE_PTHREAD_H
+# include <pthread.h>
+#endif
+
#include <pg_config_manual.h>
#include <libpq-fe.h>
#define log_err(...) ERROR ("postgresql: " __VA_ARGS__)
#define log_warn(...) WARNING ("postgresql: " __VA_ARGS__)
#define log_info(...) INFO ("postgresql: " __VA_ARGS__)
+#define log_debug(...) DEBUG ("postgresql: " __VA_ARGS__)
#ifndef C_PSQL_DEFAULT_CONF
# define C_PSQL_DEFAULT_CONF PKGDATADIR "/postgresql_default.conf"
} c_psql_user_data_t;
typedef struct {
+ char *name;
+ char *statement;
+ _Bool store_rates;
+} c_psql_writer_t;
+
+typedef struct {
PGconn *conn;
c_complain_t conn_complaint;
udb_query_t **queries;
size_t queries_num;
+ c_psql_writer_t **writers;
+ size_t writers_num;
+
+ /* make sure we don't access the database object in parallel */
+ pthread_mutex_t db_lock;
+
cdtime_t interval;
+ /* writer "caching" settings */
+ cdtime_t commit_interval;
+ cdtime_t next_commit;
+
char *host;
char *port;
char *database;
char *krbsrvname;
char *service;
+
+ int ref_cnt;
} c_psql_database_t;
static char *def_queries[] = {
};
static int def_queries_num = STATIC_ARRAY_SIZE (def_queries);
+static c_psql_database_t *databases = NULL;
+static size_t databases_num = 0;
+
static udb_query_t **queries = NULL;
static size_t queries_num = 0;
+static c_psql_writer_t *writers = NULL;
+static size_t writers_num = 0;
+
+/* Start a new transaction on the database connection and schedule the next
+ * commit 'commit_interval' from now.
+ * Returns 0 on success, non-zero else. */
+static int c_psql_begin (c_psql_database_t *db)
+{
+ PGresult *r = PQexec (db->conn, "BEGIN");
+
+ int status = 1;
+
+ if (r != NULL) {
+ if (PGRES_COMMAND_OK == PQresultStatus (r)) {
+ db->next_commit = cdtime() + db->commit_interval;
+ status = 0;
+ }
+ else
+ log_warn ("Failed to initiate ('BEGIN') transaction: %s",
+ PQerrorMessage (db->conn));
+ PQclear (r);
+ }
+ return status;
+} /* c_psql_begin */
+
+/* Commit the currently running transaction; 'next_commit' is reset to zero,
+ * marking the connection as having no open transaction.
+ * Returns 0 on success, non-zero else. */
+static int c_psql_commit (c_psql_database_t *db)
+{
+ PGresult *r = PQexec (db->conn, "COMMIT");
+
+ int status = 1;
+
+ if (r != NULL) {
+ if (PGRES_COMMAND_OK == PQresultStatus (r)) {
+ db->next_commit = 0;
+ log_debug ("Successfully committed transaction.");
+ status = 0;
+ }
+ else
+ log_warn ("Failed to commit transaction: %s",
+ PQerrorMessage (db->conn));
+ PQclear (r);
+ }
+ return status;
+} /* c_psql_commit */
+
static c_psql_database_t *c_psql_database_new (const char *name)
{
c_psql_database_t *db;
- db = (c_psql_database_t *)malloc (sizeof (*db));
+ db = (c_psql_database_t *)realloc (databases,
+ (databases_num + 1) * sizeof (*db));
if (NULL == db) {
log_err ("Out of memory.");
return NULL;
}
+ databases = db;
+ db = databases + databases_num;
+ ++databases_num;
+
db->conn = NULL;
C_COMPLAIN_INIT (&db->conn_complaint);
db->queries = NULL;
db->queries_num = 0;
+ db->writers = NULL;
+ db->writers_num = 0;
+
+ pthread_mutex_init (&db->db_lock, /* attrs = */ NULL);
+
db->interval = 0;
+ db->commit_interval = 0;
+ db->next_commit = 0;
+
db->database = sstrdup (name);
db->host = NULL;
db->port = NULL;
db->krbsrvname = NULL;
db->service = NULL;
+
+ db->ref_cnt = 0;
return db;
} /* c_psql_database_new */
c_psql_database_t *db = data;
+ --db->ref_cnt;
+ /* readers and writers may access this database */
+ if (db->ref_cnt > 0)
+ return;
+
+ /* wait for the lock to be released by the last writer */
+ pthread_mutex_lock (&db->db_lock);
+
+ if (db->next_commit > 0)
+ c_psql_commit (db);
+
PQfinish (db->conn);
db->conn = NULL;
sfree (db->queries);
db->queries_num = 0;
+ sfree (db->writers);
+ db->writers_num = 0;
+
+ pthread_mutex_unlock (&db->db_lock);
+
+ pthread_mutex_destroy (&db->db_lock);
+
sfree (db->database);
sfree (db->host);
sfree (db->port);
sfree (db->krbsrvname);
sfree (db->service);
+
+ /* don't care about freeing or reordering the 'databases' array
+ * this is done in 'shutdown' */
return;
} /* c_psql_database_delete */
int buf_len = sizeof (conninfo);
int status;
- if (! db)
+ if ((! db) || (! db->database))
return -1;
status = ssnprintf (buf, buf_len, "dbname = '%s'", db->database);
NULL, NULL, /* return text data */ 0);
} /* c_psql_exec_query_params */
+/* db->db_lock must be locked when calling this function */
static int c_psql_exec_query (c_psql_database_t *db, udb_query_t *q,
udb_query_preparation_area_t *prep_area)
{
return -1;
}
+ /* give c_psql_write() a chance to acquire the lock if called recursively
+ * through dispatch_values(); this will happen if, both, queries and
+ * writers are configured for a single connection */
+ pthread_mutex_unlock (&db->db_lock);
+
column_names = NULL;
column_values = NULL;
-#define BAIL_OUT(status) \
- sfree (column_names); \
- sfree (column_values); \
- PQclear (res); \
- return status
-
if (PGRES_TUPLES_OK != PQresultStatus (res)) {
+ pthread_mutex_lock (&db->db_lock);
+
log_err ("Failed to execute SQL query: %s",
PQerrorMessage (db->conn));
log_info ("SQL query was: %s",
udb_query_get_statement (q));
- BAIL_OUT (-1);
+ PQclear (res);
+ return -1;
}
+#define BAIL_OUT(status) \
+ sfree (column_names); \
+ sfree (column_values); \
+ PQclear (res); \
+ pthread_mutex_lock (&db->db_lock); \
+ return status
+
rows_num = PQntuples (res);
if (1 > rows_num) {
BAIL_OUT (0);
assert (NULL != db->database);
assert (NULL != db->instance);
+ assert (NULL != db->queries);
+
+ pthread_mutex_lock (&db->db_lock);
- if (0 != c_psql_check_connection (db))
+ if (0 != c_psql_check_connection (db)) {
+ pthread_mutex_unlock (&db->db_lock);
return -1;
+ }
for (i = 0; i < db->queries_num; ++i)
{
success = 1;
}
+ pthread_mutex_unlock (&db->db_lock);
+
if (! success)
return -1;
return 0;
} /* c_psql_read */
+static char *values_name_to_sqlarray (const data_set_t *ds,
+ char *string, size_t string_len)
+{
+ char *str_ptr;
+ size_t str_len;
+
+ int i;
+
+ str_ptr = string;
+ str_len = string_len;
+
+ for (i = 0; i < ds->ds_num; ++i) {
+ int status = ssnprintf (str_ptr, str_len, ",'%s'", ds->ds[i].name);
+
+ if (status < 1)
+ return NULL;
+ else if ((size_t)status >= str_len) {
+ str_len = 0;
+ break;
+ }
+ else {
+ str_ptr += status;
+ str_len -= (size_t)status;
+ }
+ }
+
+ if (str_len <= 2) {
+ log_err ("c_psql_write: Failed to stringify value names");
+ return NULL;
+ }
+
+ /* overwrite the first comma */
+ string[0] = '{';
+ str_ptr[0] = '}';
+ str_ptr[1] = '\0';
+
+ return string;
+} /* values_name_to_sqlarray */
+
+static char *values_type_to_sqlarray (const data_set_t *ds,
+ char *string, size_t string_len, _Bool store_rates)
+{
+ char *str_ptr;
+ size_t str_len;
+
+ int i;
+
+ str_ptr = string;
+ str_len = string_len;
+
+ for (i = 0; i < ds->ds_num; ++i) {
+ int status;
+
+ if (store_rates)
+ status = ssnprintf(str_ptr, str_len, ",'gauge'");
+ else
+ status = ssnprintf(str_ptr, str_len, ",'%s'",
+ DS_TYPE_TO_STRING (ds->ds[i].type));
+
+ if (status < 1) {
+ str_len = 0;
+ break;
+ }
+ else if ((size_t)status >= str_len) {
+ str_len = 0;
+ break;
+ }
+ else {
+ str_ptr += status;
+ str_len -= (size_t)status;
+ }
+ }
+
+ if (str_len <= 2) {
+ log_err ("c_psql_write: Failed to stringify value types");
+ return NULL;
+ }
+
+ /* overwrite the first comma */
+ string[0] = '{';
+ str_ptr[0] = '}';
+ str_ptr[1] = '\0';
+
+ return string;
+} /* values_type_to_sqlarray */
+
+static char *values_to_sqlarray (const data_set_t *ds, const value_list_t *vl,
+ char *string, size_t string_len, _Bool store_rates)
+{
+ char *str_ptr;
+ size_t str_len;
+
+ gauge_t *rates = NULL;
+
+ int i;
+
+ str_ptr = string;
+ str_len = string_len;
+
+ for (i = 0; i < vl->values_len; ++i) {
+ int status = 0;
+
+ if ((ds->ds[i].type != DS_TYPE_GAUGE)
+ && (ds->ds[i].type != DS_TYPE_COUNTER)
+ && (ds->ds[i].type != DS_TYPE_DERIVE)
+ && (ds->ds[i].type != DS_TYPE_ABSOLUTE)) {
+ log_err ("c_psql_write: Unknown data source type: %i",
+ ds->ds[i].type);
+ sfree (rates);
+ return NULL;
+ }
+
+ if (ds->ds[i].type == DS_TYPE_GAUGE)
+ status = ssnprintf (str_ptr, str_len,
+ ",%f", vl->values[i].gauge);
+ else if (store_rates) {
+ if (rates == NULL)
+ rates = uc_get_rate (ds, vl);
+
+ if (rates == NULL) {
+ log_err ("c_psql_write: Failed to determine rate");
+ return NULL;
+ }
+
+ status = ssnprintf (str_ptr, str_len,
+ ",%lf", rates[i]);
+ }
+ else if (ds->ds[i].type == DS_TYPE_COUNTER)
+ status = ssnprintf (str_ptr, str_len,
+ ",%llu", vl->values[i].counter);
+ else if (ds->ds[i].type == DS_TYPE_DERIVE)
+ status = ssnprintf (str_ptr, str_len,
+ ",%"PRIi64, vl->values[i].derive);
+ else if (ds->ds[i].type == DS_TYPE_ABSOLUTE)
+ status = ssnprintf (str_ptr, str_len,
+ ",%"PRIu64, vl->values[i].absolute);
+
+ if (status < 1) {
+ str_len = 0;
+ break;
+ }
+ else if ((size_t)status >= str_len) {
+ str_len = 0;
+ break;
+ }
+ else {
+ str_ptr += status;
+ str_len -= (size_t)status;
+ }
+ }
+
+ sfree (rates);
+
+ if (str_len <= 2) {
+ log_err ("c_psql_write: Failed to stringify value list");
+ return NULL;
+ }
+
+ /* overwrite the first comma */
+ string[0] = '{';
+ str_ptr[0] = '}';
+ str_ptr[1] = '\0';
+
+ return string;
+} /* values_to_sqlarray */
+
+static int c_psql_write (const data_set_t *ds, const value_list_t *vl,
+ user_data_t *ud)
+{
+ c_psql_database_t *db;
+
+ char time_str[32];
+ char values_name_str[1024];
+ char values_type_str[1024];
+ char values_str[1024];
+
+ const char *params[9];
+
+ int success = 0;
+ int i;
+
+ if ((ud == NULL) || (ud->data == NULL)) {
+ log_err ("c_psql_write: Invalid user data.");
+ return -1;
+ }
+
+ db = ud->data;
+ assert (db->database != NULL);
+ assert (db->writers != NULL);
+
+ if (cdtime_to_iso8601 (time_str, sizeof (time_str), vl->time) == 0) {
+ log_err ("c_psql_write: Failed to convert time to ISO 8601 format");
+ return -1;
+ }
+
+ if (values_name_to_sqlarray (ds,
+ values_name_str, sizeof (values_name_str)) == NULL)
+ return -1;
+
+#define VALUE_OR_NULL(v) ((((v) == NULL) || (*(v) == '\0')) ? NULL : (v))
+
+ params[0] = time_str;
+ params[1] = vl->host;
+ params[2] = vl->plugin;
+ params[3] = VALUE_OR_NULL(vl->plugin_instance);
+ params[4] = vl->type;
+ params[5] = VALUE_OR_NULL(vl->type_instance);
+ params[6] = values_name_str;
+
+#undef VALUE_OR_NULL
+
+ pthread_mutex_lock (&db->db_lock);
+
+ if (0 != c_psql_check_connection (db)) {
+ pthread_mutex_unlock (&db->db_lock);
+ return -1;
+ }
+
+ if ((db->commit_interval > 0)
+ && (db->next_commit == 0))
+ c_psql_begin (db);
+
+ for (i = 0; i < db->writers_num; ++i) {
+ c_psql_writer_t *writer;
+ PGresult *res;
+
+ writer = db->writers[i];
+
+ if (values_type_to_sqlarray (ds,
+ values_type_str, sizeof (values_type_str),
+ writer->store_rates) == NULL) {
+ pthread_mutex_unlock (&db->db_lock);
+ return -1;
+ }
+
+ if (values_to_sqlarray (ds, vl,
+ values_str, sizeof (values_str),
+ writer->store_rates) == NULL) {
+ pthread_mutex_unlock (&db->db_lock);
+ return -1;
+ }
+
+ params[7] = values_type_str;
+ params[8] = values_str;
+
+ res = PQexecParams (db->conn, writer->statement,
+ STATIC_ARRAY_SIZE (params), NULL,
+ (const char *const *)params,
+ NULL, NULL, /* return text data */ 0);
+
+ if ((PGRES_COMMAND_OK != PQresultStatus (res))
+ && (PGRES_TUPLES_OK != PQresultStatus (res))) {
+ if ((CONNECTION_OK != PQstatus (db->conn))
+ && (0 == c_psql_check_connection (db))) {
+ PQclear (res);
+
+ /* try again */
+ res = PQexecParams (db->conn, writer->statement,
+ STATIC_ARRAY_SIZE (params), NULL,
+ (const char *const *)params,
+ NULL, NULL, /* return text data */ 0);
+
+ if ((PGRES_COMMAND_OK == PQresultStatus (res))
+ || (PGRES_TUPLES_OK == PQresultStatus (res))) {
+ success = 1;
+ continue;
+ }
+ }
+
+ log_err ("Failed to execute SQL query: %s",
+ PQerrorMessage (db->conn));
+ log_info ("SQL query was: '%s', "
+ "params: %s, %s, %s, %s, %s, %s, %s, %s",
+ writer->statement,
+ params[0], params[1], params[2], params[3],
+ params[4], params[5], params[6], params[7]);
+
+ /* this will abort any current transaction -> restart */
+ if (db->next_commit > 0)
+ c_psql_commit (db);
+
+ pthread_mutex_unlock (&db->db_lock);
+ return -1;
+ }
+ success = 1;
+ }
+
+ if ((db->next_commit > 0)
+ && (cdtime () > db->next_commit))
+ c_psql_commit (db);
+
+ pthread_mutex_unlock (&db->db_lock);
+
+ if (! success)
+ return -1;
+ return 0;
+} /* c_psql_write */
+
+/* We cannot flush single identifiers as all we do is to commit the currently
+ * running transaction, thus making sure that all written data is actually
+ * visible to everybody. */
+static int c_psql_flush (cdtime_t timeout,
+ __attribute__((unused)) const char *ident,
+ user_data_t *ud)
+{
+ c_psql_database_t *dbs = databases;
+ size_t dbs_num = databases_num;
+ size_t i;
+
+ if ((ud != NULL) && (ud->data != NULL)) {
+ dbs = ud->data;
+ dbs_num = 1;
+ }
+
+ for (i = 0; i < dbs_num; ++i) {
+ c_psql_database_t *db = dbs + i;
+
+ /* don't commit if the timeout is larger than the regular commit
+ * interval as in that case all requested data has already been
+ * committed */
+ if ((db->next_commit > 0) && (db->commit_interval > timeout))
+ c_psql_commit (db);
+ }
+ return 0;
+} /* c_psql_flush */
+
static int c_psql_shutdown (void)
{
+ size_t i = 0;
+
+ _Bool had_flush = 0;
+
plugin_unregister_read_group ("postgresql");
+ for (i = 0; i < databases_num; ++i) {
+ c_psql_database_t *db = databases + i;
+
+ if (db->writers_num > 0) {
+ char cb_name[DATA_MAX_NAME_LEN];
+ ssnprintf (cb_name, sizeof (cb_name), "postgresql-%s",
+ db->database);
+
+ if (! had_flush) {
+ plugin_unregister_flush ("postgresql");
+ had_flush = 1;
+ }
+
+ plugin_unregister_flush (cb_name);
+ plugin_unregister_write (cb_name);
+ }
+ }
+
udb_query_free (queries, queries_num);
queries = NULL;
queries_num = 0;
+ sfree (writers);
+ writers = NULL;
+ writers_num = 0;
+
+ sfree (databases);
+ databases = NULL;
+ databases_num = 0;
+
return 0;
} /* c_psql_shutdown */
return (-1);
} /* config_query_callback */
+/* Resolve the writer named by config item 'ci' in 'src_writers' and append
+ * a pointer to it to the '*dst_writers' array (grown by realloc).
+ * Returns 0 on success, non-zero else.
+ * NOTE(review): the stored pointers reference elements of the caller's
+ * writer array; if that array is later grown by realloc it may move,
+ * leaving these pointers dangling -- verify that all <Writer> blocks are
+ * parsed before any <Database> block references them. */
+static int config_add_writer (oconfig_item_t *ci,
+ c_psql_writer_t *src_writers, size_t src_writers_num,
+ c_psql_writer_t ***dst_writers, size_t *dst_writers_num)
+{
+ char *name;
+
+ size_t i;
+
+ if ((ci == NULL) || (dst_writers == NULL) || (dst_writers_num == NULL))
+ return -1;
+
+ if ((ci->values_num != 1)
+ || (ci->values[0].type != OCONFIG_TYPE_STRING)) {
+ log_err ("`Writer' expects a single string argument.");
+ return 1;
+ }
+
+ name = ci->values[0].value.string;
+
+ for (i = 0; i < src_writers_num; ++i) {
+ c_psql_writer_t **tmp;
+
+ if (strcasecmp (name, src_writers[i].name) != 0)
+ continue;
+
+ tmp = (c_psql_writer_t **)realloc (*dst_writers,
+ sizeof (**dst_writers) * (*dst_writers_num + 1));
+ if (tmp == NULL) {
+ log_err ("Out of memory.");
+ return -1;
+ }
+
+ tmp[*dst_writers_num] = src_writers + i;
+
+ *dst_writers = tmp;
+ ++(*dst_writers_num);
+ break;
+ }
+
+ /* the loop ran to completion without finding a matching name */
+ if (i >= src_writers_num) {
+ log_err ("No such writer: `%s'", name);
+ return -1;
+ }
+
+ return 0;
+} /* config_add_writer */
+
+/* Parse a <Writer> block: append a new writer to the global 'writers'
+ * array and populate it from the block's options.
+ * Returns 0 on success, non-zero else. */
+static int c_psql_config_writer (oconfig_item_t *ci)
+{
+ c_psql_writer_t *writer;
+ c_psql_writer_t *tmp;
+
+ int status = 0;
+ int i;
+
+ if ((ci->values_num != 1)
+ || (ci->values[0].type != OCONFIG_TYPE_STRING)) {
+ log_err ("<Writer> expects a single string argument.");
+ return 1;
+ }
+
+ tmp = (c_psql_writer_t *)realloc (writers,
+ sizeof (*writers) * (writers_num + 1));
+ if (tmp == NULL) {
+ log_err ("Out of memory.");
+ return -1;
+ }
+
+ writers = tmp;
+ writer = writers + writers_num;
+ ++writers_num;
+
+ writer->name = sstrdup (ci->values[0].value.string);
+ /* NOTE(review): 'statement' may remain NULL if the mandatory Statement
+ * option is missing; confirm the write path handles that */
+ writer->statement = NULL;
+ writer->store_rates = 1;
+
+ for (i = 0; i < ci->children_num; ++i) {
+ oconfig_item_t *c = ci->children + i;
+
+ if (strcasecmp ("Statement", c->key) == 0)
+ status = cf_util_get_string (c, &writer->statement);
+ else if (strcasecmp ("StoreRates", c->key) == 0)
+ status = cf_util_get_boolean (c, &writer->store_rates);
+ else
+ log_warn ("Ignoring unknown config key \"%s\".", c->key);
+ }
+
+ if (status != 0) {
+ sfree (writer->statement);
+ sfree (writer->name);
+ /* 'writer' points into the realloc'd 'writers' array rather than
+ * being an individual allocation, so it must not be passed to
+ * free(); drop the partially initialized slot instead */
+ --writers_num;
+ return status;
+ }
+
+ return 0;
+} /* c_psql_config_writer */
+
static int c_psql_config_database (oconfig_item_t *ci)
{
c_psql_database_t *db;
struct timespec cb_interval = { 0, 0 };
user_data_t ud;
+ static _Bool have_flush = 0;
+
int i;
if ((1 != ci->values_num)
else if (0 == strcasecmp (c->key, "Query"))
udb_query_pick_from_list (c, queries, queries_num,
&db->queries, &db->queries_num);
+ else if (0 == strcasecmp (c->key, "Writer"))
+ config_add_writer (c, writers, writers_num,
+ &db->writers, &db->writers_num);
else if (0 == strcasecmp (c->key, "Interval"))
cf_util_get_cdtime (c, &db->interval);
+ else if (strcasecmp ("CommitInterval", c->key) == 0)
+ cf_util_get_cdtime (c, &db->commit_interval);
else
log_warn ("Ignoring unknown config key \"%s\".", c->key);
}
/* If no `Query' options were given, add the default queries.. */
- if (db->queries_num == 0) {
+ if ((db->queries_num == 0) && (db->writers_num == 0)){
for (i = 0; i < def_queries_num; i++)
udb_query_pick_from_list_by_name (def_queries[i],
queries, queries_num,
ssnprintf (cb_name, sizeof (cb_name), "postgresql-%s", db->instance);
- CDTIME_T_TO_TIMESPEC (db->interval, &cb_interval);
+ if (db->queries_num > 0) {
+ CDTIME_T_TO_TIMESPEC (db->interval, &cb_interval);
- plugin_register_complex_read ("postgresql", cb_name, c_psql_read,
- /* interval = */ (db->interval > 0) ? &cb_interval : NULL,
- &ud);
+ ++db->ref_cnt;
+ plugin_register_complex_read ("postgresql", cb_name, c_psql_read,
+ /* interval = */ (db->interval > 0) ? &cb_interval : NULL,
+ &ud);
+ }
+ if (db->writers_num > 0) {
+ ++db->ref_cnt;
+ plugin_register_write (cb_name, c_psql_write, &ud);
+
+ if (! have_flush) {
+ /* flush all */
+ plugin_register_flush ("postgresql",
+ c_psql_flush, /* user data = */ NULL);
+ have_flush = 1;
+ }
+
+ /* flush this connection only */
+ ++db->ref_cnt;
+ plugin_register_flush (cb_name, c_psql_flush, &ud);
+ }
+ else if (db->commit_interval > 0) {
+ log_warn ("Database '%s': You do not have any writers assigned to "
+ "this database connection. Setting 'CommitInterval' does "
+ "not have any effect.", db->database);
+ }
return 0;
} /* c_psql_config_database */
if (0 == strcasecmp (c->key, "Query"))
udb_query_create (&queries, &queries_num, c,
/* callback = */ config_query_callback);
+ else if (0 == strcasecmp (c->key, "Writer"))
+ c_psql_config_writer (c);
else if (0 == strcasecmp (c->key, "Database"))
c_psql_config_database (c);
else