From: Sebastian Harl Date: Fri, 18 Jul 2008 19:37:23 +0000 (+0200) Subject: postgresql plugin: Added support for custom queries. X-Git-Tag: collectd-4.5.0~89 X-Git-Url: https://git.verplant.org/?a=commitdiff_plain;h=12d48e6b453a4f028d4550d8cea159c913534c5d;p=collectd.git postgresql plugin: Added support for custom queries. The user may now define and use custom queries to collect data. A query is defined by specifying the SQL query to execute and a definition of the data type of each result column: Query "SELECT magic FROM wizard;" Column gauge magic The "Column" configuration option specifies the type name and optional type instance: Column []. The number and order of the "Column" option has to match the columns of the query result. A query is activated by adding the configuration option "Query " to the appropriate "" configuration blocks. A query may be used multiple times. Signed-off-by: Sebastian Harl Signed-off-by: Florian Forster --- diff --git a/src/postgresql.c b/src/postgresql.c index e4773488..6a270d84 100644 --- a/src/postgresql.c +++ b/src/postgresql.c @@ -73,10 +73,27 @@ port typedef struct { + char *type; + char *type_instance; + int ds_type; +} c_psql_col_t; + +typedef struct { + char *name; + char *query; + + c_psql_col_t *cols; + int cols_num; +} c_psql_query_t; + +typedef struct { PGconn *conn; c_complain_t conn_complaint; /* user configuration */ + c_psql_query_t **queries; + int queries_num; + char *host; char *port; char *database; @@ -90,9 +107,58 @@ typedef struct { char *service; } c_psql_database_t; +static c_psql_query_t *queries = NULL; +static int queries_num = 0; + static c_psql_database_t *databases = NULL; static int databases_num = 0; +static c_psql_query_t *c_psql_query_new (const char *name) +{ + c_psql_query_t *query; + + ++queries_num; + if (NULL == (queries = (c_psql_query_t *)realloc (queries, + queries_num * sizeof (*queries)))) { + log_err ("Out of memory."); + exit (5); + } + query = queries + queries_num - 1; + + query->name = sstrdup (name); + query->query = NULL; + + query->cols = NULL; + query->cols_num = 0; + return query; +} /* c_psql_query_new */ + +static void c_psql_query_delete (c_psql_query_t *query) +{ + int i; + + sfree (query->name); + sfree (query->query); + + for (i = 0; i < query->cols_num; ++i) { + sfree (query->cols[i].type); + sfree (query->cols[i].type_instance); + } + sfree (query->cols); + query->cols_num = 0; + return; +} /* c_psql_query_delete */ + +static c_psql_query_t *c_psql_query_get (const char *name) +{ + int i; + + for (i = 0; i < queries_num; ++i) + if (0 == strcasecmp (name, queries[i].name)) + return queries + i; + return NULL; +} /* c_psql_query_get */ + static c_psql_database_t *c_psql_database_new (const char *name) { c_psql_database_t *db; @@ -111,6 +177,9 @@ static c_psql_database_t *c_psql_database_new (const char *name) db->conn_complaint.last = 0; db->conn_complaint.interval = 0; + db->queries = NULL; + db->queries_num = 0; + db->database = sstrdup (name); db->host = NULL; db->port = NULL; @@ -129,6 +198,9 @@ static void c_psql_database_delete (c_psql_database_t *db) { PQfinish (db->conn); + sfree (db->queries); + db->queries_num = 0; + sfree (db->database); sfree (db->host); sfree (db->port); @@ -224,6 +296,58 @@ static int c_psql_check_connection (c_psql_database_t *db) return 0; } /* c_psql_check_connection */ +static int c_psql_exec_query (c_psql_database_t *db, int idx) +{ + c_psql_query_t *query; + PGresult *res; + + int rows, cols; + int i; + + if (idx >= db->queries_num) + return -1; + + query = db->queries[idx]; + + res = PQexec (db->conn, query->query); + + if (PGRES_TUPLES_OK != PQresultStatus (res)) { + log_err ("Failed to execute SQL query: %s", + PQerrorMessage (db->conn)); + log_info ("SQL query was: %s", query->query); + PQclear (res); + return -1; + } + + rows = PQntuples (res); + if (1 > rows) + return 0; + + cols = PQnfields (res); + if (query->cols_num != cols) { + log_err ("SQL query returned wrong number of fields " + "(expected: %i, got: %i)", query->cols_num, cols); + log_info ("SQL query was: %s", query->query); + return -1; + } + + for (i = 0; i < rows; ++i) { + int j; + + for (j = 0; j < cols; ++j) { + c_psql_col_t col = query->cols[j]; + + char *value = PQgetvalue (res, i, j); + + if (col.ds_type == DS_TYPE_COUNTER) + submit_counter (db, col.type, col.type_instance, value); + else if (col.ds_type == DS_TYPE_GAUGE) + submit_gauge (db, col.type, col.type_instance, value); + } + } + return 0; +} /* c_psql_exec_query */ + static int c_psql_stat_database (c_psql_database_t *db) { const char *const query = @@ -368,6 +492,8 @@ static int c_psql_read (void) for (i = 0; i < databases_num; ++i) { c_psql_database_t *db = databases + i; + int j; + assert (NULL != db->database); if (0 != c_psql_check_connection (db)) @@ -377,6 +503,9 @@ static int c_psql_read (void) c_psql_stat_user_tables (db); c_psql_statio_user_tables (db); + for (j = 0; j < db->queries_num; ++j) + c_psql_exec_query (db, j); + ++success; } @@ -402,6 +531,14 @@ static int c_psql_shutdown (void) sfree (databases); databases_num = 0; + + for (i = 0; i < queries_num; ++i) { + c_psql_query_t *query = queries + i; + c_psql_query_delete (query); + } + + sfree (queries); + queries_num = 0; return 0; } /* c_psql_shutdown */ @@ -412,6 +549,33 @@ static int c_psql_init (void) if ((NULL == databases) || (0 == databases_num)) return 0; + for (i = 0; i < queries_num; ++i) { + c_psql_query_t *query = queries + i; + int j; + + for (j = 0; j < query->cols_num; ++j) { + c_psql_col_t *col = query->cols + j; + const data_set_t *ds; + + ds = plugin_get_ds (col->type); + if (NULL == ds) { + log_err ("Column: Unknown type \"%s\".", col->type); + c_psql_shutdown (); + return -1; + } + + if (1 != ds->ds_num) { + log_err ("Column: Invalid type \"%s\" - types defining " + "one data source are supported only (got: %i).", + col->type, ds->ds_num); + c_psql_shutdown (); + return -1; + } + + col->ds_type = ds->ds[0].type; + } + } + for (i = 0; i < databases_num; ++i) { c_psql_database_t *db = databases + i; @@ -447,7 +611,7 @@ static int c_psql_init (void) "at server %s%s%s (server version: %d.%d.%d, " "protocol version: %d, pid: %d)", PQdb (db->conn), PQuser (db->conn), - C_PSQL_SOCKET3(server_host, PQport (db->conn)), + C_PSQL_SOCKET3 (server_host, PQport (db->conn)), C_PSQL_SERVER_VERSION3 (server_version), PQprotocolVersion (db->conn), PQbackendPID (db->conn)); } @@ -470,6 +634,97 @@ static int config_set (char *name, char **var, const oconfig_item_t *ci) return 0; } /* config_set */ +static int config_set_column (c_psql_query_t *query, const oconfig_item_t *ci) +{ + c_psql_col_t *col; + + int i; + + if ((0 != ci->children_num) + || (1 > ci->values_num) || (2 < ci->values_num)) { + log_err ("Column expects either one or two arguments."); + return 1; + } + + for (i = 0; i < ci->values_num; ++i) { + if (OCONFIG_TYPE_STRING != ci->values[i].type) { + log_err ("Column expects either one or two string arguments."); + return 1; + } + } + + ++query->cols_num; + if (NULL == (query->cols = (c_psql_col_t *)realloc (query->cols, + query->cols_num * sizeof (*query->cols)))) { + log_err ("Out of memory."); + exit (5); + } + + col = query->cols + query->cols_num - 1; + + col->ds_type = -1; + + col->type = sstrdup (ci->values[0].value.string); + col->type_instance = (2 == ci->values_num) + ? sstrdup (ci->values[1].value.string) : NULL; + return 0; +} /* config_set_column */ + +static int config_set_query (c_psql_database_t *db, const oconfig_item_t *ci) +{ + c_psql_query_t *query; + + if ((0 != ci->children_num) || (1 != ci->values_num) + || (OCONFIG_TYPE_STRING != ci->values[0].type)) { + log_err ("Query expects a single string argument."); + return 1; + } + + query = c_psql_query_get (ci->values[0].value.string); + if (NULL == query) { + log_err ("Query \"%s\" not found - please check your configuration.", + ci->values[0].value.string); + return 1; + } + + ++db->queries_num; + if (NULL == (db->queries = (c_psql_query_t **)realloc (db->queries, + db->queries_num * sizeof (*db->queries)))) { + log_err ("Out of memory."); + exit (5); + } + + db->queries[db->queries_num - 1] = query; + return 0; +} /* config_set_query */ + +static int c_psql_config_query (oconfig_item_t *ci) +{ + c_psql_query_t *query; + + int i; + + if ((1 != ci->values_num) + || (OCONFIG_TYPE_STRING != ci->values[0].type)) { + log_err (" expects a single string argument."); + return 1; + } + + query = c_psql_query_new (ci->values[0].value.string); + + for (i = 0; i < ci->children_num; ++i) { + oconfig_item_t *c = ci->children + i; + + if (0 == strcasecmp (c->key, "Query")) + config_set ("Query", &query->query, c); + else if (0 == strcasecmp (c->key, "Column")) + config_set_column (query, c); + else + log_warn ("Ignoring unknown config key \"%s\".", c->key); + } + return 0; +} /* c_psql_config_query */ + static int c_psql_config_database (oconfig_item_t *ci) { c_psql_database_t *db; @@ -501,6 +756,8 @@ static int c_psql_config_database (oconfig_item_t *ci) config_set ("KRBSrvName", &db->krbsrvname, c); else if (0 == strcasecmp (c->key, "Service")) config_set ("Service", &db->service, c); + else if (0 == strcasecmp (c->key, "Query")) + config_set_query (db, c); else log_warn ("Ignoring unknown config key \"%s\".", c->key); } @@ -514,7 +771,9 @@ static int c_psql_config (oconfig_item_t *ci) for (i = 0; i < ci->children_num; ++i) { oconfig_item_t *c = ci->children + i; - if (0 == strcasecmp (c->key, "Database")) + if (0 == strcasecmp (c->key, "Query")) + c_psql_config_query (c); + else if (0 == strcasecmp (c->key, "Database")) c_psql_config_database (c); else log_warn ("Ignoring unknown config key \"%s\".", c->key);