X-Git-Url: https://git.verplant.org/?a=blobdiff_plain;f=src%2Fpostgresql.c;h=d65a66fd04d0bd8e033c28486c08156588f07c8d;hb=29fbb8ec3fab3fc1e5c2e386b63f65f6832c0e7d;hp=40f8ec57dadaa91c5cfc414cbfdf7bf0a1bc0284;hpb=10075e6fe3c384b73c2dd398a1435f8d10e56654;p=collectd.git diff --git a/src/postgresql.c b/src/postgresql.c index 40f8ec57..d65a66fd 100644 --- a/src/postgresql.c +++ b/src/postgresql.c @@ -38,6 +38,10 @@ #define log_warn(...) WARNING ("postgresql: " __VA_ARGS__) #define log_info(...) INFO ("postgresql: " __VA_ARGS__) +#ifndef C_PSQL_DEFAULT_CONF +# define C_PSQL_DEFAULT_CONF PKGDATADIR "/postgresql_default.conf" +#endif + /* Appends the (parameter, value) pair to the string * pointed to by 'buf' suitable to be used as argument * for PQconnectdb(). If value equals NULL, the pair @@ -72,11 +76,39 @@ C_PSQL_IS_UNIX_DOMAIN_SOCKET (host) ? "/.s.PGSQL." : ":", \ port +typedef enum { + C_PSQL_PARAM_HOST = 1, + C_PSQL_PARAM_DB, + C_PSQL_PARAM_USER, +} c_psql_param_t; + +typedef struct { + char *type; + char *type_instance; + int ds_type; +} c_psql_col_t; + +typedef struct { + char *name; + char *query; + + c_psql_param_t *params; + int params_num; + + c_psql_col_t *cols; + int cols_num; +} c_psql_query_t; + typedef struct { PGconn *conn; c_complain_t conn_complaint; + int max_params_num; + /* user configuration */ + c_psql_query_t **queries; + int queries_num; + char *host; char *port; char *database; @@ -90,9 +122,128 @@ typedef struct { char *service; } c_psql_database_t; +static char *def_queries[] = { + "user_tables", + "io_user_tables" +}; +static int def_queries_num = STATIC_ARRAY_SIZE (def_queries); + +static c_psql_query_t *queries = NULL; +static int queries_num = 0; + static c_psql_database_t *databases = NULL; static int databases_num = 0; +static c_psql_query_t *c_psql_query_new (const char *name) +{ + c_psql_query_t *query; + + ++queries_num; + if (NULL == (queries = (c_psql_query_t *)realloc (queries, + queries_num * sizeof (*queries)))) { + log_err ("Out of memory."); + exit (5); + } + query = queries + queries_num - 1; + + query->name = sstrdup (name); + query->query = NULL; + + query->params = NULL; + query->params_num = 0; + + query->cols = NULL; + query->cols_num = 0; + return query; +} /* c_psql_query_new */ + +static void c_psql_query_delete (c_psql_query_t *query) +{ + int i; + + sfree (query->name); + sfree (query->query); + + sfree (query->params); + query->params_num = 0; + + for (i = 0; i < query->cols_num; ++i) { + sfree (query->cols[i].type); + sfree (query->cols[i].type_instance); + } + sfree (query->cols); + query->cols_num = 0; + return; +} /* c_psql_query_delete */ + +static c_psql_query_t *c_psql_query_get (const char *name) +{ + int i; + + for (i = 0; i < queries_num; ++i) + if (0 == strcasecmp (name, queries[i].name)) + return queries + i; + return NULL; +} /* c_psql_query_get */ + +static c_psql_database_t *c_psql_database_new (const char *name) +{ + c_psql_database_t *db; + + ++databases_num; + if (NULL == (databases = (c_psql_database_t *)realloc (databases, + databases_num * sizeof (*databases)))) { + log_err ("Out of memory."); + exit (5); + } + + db = databases + (databases_num - 1); + + db->conn = NULL; + + db->conn_complaint.last = 0; + db->conn_complaint.interval = 0; + + db->max_params_num = 0; + + db->queries = NULL; + db->queries_num = 0; + + db->database = sstrdup (name); + db->host = NULL; + db->port = NULL; + db->user = NULL; + db->password = NULL; + + db->sslmode = NULL; + + db->krbsrvname = NULL; + + db->service = NULL; + return db; +} /* c_psql_database_new */ + +static void c_psql_database_delete (c_psql_database_t *db) +{ + PQfinish (db->conn); + + sfree (db->queries); + db->queries_num = 0; + + sfree (db->database); + sfree (db->host); + sfree (db->port); + sfree (db->user); + sfree (db->password); + + sfree (db->sslmode); + + sfree (db->krbsrvname); + + sfree (db->service); + return; +} /* c_psql_database_delete */ + static void submit (const c_psql_database_t *db, const char *type, const char *type_instance, value_t *values, size_t values_len) @@ -174,111 +325,94 @@ static int c_psql_check_connection (c_psql_database_t *db) return 0; } /* c_psql_check_connection */ -static int c_psql_stat_database (c_psql_database_t *db) +static int c_psql_exec_query (c_psql_database_t *db, int idx) { - const char *const query = - "SELECT numbackends, xact_commit, xact_rollback " - "FROM pg_stat_database " - "WHERE datname = $1;"; + c_psql_query_t *query; + PGresult *res; - PGresult *res; - - int n; + char *params[db->max_params_num]; - res = PQexecParams (db->conn, query, /* number of parameters */ 1, - NULL, (const char *const *)&db->database, NULL, NULL, - /* return text data */ 0); + int rows, cols; + int i; - if (PGRES_TUPLES_OK != PQresultStatus (res)) { - log_err ("Failed to execute SQL query: %s", - PQerrorMessage (db->conn)); - log_info ("SQL query was: %s", query); - PQclear (res); + if (idx >= db->queries_num) return -1; - } - n = PQntuples (res); - if (1 < n) { - log_warn ("pg_stat_database has more than one entry " - "for database %s - ignoring additional results.", - db->database); - } - else if (1 > n) { - log_err ("pg_stat_database has no entry for database %s", - db->database); - PQclear (res); - return -1; + query = db->queries[idx]; + + assert (db->max_params_num >= query->params_num); + + for (i = 0; i < query->params_num; ++i) { + switch (query->params[i]) { + case C_PSQL_PARAM_HOST: + params[i] = (NULL == db->host) ? "localhost" : db->host; + break; + case C_PSQL_PARAM_DB: + params[i] = db->database; + break; + case C_PSQL_PARAM_USER: + params[i] = db->user; + break; + default: + assert (0); + } } - submit_gauge (db, "pg_numbackends", NULL, PQgetvalue (res, 0, 0)); - - submit_counter (db, "pg_xact", "commit", PQgetvalue (res, 0, 1)); - submit_counter (db, "pg_xact", "rollback", PQgetvalue (res, 0, 2)); - - PQclear (res); - return 0; -} /* c_psql_stat_database */ - -static int c_psql_stat_user_tables (c_psql_database_t *db) -{ - const char *const query = - "SELECT sum(seq_scan), sum(seq_tup_read), " - "sum(idx_scan), sum(idx_tup_fetch), " - "sum(n_tup_ins), sum(n_tup_upd), sum(n_tup_del), " - "sum(n_tup_hot_upd), sum(n_live_tup), sum(n_dead_tup) " - "FROM pg_stat_user_tables;"; - - PGresult *res; - - int n; - - res = PQexec (db->conn, query); + res = PQexecParams (db->conn, query->query, query->params_num, NULL, + (const char *const *)((0 == query->params_num) ? NULL : params), + NULL, NULL, /* return text data */ 0); if (PGRES_TUPLES_OK != PQresultStatus (res)) { log_err ("Failed to execute SQL query: %s", PQerrorMessage (db->conn)); - log_info ("SQL query was: %s", query); + log_info ("SQL query was: %s", query->query); PQclear (res); return -1; } - n = PQntuples (res); - assert (1 >= n); - - if (1 > n) /* no user tables */ + rows = PQntuples (res); + if (1 > rows) return 0; - submit_counter (db, "pg_scan", "seq", PQgetvalue (res, 0, 0)); - submit_counter (db, "pg_scan", "seq_tup_read", PQgetvalue (res, 0, 1)); - submit_counter (db, "pg_scan", "idx", PQgetvalue (res, 0, 2)); - submit_counter (db, "pg_scan", "idx_tup_fetch", PQgetvalue (res, 0, 3)); + cols = PQnfields (res); + if (query->cols_num != cols) { + log_err ("SQL query returned wrong number of fields " + "(expected: %i, got: %i)", query->cols_num, cols); + log_info ("SQL query was: %s", query->query); + return -1; + } - submit_counter (db, "pg_n_tup_c", "ins", PQgetvalue (res, 0, 4)); - submit_counter (db, "pg_n_tup_c", "upd", PQgetvalue (res, 0, 5)); - submit_counter (db, "pg_n_tup_c", "del", PQgetvalue (res, 0, 6)); - submit_counter (db, "pg_n_tup_c", "hot_upd", PQgetvalue (res, 0, 7)); + for (i = 0; i < rows; ++i) { + int j; - submit_gauge (db, "pg_n_tup_g", "live", PQgetvalue (res, 0, 8)); - submit_gauge (db, "pg_n_tup_g", "dead", PQgetvalue (res, 0, 9)); + for (j = 0; j < cols; ++j) { + c_psql_col_t col = query->cols[j]; - PQclear (res); + char *value = PQgetvalue (res, i, j); + + if (col.ds_type == DS_TYPE_COUNTER) + submit_counter (db, col.type, col.type_instance, value); + else if (col.ds_type == DS_TYPE_GAUGE) + submit_gauge (db, col.type, col.type_instance, value); + } + } return 0; -} /* c_psql_stat_user_tables */ +} /* c_psql_exec_query */ -static int c_psql_statio_user_tables (c_psql_database_t *db) +static int c_psql_stat_database (c_psql_database_t *db) { const char *const query = - "SELECT sum(heap_blks_read), sum(heap_blks_hit), " - "sum(idx_blks_read), sum(idx_blks_hit), " - "sum(toast_blks_read), sum(toast_blks_hit), " - "sum(tidx_blks_read), sum(tidx_blks_hit) " - "FROM pg_statio_user_tables;"; + "SELECT numbackends, xact_commit, xact_rollback " + "FROM pg_stat_database " + "WHERE datname = $1;"; PGresult *res; int n; - res = PQexec (db->conn, query); + res = PQexecParams (db->conn, query, /* number of parameters */ 1, + NULL, (const char *const *)&db->database, NULL, NULL, + /* return text data */ 0); if (PGRES_TUPLES_OK != PQresultStatus (res)) { log_err ("Failed to execute SQL query: %s", @@ -289,26 +423,26 @@ static int c_psql_statio_user_tables (c_psql_database_t *db) } n = PQntuples (res); - assert (1 >= n); - - if (1 > n) /* no user tables */ - return 0; - - submit_counter (db, "pg_blks", "heap_read", PQgetvalue (res, 0, 0)); - submit_counter (db, "pg_blks", "heap_hit", PQgetvalue (res, 0, 1)); - - submit_counter (db, "pg_blks", "idx_read", PQgetvalue (res, 0, 2)); - submit_counter (db, "pg_blks", "idx_hit", PQgetvalue (res, 0, 3)); + if (1 < n) { + log_warn ("pg_stat_database has more than one entry " + "for database %s - ignoring additional results.", + db->database); + } + else if (1 > n) { + log_err ("pg_stat_database has no entry for database %s", + db->database); + PQclear (res); + return -1; + } - submit_counter (db, "pg_blks", "toast_read", PQgetvalue (res, 0, 4)); - submit_counter (db, "pg_blks", "toast_hit", PQgetvalue (res, 0, 5)); + submit_gauge (db, "pg_numbackends", NULL, PQgetvalue (res, 0, 0)); - submit_counter (db, "pg_blks", "tidx_read", PQgetvalue (res, 0, 6)); - submit_counter (db, "pg_blks", "tidx_hit", PQgetvalue (res, 0, 7)); + submit_counter (db, "pg_xact", "commit", PQgetvalue (res, 0, 1)); + submit_counter (db, "pg_xact", "rollback", PQgetvalue (res, 0, 2)); PQclear (res); return 0; -} /* c_psql_statio_user_tables */ +} /* c_psql_stat_database */ static int c_psql_read (void) { @@ -318,14 +452,17 @@ static int c_psql_read (void) for (i = 0; i < databases_num; ++i) { c_psql_database_t *db = databases + i; + int j; + assert (NULL != db->database); if (0 != c_psql_check_connection (db)) continue; c_psql_stat_database (db); - c_psql_stat_user_tables (db); - c_psql_statio_user_tables (db); + + for (j = 0; j < db->queries_num; ++j) + c_psql_exec_query (db, j); ++success; } @@ -347,24 +484,19 @@ static int c_psql_shutdown (void) for (i = 0; i < databases_num; ++i) { c_psql_database_t *db = databases + i; - - PQfinish (db->conn); - - sfree (db->database); - sfree (db->host); - sfree (db->port); - sfree (db->user); - sfree (db->password); - - sfree (db->sslmode); - - sfree (db->krbsrvname); - - sfree (db->service); + c_psql_database_delete (db); } sfree (databases); databases_num = 0; + + for (i = 0; i < queries_num; ++i) { + c_psql_query_t *query = queries + i; + c_psql_query_delete (query); + } + + sfree (queries); + queries_num = 0; return 0; } /* c_psql_shutdown */ @@ -375,6 +507,33 @@ static int c_psql_init (void) if ((NULL == databases) || (0 == databases_num)) return 0; + for (i = 0; i < queries_num; ++i) { + c_psql_query_t *query = queries + i; + int j; + + for (j = 0; j < query->cols_num; ++j) { + c_psql_col_t *col = query->cols + j; + const data_set_t *ds; + + ds = plugin_get_ds (col->type); + if (NULL == ds) { + log_err ("Column: Unknown type \"%s\".", col->type); + c_psql_shutdown (); + return -1; + } + + if (1 != ds->ds_num) { + log_err ("Column: Invalid type \"%s\" - types defining " + "one data source are supported only (got: %i).", + col->type, ds->ds_num); + c_psql_shutdown (); + return -1; + } + + col->ds_type = ds->ds[0].type; + } + } + for (i = 0; i < databases_num; ++i) { c_psql_database_t *db = databases + i; @@ -410,7 +569,7 @@ static int c_psql_init (void) "at server %s%s%s (server version: %d.%d.%d, " "protocol version: %d, pid: %d)", PQdb (db->conn), PQuser (db->conn), - C_PSQL_SOCKET3(server_host, PQport (db->conn)), + C_PSQL_SOCKET3 (server_host, PQport (db->conn)), C_PSQL_SERVER_VERSION3 (server_version), PQprotocolVersion (db->conn), PQbackendPID (db->conn)); } @@ -433,43 +592,149 @@ static int config_set (char *name, char **var, const oconfig_item_t *ci) return 0; } /* config_set */ -static int c_psql_config_database (oconfig_item_t *ci) +static int config_set_param (c_psql_query_t *query, const oconfig_item_t *ci) { - c_psql_database_t *db; + c_psql_param_t param; + char *param_str; + + if ((0 != ci->children_num) || (1 != ci->values_num) + || (OCONFIG_TYPE_STRING != ci->values[0].type)) { + log_err ("Param expects a single string argument."); + return 1; + } + + param_str = ci->values[0].value.string; + if (0 == strcasecmp (param_str, "hostname")) + param = C_PSQL_PARAM_HOST; + else if (0 == strcasecmp (param_str, "database")) + param = C_PSQL_PARAM_DB; + else if (0 == strcasecmp (param_str, "username")) + param = C_PSQL_PARAM_USER; + else { + log_err ("Invalid parameter \"%s\".", param_str); + return 1; + } + + ++query->params_num; + if (NULL == (query->params = (c_psql_param_t *)realloc (query->params, + query->params_num * sizeof (*query->params)))) { + log_err ("Out of memory."); + exit (5); + } + + query->params[query->params_num - 1] = param; + return 0; +} /* config_set_param */ + +static int config_set_column (c_psql_query_t *query, const oconfig_item_t *ci) +{ + c_psql_col_t *col; int i; - if ((1 != ci->values_num) + if ((0 != ci->children_num) + || (1 > ci->values_num) || (2 < ci->values_num)) { + log_err ("Column expects either one or two arguments."); + return 1; + } + + for (i = 0; i < ci->values_num; ++i) { + if (OCONFIG_TYPE_STRING != ci->values[i].type) { + log_err ("Column expects either one or two string arguments."); + return 1; + } + } + + ++query->cols_num; + if (NULL == (query->cols = (c_psql_col_t *)realloc (query->cols, + query->cols_num * sizeof (*query->cols)))) { + log_err ("Out of memory."); + exit (5); + } + + col = query->cols + query->cols_num - 1; + + col->ds_type = -1; + + col->type = sstrdup (ci->values[0].value.string); + col->type_instance = (2 == ci->values_num) + ? sstrdup (ci->values[1].value.string) : NULL; + return 0; +} /* config_set_column */ + +static int config_set_query (c_psql_database_t *db, const oconfig_item_t *ci) +{ + c_psql_query_t *query; + + if ((0 != ci->children_num) || (1 != ci->values_num) || (OCONFIG_TYPE_STRING != ci->values[0].type)) { - log_err (" expects a single string argument."); + log_err ("Query expects a single string argument."); return 1; } - ++databases_num; - if (NULL == (databases = (c_psql_database_t *)realloc (databases, - databases_num * sizeof (*databases)))) { + query = c_psql_query_get (ci->values[0].value.string); + if (NULL == query) { + log_err ("Query \"%s\" not found - please check your configuration.", + ci->values[0].value.string); + return 1; + } + + ++db->queries_num; + if (NULL == (db->queries = (c_psql_query_t **)realloc (db->queries, + db->queries_num * sizeof (*db->queries)))) { log_err ("Out of memory."); exit (5); } - db = databases + (databases_num - 1); + if (query->params_num > db->max_params_num) + db->max_params_num = query->params_num; - db->conn = NULL; + db->queries[db->queries_num - 1] = query; + return 0; +} /* config_set_query */ - db->conn_complaint.last = 0; - db->conn_complaint.interval = 0; +static int c_psql_config_query (oconfig_item_t *ci) +{ + c_psql_query_t *query; - db->database = sstrdup (ci->values[0].value.string); - db->host = NULL; - db->port = NULL; - db->user = NULL; - db->password = NULL; + int i; - db->sslmode = NULL; + if ((1 != ci->values_num) + || (OCONFIG_TYPE_STRING != ci->values[0].type)) { + log_err (" expects a single string argument."); + return 1; + } - db->krbsrvname = NULL; + query = c_psql_query_new (ci->values[0].value.string); - db->service = NULL; + for (i = 0; i < ci->children_num; ++i) { + oconfig_item_t *c = ci->children + i; + + if (0 == strcasecmp (c->key, "Query")) + config_set ("Query", &query->query, c); + else if (0 == strcasecmp (c->key, "Param")) + config_set_param (query, c); + else if (0 == strcasecmp (c->key, "Column")) + config_set_column (query, c); + else + log_warn ("Ignoring unknown config key \"%s\".", c->key); + } + return 0; +} /* c_psql_config_query */ + +static int c_psql_config_database (oconfig_item_t *ci) +{ + c_psql_database_t *db; + + int i; + + if ((1 != ci->values_num) + || (OCONFIG_TYPE_STRING != ci->values[0].type)) { + log_err (" expects a single string argument."); + return 1; + } + + db = c_psql_database_new (ci->values[0].value.string); for (i = 0; i < ci->children_num; ++i) { oconfig_item_t *c = ci->children + i; @@ -488,20 +753,57 @@ static int c_psql_config_database (oconfig_item_t *ci) config_set ("KRBSrvName", &db->krbsrvname, c); else if (0 == strcasecmp (c->key, "Service")) config_set ("Service", &db->service, c); + else if (0 == strcasecmp (c->key, "Query")) + config_set_query (db, c); else log_warn ("Ignoring unknown config key \"%s\".", c->key); } + + if (NULL == db->queries) { + db->queries = (c_psql_query_t **)malloc (def_queries_num + * sizeof (*db->queries)); + + for (i = 0; i < def_queries_num; ++i) { + db->queries[i] = c_psql_query_get (def_queries[i]); + if (NULL == db->queries[i]) + log_err ("Query \"%s\" not found - " + "please check your installation.", + def_queries[i]); + else + ++db->queries_num; + } + } return 0; } static int c_psql_config (oconfig_item_t *ci) { + static int have_def_config = 0; + int i; + if (0 == have_def_config) { + oconfig_item_t *c; + + have_def_config = 1; + + c = oconfig_parse_file (C_PSQL_DEFAULT_CONF); + if (NULL == c) + log_err ("Failed to read default config ("C_PSQL_DEFAULT_CONF")."); + else + c_psql_config (c); + + if (NULL == queries) + log_err ("Default config ("C_PSQL_DEFAULT_CONF") did not define " + "any queries - please check your installation."); + } + for (i = 0; i < ci->children_num; ++i) { oconfig_item_t *c = ci->children + i; - if (0 == strcasecmp (c->key, "Database")) + if (0 == strcasecmp (c->key, "Query")) + c_psql_config_query (c); + else if (0 == strcasecmp (c->key, "Database")) c_psql_config_database (c); else log_warn ("Ignoring unknown config key \"%s\".", c->key);