Merge branch 'sh/postgresql-writer'
authorFlorian Forster <octo@collectd.org>
Sat, 17 Nov 2012 08:39:33 +0000 (09:39 +0100)
committerFlorian Forster <octo@collectd.org>
Sat, 17 Nov 2012 08:39:33 +0000 (09:39 +0100)
Conflicts:
src/collectd.conf.pod
src/postgresql.c

contrib/postgresql/collectd_insert.sql [new file with mode: 0644]
src/collectd.conf.pod
src/postgresql.c
src/utils_time.c
src/utils_time.h

diff --git a/contrib/postgresql/collectd_insert.sql b/contrib/postgresql/collectd_insert.sql
new file mode 100644 (file)
index 0000000..fb44bb4
--- /dev/null
@@ -0,0 +1,234 @@
+-- collectd - contrib/postgresql/collectd_insert.sql
+-- Copyright (C) 2012 Sebastian 'tokkee' Harl
+-- All rights reserved.
+--
+-- Redistribution and use in source and binary forms, with or without
+-- modification, are permitted provided that the following conditions
+-- are met:
+--
+-- - Redistributions of source code must retain the above copyright
+--   notice, this list of conditions and the following disclaimer.
+--
+-- - Redistributions in binary form must reproduce the above copyright
+--   notice, this list of conditions and the following disclaimer in the
+--   documentation and/or other materials provided with the distribution.
+--
+-- THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+-- AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+-- IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+-- ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+-- LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+-- CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+-- SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+-- INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+-- CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+-- ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+-- POSSIBILITY OF SUCH DAMAGE.
+
+-- Description:
+-- ------------
+--
+-- This is a sample database setup that may be used to write data collected by
+-- collectd to a PostgreSQL database. We're using two tables, 'identifiers'
+-- and 'values' to store the value-list identifier and the actual values
+-- respectively.
+--
+-- The 'values' table is partitioned to improve performance and maintainance.
+-- Please note that additional maintainance scripts are required in order to
+-- keep the setup running -- see the comments below for details.
+--
+-- The function 'collectd_insert' may be used to actually insert values
+-- submitted by collectd into those tables.
+--
+-- Sample configuration:
+-- ---------------------
+--
+-- <Plugin postgresql>
+--     <Writer sqlstore>
+--         Statement "SELECT collectd_insert($1, $2, $3, $4, $5, $6, $7, $8, $9);"
+--     </Writer>
+--     <Database foo>
+--         # ...
+--         Writer sqlstore
+--     </Database>
+-- </Plugin>
+
+CREATE TABLE identifiers (
+    id integer NOT NULL,
+    host character varying(64) NOT NULL,
+    plugin character varying(64) NOT NULL,
+    plugin_inst character varying(64) DEFAULT NULL::character varying,
+    type character varying(64) NOT NULL,
+    type_inst character varying(64) DEFAULT NULL::character varying
+);
+CREATE SEQUENCE identifiers_id_seq
+    START WITH 1
+    INCREMENT BY 1
+    NO MINVALUE
+    NO MAXVALUE
+    CACHE 1;
+ALTER SEQUENCE identifiers_id_seq OWNED BY identifiers.id;
+ALTER TABLE ONLY identifiers
+    ALTER COLUMN id SET DEFAULT nextval('identifiers_id_seq'::regclass);
+ALTER TABLE ONLY identifiers
+    ADD CONSTRAINT identifiers_host_plugin_plugin_inst_type_type_inst_key
+        UNIQUE (host, plugin, plugin_inst, type, type_inst);
+ALTER TABLE ONLY identifiers
+    ADD CONSTRAINT identifiers_pkey PRIMARY KEY (id);
+
+-- optionally, create indexes for the identifier fields
+CREATE INDEX identifiers_host ON identifiers USING btree (host);
+CREATE INDEX identifiers_plugin ON identifiers USING btree (plugin);
+CREATE INDEX identifiers_plugin_inst ON identifiers USING btree (plugin_inst);
+CREATE INDEX identifiers_type ON identifiers USING btree (type);
+CREATE INDEX identifiers_type_inst ON identifiers USING btree (type_inst);
+
+CREATE TABLE "values" (
+    id integer NOT NULL,
+    tstamp timestamp with time zone NOT NULL,
+    name character varying(64) NOT NULL,
+    value double precision NOT NULL
+);
+
+CREATE OR REPLACE VIEW collectd
+    AS SELECT host, plugin, plugin_inst, type, type_inst,
+            host
+                || '/' || plugin
+                || CASE
+                    WHEN plugin_inst IS NOT NULL THEN '-'
+                    ELSE ''
+                END
+                || coalesce(plugin_inst, '')
+                || '/' || type
+                || CASE
+                    WHEN type_inst IS NOT NULL THEN '-'
+                    ELSE ''
+                END
+                || coalesce(plugin_inst, '') AS identifier,
+            tstamp, name, value
+        FROM identifiers
+            JOIN values
+            ON values.id = identifiers.id;
+
+-- partition "values" by day (or week, month, ...)
+
+-- create the child tables for today and the next 'days' days:
+-- this may, for example, be used in a daily cron-job (or similar) to create
+-- the tables for the next couple of days
+CREATE OR REPLACE FUNCTION values_update_childs(
+        integer
+    ) RETURNS SETOF text
+    LANGUAGE plpgsql
+    AS $_$
+DECLARE
+    days alias for $1;
+    cur_day date;
+    next_day date;
+    i integer;
+BEGIN
+    IF days < 1 THEN
+        RAISE EXCEPTION 'Cannot have negative number of days';
+    END IF;
+
+    i := 0;
+    LOOP
+        EXIT WHEN i > days;
+
+        SELECT CAST ('now'::date + i * '1day'::interval AS date) INTO cur_day;
+        SELECT CAST ('now'::date + (i + 1) * '1day'::interval AS date) INTO next_day;
+
+        i := i + 1;
+
+        BEGIN
+            EXECUTE 'CREATE TABLE "values$' || cur_day || '" (
+                CHECK (tstamp >= TIMESTAMP ''' || cur_day || ''' '
+                    || 'AND tstamp < TIMESTAMP ''' || next_day || ''')
+            ) INHERITS (values)';
+        EXCEPTION WHEN duplicate_table THEN
+            CONTINUE;
+        END;
+
+        RETURN NEXT 'values$' || cur_day::text;
+
+        EXECUTE 'ALTER TABLE ONLY "values$' || cur_day || '"
+            ADD CONSTRAINT "values_' || cur_day || '_pkey"
+                PRIMARY KEY (id, tstamp, name, value)';
+        EXECUTE 'ALTER TABLE ONLY "values$' || cur_day || '"
+            ADD CONSTRAINT "values_' || cur_day || '_id_fkey"
+                FOREIGN KEY (id) REFERENCES identifiers(id)';
+    END LOOP;
+    RETURN;
+END;
+$_$;
+
+-- create initial child tables
+SELECT values_update_childs(2);
+
+CREATE OR REPLACE FUNCTION values_insert_trigger()
+    RETURNS trigger
+    LANGUAGE plpgsql
+    AS $_$
+DECLARE
+    child_tbl character varying;
+BEGIN
+    SELECT 'values$' || CAST (NEW.tstamp AS DATE) INTO child_tbl;
+    -- Rather than using 'EXECUTE', some if-cascade checking the date may also
+    -- be used. However, this would require frequent updates of the trigger
+    -- function while this example works automatically.
+    EXECUTE 'INSERT INTO "' || child_tbl || '" VALUES ($1.*)' USING NEW;
+    RETURN NULL;
+END;
+$_$;
+
+CREATE TRIGGER insert_values_trigger
+    BEFORE INSERT ON values
+    FOR EACH ROW EXECUTE PROCEDURE values_insert_trigger();
+
+-- when querying values make sure to enable constraint exclusion
+-- SET constraint_exclusion = on;
+
+CREATE OR REPLACE FUNCTION collectd_insert(
+        timestamp with time zone, character varying,
+        character varying, character varying,
+        character varying, character varying,
+        character varying[], character varying[], double precision[]
+    ) RETURNS void
+    LANGUAGE plpgsql
+    AS $_$
+DECLARE
+    p_time alias for $1;
+    p_host alias for $2;
+    p_plugin alias for $3;
+    p_plugin_instance alias for $4;
+    p_type alias for $5;
+    p_type_instance alias for $6;
+    p_value_names alias for $7;
+    -- don't use the type info; for 'StoreRates true' it's 'gauge' anyway
+    -- p_type_names alias for $8;
+    p_values alias for $9;
+    ds_id integer;
+    i integer;
+BEGIN
+    SELECT id INTO ds_id
+        FROM identifiers
+        WHERE host = p_host
+            AND plugin = p_plugin
+            AND COALESCE(plugin_inst, '') = COALESCE(p_plugin_instance, '')
+            AND type = p_type
+            AND COALESCE(type_inst, '') = COALESCE(p_type_instance, '');
+    IF NOT FOUND THEN
+        INSERT INTO identifiers (host, plugin, plugin_inst, type, type_inst)
+            VALUES (p_host, p_plugin, p_plugin_instance, p_type, p_type_instance)
+            RETURNING id INTO ds_id;
+    END IF;
+    i := 1;
+    LOOP
+        EXIT WHEN i > array_upper(p_value_names, 1);
+        INSERT INTO values (id, tstamp, name, value)
+            VALUES (ds_id, p_time, p_value_names[i], p_values[i]);
+        i := i + 1;
+    END LOOP;
+END;
+$_$;
+
+-- vim: set expandtab :
index 9d098f1..417af0d 100644 (file)
@@ -3627,6 +3627,13 @@ which are available in a PostgreSQL database or use future or special
 statistics provided by PostgreSQL without the need to upgrade your collectd
 installation.
 
+Starting with version 5.2, the C<postgresql> plugin supports writing data to
+PostgreSQL databases as well. This has been implemented in a generic way. You
+need to specify an SQL statement which will then be executed by collectd in
+order to write the data (see below for details). The benefit of that approach
+is that there is no fixed database layout. Rather, the layout may be optimized
+for the current setup.
+
 The B<PostgreSQL Documentation> manual can be found at
 L<http://www.postgresql.org/docs/manuals/>.
 
@@ -3656,6 +3663,11 @@ L<http://www.postgresql.org/docs/manuals/>.
       </Result>
     </Query>
 
+    <Writer sqlstore>
+      Statement "SELECT collectd_insert($1, $2, $3, $4, $5, $6, $7, $8, $9);"
+      StoreRates true
+    </Writer>
+
     <Database foo>
       Host "hostname"
       Port "5432"
@@ -3672,6 +3684,12 @@ L<http://www.postgresql.org/docs/manuals/>.
       Query backend # predefined
       Query rt36_tickets
     </Database>
+
+    <Database qux>
+      # ...
+      Writer sqlstore
+      CommitInterval 10
+    </Database>
   </Plugin>
 
 The B<Query> block defines one database query which may later be used by a
@@ -3855,6 +3873,84 @@ non-by_table queries above.
 
 =back
 
+The B<Writer> block defines a PostgreSQL writer backend. It accepts a single
+mandatory argument specifying the name of the writer. This will then be used
+in the B<Database> specification in order to activate the writer instance. The
+names of all writers have to be unique. The following options may be
+specified:
+
+=over 4
+
+=item B<Statement> I<sql statement>
+
+This mandatory option specifies the SQL statement that will be executed for
+each submitted value. A single SQL statement is allowed only. Anything after
+the first semicolon will be ignored.
+
+Nine parameters will be passed to the statement and should be specified as
+tokens B<$1>, B<$2>, through B<$9> in the statement string. The following
+values are made available through those parameters:
+
+=over 4
+
+=item B<$1>
+
+The timestamp of the queried value as a floating point number.
+
+=item B<$2>
+
+The hostname of the queried value.
+
+=item B<$3>
+
+The plugin name of the queried value.
+
+=item B<$4>
+
+The plugin instance of the queried value. This value may be B<NULL> if there
+is no plugin instance.
+
+=item B<$5>
+
+The type of the queried value (cf. L<types.db(5)>).
+
+=item B<$6>
+
+The type instance of the queried value. This value may be B<NULL> if there is
+no type instance.
+
+=item B<$7>
+
+An array of names for the submitted values (i.E<nbsp>e., the name of the data
+sources of the submitted value-list).
+
+=item B<$8>
+
+An array of types for the submitted values (i.E<nbsp>e., the type of the data
+sources of the submitted value-list; C<counter>, C<gauge>, ...). Note, that if
+B<StoreRates> is enabled (which is the default, see below), all types will be
+C<gauge>.
+
+=item B<$9>
+
+An array of the submitted values. The dimensions of the value name and value
+arrays match.
+
+=back
+
+In general, it is advisable to create and call a custom function in the
+PostgreSQL database for this purpose. Any procedural language supported by
+PostgreSQL will do (see chapter "Server Programming" in the PostgreSQL manual
+for details).
+
+=item B<StoreRates> B<false>|B<true>
+
+If set to B<true> (the default), convert counter values to rates. If set to
+B<false> counter values are stored as is, i.E<nbsp>e. as an increasing integer
+number.
+
+=back
+
 The B<Database> block defines one PostgreSQL database for which to collect
 statistics. It accepts a single mandatory argument which specifies the
 database name. None of the other options are required. PostgreSQL will use
@@ -3870,6 +3966,17 @@ for details.
 Specify the interval with which the database should be queried. The default is
 to use the global B<Interval> setting.
 
+=item B<CommitInterval> I<seconds>
+
+This option may be used for database connections which have "writers" assigned
+(see above). If specified, it causes a writer to put several updates into a
+single transaction. This transaction will last for the specified amount of
+time. By default, each update will be executed in a separate transaction. Each
+transaction generates a fair amount of overhead which can, thus, be reduced by
+activating this option. The draw-back is, that data covering the specified
+amount of time will be lost, for example, if a single statement within the
+transaction fails or if the database server crashes.
+
 =item B<Host> I<hostname>
 
 Specify the hostname or IP of the PostgreSQL server to connect to. If the
@@ -3942,11 +4049,36 @@ B<PostgreSQL Documentation> for details.
 
 =item B<Query> I<query>
 
-Specify a I<query> which should be executed for the database connection. This
-may be any of the predefined or user-defined queries. If no such option is
-given, it defaults to "backends", "transactions", "queries", "query_plans",
-"table_states", "disk_io" and "disk_usage". Else, the specified queries are
-used only.
+Specifies a I<query> which should be executed in the context of the database
+connection. This may be any of the predefined or user-defined queries. If no
+such option is given, it defaults to "backends", "transactions", "queries",
+"query_plans", "table_states", "disk_io" and "disk_usage" (unless a B<Writer>
+has been specified). Else, the specified queries are used only.
+
+=item B<Writer> I<writer>
+
+Assigns the specified I<writer> backend to the database connection. This
+causes all collected data to be send to the database using the settings
+defined in the writer configuration (see the section "FILTER CONFIGURATION"
+below for details on how to selectively send data to certain plugins).
+
+Each writer will register a flush callback which may be used when having long
+transactions enabled (see the B<CommitInterval> option above). When issuing
+the B<FLUSH> command (see L<collectd-unixsock(5)> for details) the current
+transaction will be committed right away. Two different kinds of flush
+callbacks are available with the C<postgresql> plugin:
+
+=over 4
+
+=item B<postgresql>
+
+Flush all writer backends.
+
+=item B<postgresql->I<database>
+
+Flush all writers of the specified I<database> only.
+
+=back
 
 =back
 
index a72109a..15d4666 100644 (file)
@@ -1,7 +1,7 @@
 /**
  * collectd - src/postgresql.c
- * Copyright (C) 2008, 2009  Sebastian Harl
- * Copyright (C) 2009        Florian Forster
+ * Copyright (C) 2008-2012  Sebastian Harl
+ * Copyright (C) 2009       Florian Forster
  * All rights reserved.
  *
  * Redistribution and use in source and binary forms, with or without
 #include "configfile.h"
 #include "plugin.h"
 
+#include "utils_cache.h"
 #include "utils_db_query.h"
 #include "utils_complain.h"
 
+#if HAVE_PTHREAD_H
+# include <pthread.h>
+#endif
+
 #include <pg_config_manual.h>
 #include <libpq-fe.h>
 
 #define log_err(...) ERROR ("postgresql: " __VA_ARGS__)
 #define log_warn(...) WARNING ("postgresql: " __VA_ARGS__)
 #define log_info(...) INFO ("postgresql: " __VA_ARGS__)
+#define log_debug(...) DEBUG ("postgresql: " __VA_ARGS__)
 
 #ifndef C_PSQL_DEFAULT_CONF
 # define C_PSQL_DEFAULT_CONF PKGDATADIR "/postgresql_default.conf"
@@ -105,6 +111,12 @@ typedef struct {
 } c_psql_user_data_t;
 
 typedef struct {
+       char *name;
+       char *statement;
+       _Bool store_rates;
+} c_psql_writer_t;
+
+typedef struct {
        PGconn      *conn;
        c_complain_t conn_complaint;
 
@@ -118,8 +130,18 @@ typedef struct {
        udb_query_t    **queries;
        size_t           queries_num;
 
+       c_psql_writer_t **writers;
+       size_t            writers_num;
+
+       /* make sure we don't access the database object in parallel */
+       pthread_mutex_t   db_lock;
+
        cdtime_t interval;
 
+       /* writer "caching" settings */
+       cdtime_t commit_interval;
+       cdtime_t next_commit;
+
        char *host;
        char *port;
        char *database;
@@ -133,6 +155,8 @@ typedef struct {
        char *krbsrvname;
 
        char *service;
+
+       int ref_cnt;
 } c_psql_database_t;
 
 static char *def_queries[] = {
@@ -146,19 +170,69 @@ static char *def_queries[] = {
 };
 static int def_queries_num = STATIC_ARRAY_SIZE (def_queries);
 
+static c_psql_database_t *databases     = NULL;
+static size_t             databases_num = 0;
+
 static udb_query_t      **queries       = NULL;
 static size_t             queries_num   = 0;
 
+static c_psql_writer_t   *writers       = NULL;
+static size_t             writers_num   = 0;
+
+static int c_psql_begin (c_psql_database_t *db)
+{
+       PGresult *r = PQexec (db->conn, "BEGIN");
+
+       int status = 1;
+
+       if (r != NULL) {
+               if (PGRES_COMMAND_OK == PQresultStatus (r)) {
+                       db->next_commit = cdtime() + db->commit_interval;
+                       status = 0;
+               }
+               else
+                       log_warn ("Failed to initiate ('BEGIN') transaction: %s",
+                                       PQerrorMessage (db->conn));
+               PQclear (r);
+       }
+       return status;
+} /* c_psql_begin */
+
+static int c_psql_commit (c_psql_database_t *db)
+{
+       PGresult *r = PQexec (db->conn, "COMMIT");
+
+       int status = 1;
+
+       if (r != NULL) {
+               if (PGRES_COMMAND_OK == PQresultStatus (r)) {
+                       db->next_commit = 0;
+                       log_debug ("Successfully committed transaction.");
+                       status = 0;
+               }
+               else
+                       log_warn ("Failed to commit transaction: %s",
+                                       PQerrorMessage (db->conn));
+               PQclear (r);
+       }
+       return status;
+} /* c_psql_commit */
+
 static c_psql_database_t *c_psql_database_new (const char *name)
 {
        c_psql_database_t *db;
 
-       db = (c_psql_database_t *)malloc (sizeof (*db));
+       db = (c_psql_database_t *)realloc (databases,
+                       (databases_num + 1) * sizeof (*db));
        if (NULL == db) {
                log_err ("Out of memory.");
                return NULL;
        }
 
+       databases = db;
+       db = databases + databases_num;
+       ++databases_num;
+
        db->conn = NULL;
 
        C_COMPLAIN_INIT (&db->conn_complaint);
@@ -172,8 +246,16 @@ static c_psql_database_t *c_psql_database_new (const char *name)
        db->queries        = NULL;
        db->queries_num    = 0;
 
+       db->writers        = NULL;
+       db->writers_num    = 0;
+
+       pthread_mutex_init (&db->db_lock, /* attrs = */ NULL);
+
        db->interval   = 0;
 
+       db->commit_interval = 0;
+       db->next_commit     = 0;
+
        db->database   = sstrdup (name);
        db->host       = NULL;
        db->port       = NULL;
@@ -187,6 +269,8 @@ static c_psql_database_t *c_psql_database_new (const char *name)
        db->krbsrvname = NULL;
 
        db->service    = NULL;
+
+       db->ref_cnt    = 0;
        return db;
 } /* c_psql_database_new */
 
@@ -196,6 +280,17 @@ static void c_psql_database_delete (void *data)
 
        c_psql_database_t *db = data;
 
+       --db->ref_cnt;
+       /* readers and writers may access this database */
+       if (db->ref_cnt > 0)
+               return;
+
+       /* wait for the lock to be released by the last writer */
+       pthread_mutex_lock (&db->db_lock);
+
+       if (db->next_commit > 0)
+               c_psql_commit (db);
+
        PQfinish (db->conn);
        db->conn = NULL;
 
@@ -207,6 +302,13 @@ static void c_psql_database_delete (void *data)
        sfree (db->queries);
        db->queries_num = 0;
 
+       sfree (db->writers);
+       db->writers_num = 0;
+
+       pthread_mutex_unlock (&db->db_lock);
+
+       pthread_mutex_destroy (&db->db_lock);
+
        sfree (db->database);
        sfree (db->host);
        sfree (db->port);
@@ -220,6 +322,9 @@ static void c_psql_database_delete (void *data)
        sfree (db->krbsrvname);
 
        sfree (db->service);
+
+       /* don't care about freeing or reordering the 'databases' array
+        * this is done in 'shutdown' */
        return;
 } /* c_psql_database_delete */
 
@@ -230,7 +335,7 @@ static int c_psql_connect (c_psql_database_t *db)
        int   buf_len = sizeof (conninfo);
        int   status;
 
-       if (! db)
+       if ((! db) || (! db->database))
                return -1;
 
        status = ssnprintf (buf, buf_len, "dbname = '%s'", db->database);
@@ -363,6 +468,7 @@ static PGresult *c_psql_exec_query_params (c_psql_database_t *db,
                        NULL, NULL, /* return text data */ 0);
 } /* c_psql_exec_query_params */
 
+/* db->db_lock must be locked when calling this function */
 static int c_psql_exec_query (c_psql_database_t *db, udb_query_t *q,
                udb_query_preparation_area_t *prep_area)
 {
@@ -397,23 +503,32 @@ static int c_psql_exec_query (c_psql_database_t *db, udb_query_t *q,
                return -1;
        }
 
+       /* give c_psql_write() a chance to acquire the lock if called recursively
+        * through dispatch_values(); this will happen if, both, queries and
+        * writers are configured for a single connection */
+       pthread_mutex_unlock (&db->db_lock);
+
        column_names = NULL;
        column_values = NULL;
 
-#define BAIL_OUT(status) \
-       sfree (column_names); \
-       sfree (column_values); \
-       PQclear (res); \
-       return status
-
        if (PGRES_TUPLES_OK != PQresultStatus (res)) {
+               pthread_mutex_lock (&db->db_lock);
+
                log_err ("Failed to execute SQL query: %s",
                                PQerrorMessage (db->conn));
                log_info ("SQL query was: %s",
                                udb_query_get_statement (q));
-               BAIL_OUT (-1);
+               PQclear (res);
+               return -1;
        }
 
+#define BAIL_OUT(status) \
+       sfree (column_names); \
+       sfree (column_values); \
+       PQclear (res); \
+       pthread_mutex_lock (&db->db_lock); \
+       return status
+
        rows_num = PQntuples (res);
        if (1 > rows_num) {
                BAIL_OUT (0);
@@ -501,9 +616,14 @@ static int c_psql_read (user_data_t *ud)
 
        assert (NULL != db->database);
        assert (NULL != db->instance);
+       assert (NULL != db->queries);
+
+       pthread_mutex_lock (&db->db_lock);
 
-       if (0 != c_psql_check_connection (db))
+       if (0 != c_psql_check_connection (db)) {
+               pthread_mutex_unlock (&db->db_lock);
                return -1;
+       }
 
        for (i = 0; i < db->queries_num; ++i)
        {
@@ -521,19 +641,377 @@ static int c_psql_read (user_data_t *ud)
                        success = 1;
        }
 
+       pthread_mutex_unlock (&db->db_lock);
+
        if (! success)
                return -1;
        return 0;
 } /* c_psql_read */
 
+static char *values_name_to_sqlarray (const data_set_t *ds,
+               char *string, size_t string_len)
+{
+       char  *str_ptr;
+       size_t str_len;
+
+       int i;
+
+       str_ptr = string;
+       str_len = string_len;
+
+       for (i = 0; i < ds->ds_num; ++i) {
+               int status = ssnprintf (str_ptr, str_len, ",'%s'", ds->ds[i].name);
+
+               if (status < 1)
+                       return NULL;
+               else if ((size_t)status >= str_len) {
+                       str_len = 0;
+                       break;
+               }
+               else {
+                       str_ptr += status;
+                       str_len -= (size_t)status;
+               }
+       }
+
+       if (str_len <= 2) {
+               log_err ("c_psql_write: Failed to stringify value names");
+               return NULL;
+       }
+
+       /* overwrite the first comma */
+       string[0] = '{';
+       str_ptr[0] = '}';
+       str_ptr[1] = '\0';
+
+       return string;
+} /* values_name_to_sqlarray */
+
+static char *values_type_to_sqlarray (const data_set_t *ds,
+               char *string, size_t string_len, _Bool store_rates)
+{
+       char  *str_ptr;
+       size_t str_len;
+
+       int i;
+
+       str_ptr = string;
+       str_len = string_len;
+
+       for (i = 0; i < ds->ds_num; ++i) {
+               int status;
+
+               if (store_rates)
+                       status = ssnprintf(str_ptr, str_len, ",'gauge'");
+               else
+                       status = ssnprintf(str_ptr, str_len, ",'%s'",
+                                       DS_TYPE_TO_STRING (ds->ds[i].type));
+
+               if (status < 1) {
+                       str_len = 0;
+                       break;
+               }
+               else if ((size_t)status >= str_len) {
+                       str_len = 0;
+                       break;
+               }
+               else {
+                       str_ptr += status;
+                       str_len -= (size_t)status;
+               }
+       }
+
+       if (str_len <= 2) {
+               log_err ("c_psql_write: Failed to stringify value types");
+               return NULL;
+       }
+
+       /* overwrite the first comma */
+       string[0] = '{';
+       str_ptr[0] = '}';
+       str_ptr[1] = '\0';
+
+       return string;
+} /* values_type_to_sqlarray */
+
+static char *values_to_sqlarray (const data_set_t *ds, const value_list_t *vl,
+               char *string, size_t string_len, _Bool store_rates)
+{
+       char  *str_ptr;
+       size_t str_len;
+
+       gauge_t *rates = NULL;
+
+       int i;
+
+       str_ptr = string;
+       str_len = string_len;
+
+       for (i = 0; i < vl->values_len; ++i) {
+               int status = 0;
+
+               if ((ds->ds[i].type != DS_TYPE_GAUGE)
+                               && (ds->ds[i].type != DS_TYPE_COUNTER)
+                               && (ds->ds[i].type != DS_TYPE_DERIVE)
+                               && (ds->ds[i].type != DS_TYPE_ABSOLUTE)) {
+                       log_err ("c_psql_write: Unknown data source type: %i",
+                                       ds->ds[i].type);
+                       sfree (rates);
+                       return NULL;
+               }
+
+               if (ds->ds[i].type == DS_TYPE_GAUGE)
+                       status = ssnprintf (str_ptr, str_len,
+                                       ",%f", vl->values[i].gauge);
+               else if (store_rates) {
+                       if (rates == NULL)
+                               rates = uc_get_rate (ds, vl);
+
+                       if (rates == NULL) {
+                               log_err ("c_psql_write: Failed to determine rate");
+                               return NULL;
+                       }
+
+                       status = ssnprintf (str_ptr, str_len,
+                                       ",%lf", rates[i]);
+               }
+               else if (ds->ds[i].type == DS_TYPE_COUNTER)
+                       status = ssnprintf (str_ptr, str_len,
+                                       ",%llu", vl->values[i].counter);
+               else if (ds->ds[i].type == DS_TYPE_DERIVE)
+                       status = ssnprintf (str_ptr, str_len,
+                                       ",%"PRIi64, vl->values[i].derive);
+               else if (ds->ds[i].type == DS_TYPE_ABSOLUTE)
+                       status = ssnprintf (str_ptr, str_len,
+                                       ",%"PRIu64, vl->values[i].absolute);
+
+               if (status < 1) {
+                       str_len = 0;
+                       break;
+               }
+               else if ((size_t)status >= str_len) {
+                       str_len = 0;
+                       break;
+               }
+               else {
+                       str_ptr += status;
+                       str_len -= (size_t)status;
+               }
+       }
+
+       sfree (rates);
+
+       if (str_len <= 2) {
+               log_err ("c_psql_write: Failed to stringify value list");
+               return NULL;
+       }
+
+       /* overwrite the first comma */
+       string[0] = '{';
+       str_ptr[0] = '}';
+       str_ptr[1] = '\0';
+
+       return string;
+} /* values_to_sqlarray */
+
+static int c_psql_write (const data_set_t *ds, const value_list_t *vl,
+               user_data_t *ud)
+{
+       c_psql_database_t *db;
+
+       char time_str[32];
+       char values_name_str[1024];
+       char values_type_str[1024];
+       char values_str[1024];
+
+       const char *params[9];
+
+       int success = 0;
+       int i;
+
+       if ((ud == NULL) || (ud->data == NULL)) {
+               log_err ("c_psql_write: Invalid user data.");
+               return -1;
+       }
+
+       db = ud->data;
+       assert (db->database != NULL);
+       assert (db->writers != NULL);
+
+       if (cdtime_to_iso8601 (time_str, sizeof (time_str), vl->time) == 0) {
+               log_err ("c_psql_write: Failed to convert time to ISO 8601 format");
+               return -1;
+       }
+
+       if (values_name_to_sqlarray (ds,
+                               values_name_str, sizeof (values_name_str)) == NULL)
+               return -1;
+
+#define VALUE_OR_NULL(v) ((((v) == NULL) || (*(v) == '\0')) ? NULL : (v))
+
+       params[0] = time_str;
+       params[1] = vl->host;
+       params[2] = vl->plugin;
+       params[3] = VALUE_OR_NULL(vl->plugin_instance);
+       params[4] = vl->type;
+       params[5] = VALUE_OR_NULL(vl->type_instance);
+       params[6] = values_name_str;
+
+#undef VALUE_OR_NULL
+
+       pthread_mutex_lock (&db->db_lock);
+
+       if (0 != c_psql_check_connection (db)) {
+               pthread_mutex_unlock (&db->db_lock);
+               return -1;
+       }
+
+       if ((db->commit_interval > 0)
+                       && (db->next_commit == 0))
+               c_psql_begin (db);
+
+       for (i = 0; i < db->writers_num; ++i) {
+               c_psql_writer_t *writer;
+               PGresult *res;
+
+               writer = db->writers[i];
+
+               if (values_type_to_sqlarray (ds,
+                                       values_type_str, sizeof (values_type_str),
+                                       writer->store_rates) == NULL) {
+                       pthread_mutex_unlock (&db->db_lock);
+                       return -1;
+               }
+
+               if (values_to_sqlarray (ds, vl,
+                                       values_str, sizeof (values_str),
+                                       writer->store_rates) == NULL) {
+                       pthread_mutex_unlock (&db->db_lock);
+                       return -1;
+               }
+
+               params[7] = values_type_str;
+               params[8] = values_str;
+
+               res = PQexecParams (db->conn, writer->statement,
+                               STATIC_ARRAY_SIZE (params), NULL,
+                               (const char *const *)params,
+                               NULL, NULL, /* return text data */ 0);
+
+               if ((PGRES_COMMAND_OK != PQresultStatus (res))
+                               && (PGRES_TUPLES_OK != PQresultStatus (res))) {
+                       if ((CONNECTION_OK != PQstatus (db->conn))
+                                       && (0 == c_psql_check_connection (db))) {
+                               PQclear (res);
+
+                               /* try again */
+                               res = PQexecParams (db->conn, writer->statement,
+                                               STATIC_ARRAY_SIZE (params), NULL,
+                                               (const char *const *)params,
+                                               NULL, NULL, /* return text data */ 0);
+
+                               if ((PGRES_COMMAND_OK == PQresultStatus (res))
+                                               || (PGRES_TUPLES_OK == PQresultStatus (res))) {
+                                       success = 1;
+                                       continue;
+                               }
+                       }
+
+                       log_err ("Failed to execute SQL query: %s",
+                                       PQerrorMessage (db->conn));
+                       log_info ("SQL query was: '%s', "
+                                       "params: %s, %s, %s, %s, %s, %s, %s, %s",
+                                       writer->statement,
+                                       params[0], params[1], params[2], params[3],
+                                       params[4], params[5], params[6], params[7]);
+
+                       /* this will abort any current transaction -> restart */
+                       if (db->next_commit > 0)
+                               c_psql_commit (db);
+
+                       pthread_mutex_unlock (&db->db_lock);
+                       return -1;
+               }
+               success = 1;
+       }
+
+       if ((db->next_commit > 0)
+                       && (cdtime () > db->next_commit))
+               c_psql_commit (db);
+
+       pthread_mutex_unlock (&db->db_lock);
+
+       if (! success)
+               return -1;
+       return 0;
+} /* c_psql_write */
+
+/* We cannot flush single identifiers as all we do is to commit the currently
+ * running transaction, thus making sure that all written data is actually
+ * visible to everybody. */
+static int c_psql_flush (cdtime_t timeout,
+               __attribute__((unused)) const char *ident,
+               user_data_t *ud)
+{
+       c_psql_database_t *dbs = databases;
+       size_t dbs_num = databases_num;
+       size_t i;
+
+       if ((ud != NULL) && (ud->data != NULL)) {
+               dbs = ud->data;
+               dbs_num = 1;
+       }
+
+       for (i = 0; i < dbs_num; ++i) {
+               c_psql_database_t *db = dbs + i;
+
+               /* don't commit if the timeout is larger than the regular commit
+                * interval as in that case all requested data has already been
+                * committed */
+               if ((db->next_commit > 0) && (db->commit_interval > timeout))
+                       c_psql_commit (db);
+       }
+       return 0;
+} /* c_psql_flush */
+
 static int c_psql_shutdown (void)
 {
+       size_t i = 0;
+
+       _Bool had_flush = 0;
+
        plugin_unregister_read_group ("postgresql");
 
+       for (i = 0; i < databases_num; ++i) {
+               c_psql_database_t *db = databases + i;
+
+               if (db->writers_num > 0) {
+                       char cb_name[DATA_MAX_NAME_LEN];
+                       ssnprintf (cb_name, sizeof (cb_name), "postgresql-%s",
+                                       db->database);
+
+                       if (! had_flush) {
+                               plugin_unregister_flush ("postgresql");
+                               had_flush = 1;
+                       }
+
+                       plugin_unregister_flush (cb_name);
+                       plugin_unregister_write (cb_name);
+               }
+       }
+
        udb_query_free (queries, queries_num);
        queries = NULL;
        queries_num = 0;
 
+       sfree (writers);
+       writers = NULL;
+       writers_num = 0;
+
+       sfree (databases);
+       databases = NULL;
+       databases_num = 0;
+
        return 0;
 } /* c_psql_shutdown */
 
@@ -595,6 +1073,103 @@ static int config_query_callback (udb_query_t *q, oconfig_item_t *ci)
        return (-1);
 } /* config_query_callback */
 
+static int config_add_writer (oconfig_item_t *ci,
+               c_psql_writer_t *src_writers, size_t src_writers_num,
+               c_psql_writer_t ***dst_writers, size_t *dst_writers_num)
+{
+       char *name;
+
+       size_t i;
+
+       if ((ci == NULL) || (dst_writers == NULL) || (dst_writers_num == NULL))
+               return -1;
+
+       if ((ci->values_num != 1)
+                       || (ci->values[0].type != OCONFIG_TYPE_STRING)) {
+               log_err ("`Writer' expects a single string argument.");
+               return 1;
+       }
+
+       name = ci->values[0].value.string;
+
+       for (i = 0; i < src_writers_num; ++i) {
+               c_psql_writer_t **tmp;
+
+               if (strcasecmp (name, src_writers[i].name) != 0)
+                       continue;
+
+               tmp = (c_psql_writer_t **)realloc (*dst_writers,
+                               sizeof (**dst_writers) * (*dst_writers_num + 1));
+               if (tmp == NULL) {
+                       log_err ("Out of memory.");
+                       return -1;
+               }
+
+               tmp[*dst_writers_num] = src_writers + i;
+
+               *dst_writers = tmp;
+               ++(*dst_writers_num);
+               break;
+       }
+
+       if (i >= src_writers_num) {
+               log_err ("No such writer: `%s'", name);
+               return -1;
+       }
+
+       return 0;
+} /* config_add_writer */
+
+static int c_psql_config_writer (oconfig_item_t *ci)
+{
+       c_psql_writer_t *writer;
+       c_psql_writer_t *tmp;
+
+       int status = 0;
+       int i;
+
+       if ((ci->values_num != 1)
+                       || (ci->values[0].type != OCONFIG_TYPE_STRING)) {
+               log_err ("<Writer> expects a single string argument.");
+               return 1;
+       }
+
+       tmp = (c_psql_writer_t *)realloc (writers,
+                       sizeof (*writers) * (writers_num + 1));
+       if (tmp == NULL) {
+               log_err ("Out of memory.");
+               return -1;
+       }
+
+       writers = tmp;
+       writer  = writers + writers_num;
+       ++writers_num;
+
+       writer->name = sstrdup (ci->values[0].value.string);
+       writer->statement = NULL;
+       writer->store_rates = 1;
+
+       for (i = 0; i < ci->children_num; ++i) {
+               oconfig_item_t *c = ci->children + i;
+
+               if (strcasecmp ("Statement", c->key) == 0)
+                       status = cf_util_get_string (c, &writer->statement);
+               else if (strcasecmp ("StoreRates", c->key) == 0)
+                       status = cf_util_get_boolean (c, &writer->store_rates);
+               else
+                       log_warn ("Ignoring unknown config key \"%s\".", c->key);
+       }
+
+       if (status != 0) {
+               sfree (writer->statement);
+               sfree (writer->name);
+               sfree (writer);
+               return status;
+       }
+
+       return 0;
+} /* c_psql_config_writer */
+
 static int c_psql_config_database (oconfig_item_t *ci)
 {
        c_psql_database_t *db;
@@ -603,6 +1178,8 @@ static int c_psql_config_database (oconfig_item_t *ci)
        struct timespec cb_interval = { 0, 0 };
        user_data_t ud;
 
+       static _Bool have_flush = 0;
+
        int i;
 
        if ((1 != ci->values_num)
@@ -639,14 +1216,19 @@ static int c_psql_config_database (oconfig_item_t *ci)
                else if (0 == strcasecmp (c->key, "Query"))
                        udb_query_pick_from_list (c, queries, queries_num,
                                        &db->queries, &db->queries_num);
+               else if (0 == strcasecmp (c->key, "Writer"))
+                       config_add_writer (c, writers, writers_num,
+                                       &db->writers, &db->writers_num);
                else if (0 == strcasecmp (c->key, "Interval"))
                        cf_util_get_cdtime (c, &db->interval);
+               else if (strcasecmp ("CommitInterval", c->key) == 0)
+                       cf_util_get_cdtime (c, &db->commit_interval);
                else
                        log_warn ("Ignoring unknown config key \"%s\".", c->key);
        }
 
        /* If no `Query' options were given, add the default queries.. */
-       if (db->queries_num == 0) {
+       if ((db->queries_num == 0) && (db->writers_num == 0)){
                for (i = 0; i < def_queries_num; i++)
                        udb_query_pick_from_list_by_name (def_queries[i],
                                        queries, queries_num,
@@ -685,11 +1267,34 @@ static int c_psql_config_database (oconfig_item_t *ci)
 
        ssnprintf (cb_name, sizeof (cb_name), "postgresql-%s", db->instance);
 
-       CDTIME_T_TO_TIMESPEC (db->interval, &cb_interval);
+       if (db->queries_num > 0) {
+               CDTIME_T_TO_TIMESPEC (db->interval, &cb_interval);
 
-       plugin_register_complex_read ("postgresql", cb_name, c_psql_read,
-                       /* interval = */ (db->interval > 0) ? &cb_interval : NULL,
-                       &ud);
+               ++db->ref_cnt;
+               plugin_register_complex_read ("postgresql", cb_name, c_psql_read,
+                               /* interval = */ (db->interval > 0) ? &cb_interval : NULL,
+                               &ud);
+       }
+       if (db->writers_num > 0) {
+               ++db->ref_cnt;
+               plugin_register_write (cb_name, c_psql_write, &ud);
+
+               if (! have_flush) {
+                       /* flush all */
+                       plugin_register_flush ("postgresql",
+                                       c_psql_flush, /* user data = */ NULL);
+                       have_flush = 1;
+               }
+
+               /* flush this connection only */
+               ++db->ref_cnt;
+               plugin_register_flush (cb_name, c_psql_flush, &ud);
+       }
+       else if (db->commit_interval > 0) {
+               log_warn ("Database '%s': You do not have any writers assigned to "
+                               "this database connection. Setting 'CommitInterval' does "
+                               "not have any effect.", db->database);
+       }
        return 0;
 } /* c_psql_config_database */
 
@@ -721,6 +1326,8 @@ static int c_psql_config (oconfig_item_t *ci)
                if (0 == strcasecmp (c->key, "Query"))
                        udb_query_create (&queries, &queries_num, c,
                                        /* callback = */ config_query_callback);
+               else if (0 == strcasecmp (c->key, "Writer"))
+                       c_psql_config_writer (c);
                else if (0 == strcasecmp (c->key, "Database"))
                        c_psql_config_database (c);
                else
index aac6135..6789758 100644 (file)
@@ -61,4 +61,39 @@ cdtime_t cdtime (void) /* {{{ */
 } /* }}} cdtime_t cdtime */
 #endif
 
+size_t cdtime_to_iso8601 (char *s, size_t max, cdtime_t t) /* {{{ */
+{
+  struct timespec t_spec;
+  struct tm t_tm;
+
+  size_t len;
+
+  CDTIME_T_TO_TIMESPEC (t, &t_spec);
+  NORMALIZE_TIMESPEC (t_spec);
+
+  if (localtime_r ((time_t *)&t_spec.tv_sec, &t_tm) == NULL) {
+    char errbuf[1024];
+    ERROR ("cdtime_to_iso8601: localtime_r failed: %s",
+        sstrerror (errno, errbuf, sizeof (errbuf)));
+    return (0);
+  }
+
+  len = strftime (s, max, "%Y-%m-%dT%H:%M:%S", &t_tm);
+  if (len == 0)
+    return 0;
+
+  if (max - len > 2) {
+    int n = snprintf (s + len, max - len, ".%09i", (int)t_spec.tv_nsec);
+    len += (n < max - len) ? n : max - len;
+  }
+
+  if (max - len > 3) {
+    int n = strftime (s + len, max - len, "%z", &t_tm);
+    len += (n < max - len) ? n : max - len;
+  }
+
+  s[max - 1] = '\0';
+  return len;
+} /* }}} size_t cdtime_to_iso8601 */
+
 /* vim: set sw=2 sts=2 et fdm=marker : */
index 0fd809a..0081957 100644 (file)
 
 cdtime_t cdtime (void);
 
+/* format a cdtime_t value in ISO 8601 format:
+ * returns the number of characters written to the string (not including the
+ * terminating null byte or 0 on error; the function ensures that the string
+ * is null terminated */
+size_t cdtime_to_iso8601 (char *s, size_t max, cdtime_t t);
+
 #endif /* UTILS_TIME_H */
 /* vim: set sw=2 sts=2 et : */