X-Git-Url: https://git.verplant.org/?a=blobdiff_plain;f=src%2Fpostgresql.c;h=47c9c5c26f5a7e71919e25b44ac8a27165eaca2a;hb=4c88746c9a916eb93f2ea4a2d83edd803b7ec06c;hp=eeb036fe27d329cf47dcd1ed54af49c6c4d1c9df;hpb=aabd019de0a1bd837ec770bc53ef9433cb136c12;p=collectd.git diff --git a/src/postgresql.c b/src/postgresql.c index eeb036fe..47c9c5c2 100644 --- a/src/postgresql.c +++ b/src/postgresql.c @@ -2,34 +2,28 @@ * collectd - src/postgresql.c * Copyright (C) 2008-2012 Sebastian Harl * Copyright (C) 2009 Florian Forster - * All rights reserved. * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions - * are met: + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: * - * - Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. * - * - Redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in the - * documentation and/or other materials provided with the distribution. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" - * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE - * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE - * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE - * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR - * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF - * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS - * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN - * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) - * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE - * POSSIBILITY OF SUCH DAMAGE. + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. * * Authors: * Sebastian Harl - * Florian Forster + * Florian Forster **/ /* @@ -101,6 +95,7 @@ typedef enum { C_PSQL_PARAM_DB, C_PSQL_PARAM_USER, C_PSQL_PARAM_INTERVAL, + C_PSQL_PARAM_INSTANCE, } c_psql_param_t; /* Parameter configuration. Stored as `user data' in the query objects. */ @@ -140,6 +135,7 @@ typedef struct { /* writer "caching" settings */ cdtime_t commit_interval; cdtime_t next_commit; + cdtime_t expire_delay; char *host; char *port; @@ -147,6 +143,8 @@ typedef struct { char *user; char *password; + char *instance; + char *sslmode; char *krbsrvname; @@ -167,14 +165,14 @@ static char *def_queries[] = { }; static int def_queries_num = STATIC_ARRAY_SIZE (def_queries); -static c_psql_database_t *databases = NULL; -static size_t databases_num = 0; +static c_psql_database_t **databases = NULL; +static size_t databases_num = 0; -static udb_query_t **queries = NULL; -static size_t queries_num = 0; +static udb_query_t **queries = NULL; +static size_t queries_num = 0; -static c_psql_writer_t *writers = NULL; -static size_t writers_num = 0; +static c_psql_writer_t *writers = NULL; +static size_t writers_num = 0; static int c_psql_begin (c_psql_database_t *db) { @@ -217,17 +215,25 @@ static int c_psql_commit (c_psql_database_t *db) static c_psql_database_t *c_psql_database_new (const char *name) { - c_psql_database_t *db; + c_psql_database_t **tmp; + c_psql_database_t *db; - db = (c_psql_database_t *)realloc (databases, - (databases_num + 1) * sizeof (*db)); + db = (c_psql_database_t *)malloc (sizeof(*db)); if (NULL == db) { log_err ("Out of memory."); return NULL; } - databases = db; - db = databases + databases_num; + tmp = (c_psql_database_t **)realloc (databases, + (databases_num + 1) * sizeof (*databases)); + if (NULL == tmp) { + log_err ("Out of memory."); + sfree (db); + return NULL; + } + + databases = tmp; + databases[databases_num] = db; ++databases_num; db->conn = NULL; @@ -252,6 +258,7 @@ static c_psql_database_t *c_psql_database_new (const char *name) db->commit_interval = 0; db->next_commit = 0; + db->expire_delay = 0; db->database = sstrdup (name); db->host = NULL; @@ -259,6 +266,8 @@ static c_psql_database_t *c_psql_database_new (const char *name) db->user = NULL; db->password = NULL; + db->instance = sstrdup (name); + db->sslmode = NULL; db->krbsrvname = NULL; @@ -310,6 +319,8 @@ static void c_psql_database_delete (void *data) sfree (db->user); sfree (db->password); + sfree (db->instance); + sfree (db->sslmode); sfree (db->krbsrvname); @@ -317,7 +328,9 @@ static void c_psql_database_delete (void *data) sfree (db->service); /* don't care about freeing or reordering the 'databases' array - * this is done in 'shutdown' */ + * this is done in 'shutdown'; also, don't free the database instance + * object just to make sure that in case anybody accesses it before + * shutdown won't segfault */ return; } /* c_psql_database_delete */ @@ -364,9 +377,6 @@ static int c_psql_check_connection (c_psql_database_t *db) c_psql_connect (db); } - /* "ping" */ - PQclear (PQexec (db->conn, "SELECT 42;")); - if (CONNECTION_OK != PQstatus (db->conn)) { PQreset (db->conn); @@ -376,8 +386,9 @@ static int c_psql_check_connection (c_psql_database_t *db) if (CONNECTION_OK != PQstatus (db->conn)) { c_complain (LOG_ERR, &db->conn_complaint, - "Failed to connect to database %s: %s", - db->database, PQerrorMessage (db->conn)); + "Failed to connect to database %s (%s): %s", + db->database, db->instance, + PQerrorMessage (db->conn)); return -1; } @@ -442,9 +453,13 @@ static PGresult *c_psql_exec_query_params (c_psql_database_t *db, case C_PSQL_PARAM_INTERVAL: ssnprintf (interval, sizeof (interval), "%.3f", (db->interval > 0) - ? CDTIME_T_TO_DOUBLE (db->interval) : interval_g); + ? CDTIME_T_TO_DOUBLE (db->interval) + : plugin_get_interval ()); params[i] = interval; break; + case C_PSQL_PARAM_INSTANCE: + params[i] = db->instance; + break; default: assert (0); } @@ -483,9 +498,10 @@ static int c_psql_exec_query (c_psql_database_t *db, udb_query_t *q, else if ((NULL == data) || (0 == data->params_num)) res = c_psql_exec_query_noparams (db, q); else { - log_err ("Connection to database \"%s\" does not support parameters " - "(protocol version %d) - cannot execute query \"%s\".", - db->database, db->proto_version, + log_err ("Connection to database \"%s\" (%s) does not support " + "parameters (protocol version %d) - " + "cannot execute query \"%s\".", + db->database, db->instance, db->proto_version, udb_query_get_name (q)); return -1; } @@ -501,6 +517,12 @@ static int c_psql_exec_query (c_psql_database_t *db, udb_query_t *q, if (PGRES_TUPLES_OK != PQresultStatus (res)) { pthread_mutex_lock (&db->db_lock); + if ((CONNECTION_OK != PQstatus (db->conn)) + && (0 == c_psql_check_connection (db))) { + PQclear (res); + return c_psql_exec_query (db, q, prep_area); + } + log_err ("Failed to execute SQL query: %s", PQerrorMessage (db->conn)); log_info ("SQL query was: %s", @@ -545,13 +567,14 @@ static int c_psql_exec_query (c_psql_database_t *db, udb_query_t *q, } if (C_PSQL_IS_UNIX_DOMAIN_SOCKET (db->host) + || (0 == strcmp (db->host, "127.0.0.1")) || (0 == strcmp (db->host, "localhost"))) host = hostname_g; else host = db->host; status = udb_query_prepare_result (q, prep_area, host, "postgresql", - db->database, column_names, (size_t) column_num, db->interval); + db->instance, column_names, (size_t) column_num, db->interval); if (0 != status) { log_err ("udb_query_prepare_result failed with status %i.", status); @@ -602,6 +625,7 @@ static int c_psql_read (user_data_t *ud) db = ud->data; assert (NULL != db->database); + assert (NULL != db->instance); assert (NULL != db->queries); pthread_mutex_lock (&db->db_lock); @@ -845,6 +869,12 @@ static int c_psql_write (const data_set_t *ds, const value_list_t *vl, #undef VALUE_OR_NULL + if( db->expire_delay > 0 && vl->time < (cdtime() - vl->interval - db->expire_delay) ) { + log_info ("c_psql_write: Skipped expired value @ %s - %s/%s-%s/%s-%s/%s", + params[0], params[1], params[2], params[3], params[4], params[5], params[6] ); + return 0; + } + pthread_mutex_lock (&db->db_lock); if (0 != c_psql_check_connection (db)) { @@ -886,10 +916,10 @@ static int c_psql_write (const data_set_t *ds, const value_list_t *vl, if ((PGRES_COMMAND_OK != PQresultStatus (res)) && (PGRES_TUPLES_OK != PQresultStatus (res))) { + PQclear (res); + if ((CONNECTION_OK != PQstatus (db->conn)) && (0 == c_psql_check_connection (db))) { - PQclear (res); - /* try again */ res = PQexecParams (db->conn, writer->statement, STATIC_ARRAY_SIZE (params), NULL, @@ -898,6 +928,7 @@ static int c_psql_write (const data_set_t *ds, const value_list_t *vl, if ((PGRES_COMMAND_OK == PQresultStatus (res)) || (PGRES_TUPLES_OK == PQresultStatus (res))) { + PQclear (res); success = 1; continue; } @@ -918,6 +949,8 @@ static int c_psql_write (const data_set_t *ds, const value_list_t *vl, pthread_mutex_unlock (&db->db_lock); return -1; } + + PQclear (res); success = 1; } @@ -939,17 +972,17 @@ static int c_psql_flush (cdtime_t timeout, __attribute__((unused)) const char *ident, user_data_t *ud) { - c_psql_database_t *dbs = databases; + c_psql_database_t **dbs = databases; size_t dbs_num = databases_num; size_t i; if ((ud != NULL) && (ud->data != NULL)) { - dbs = ud->data; + dbs = (void *)&ud->data; dbs_num = 1; } for (i = 0; i < dbs_num; ++i) { - c_psql_database_t *db = dbs + i; + c_psql_database_t *db = dbs[i]; /* don't commit if the timeout is larger than the regular commit * interval as in that case all requested data has already been @@ -969,7 +1002,7 @@ static int c_psql_shutdown (void) plugin_unregister_read_group ("postgresql"); for (i = 0; i < databases_num; ++i) { - c_psql_database_t *db = databases + i; + c_psql_database_t *db = databases[i]; if (db->writers_num > 0) { char cb_name[DATA_MAX_NAME_LEN]; @@ -984,6 +1017,8 @@ static int c_psql_shutdown (void) plugin_unregister_flush (cb_name); plugin_unregister_write (cb_name); } + + sfree (db); } udb_query_free (queries, queries_num); @@ -1036,6 +1071,8 @@ static int config_query_param_add (udb_query_t *q, oconfig_item_t *ci) data->params[data->params_num] = C_PSQL_PARAM_USER; else if (0 == strcasecmp (param_str, "interval")) data->params[data->params_num] = C_PSQL_PARAM_INTERVAL; + else if (0 == strcasecmp (param_str, "instance")) + data->params[data->params_num] = C_PSQL_PARAM_INSTANCE; else { log_err ("Invalid parameter \"%s\".", param_str); return 1; @@ -1189,6 +1226,8 @@ static int c_psql_config_database (oconfig_item_t *ci) cf_util_get_string (c, &db->user); else if (0 == strcasecmp (c->key, "Password")) cf_util_get_string (c, &db->password); + else if (0 == strcasecmp (c->key, "Instance")) + cf_util_get_string (c, &db->instance); else if (0 == strcasecmp (c->key, "SSLMode")) cf_util_get_string (c, &db->sslmode); else if (0 == strcasecmp (c->key, "KRBSrvName")) @@ -1205,6 +1244,8 @@ static int c_psql_config_database (oconfig_item_t *ci) cf_util_get_cdtime (c, &db->interval); else if (strcasecmp ("CommitInterval", c->key) == 0) cf_util_get_cdtime (c, &db->commit_interval); + else if (strcasecmp ("ExpireDelay", c->key) == 0) + cf_util_get_cdtime (c, &db->expire_delay); else log_warn ("Ignoring unknown config key \"%s\".", c->key); } @@ -1247,7 +1288,7 @@ static int c_psql_config_database (oconfig_item_t *ci) ud.data = db; ud.free_func = c_psql_database_delete; - ssnprintf (cb_name, sizeof (cb_name), "postgresql-%s", db->database); + ssnprintf (cb_name, sizeof (cb_name), "postgresql-%s", db->instance); if (db->queries_num > 0) { CDTIME_T_TO_TIMESPEC (db->interval, &cb_interval);