postgresql plugin: Added support for custom queries.
authorSebastian Harl <sh@tokkee.org>
Fri, 18 Jul 2008 19:37:23 +0000 (21:37 +0200)
committerFlorian Forster <octo@huhu.verplant.org>
Thu, 24 Jul 2008 12:18:04 +0000 (14:18 +0200)
The user may now define and use custom queries to collect data. A query is
defined by specifying the SQL query to execute and a definition of the data
type of each result column:

  <Query magic>
    Query "SELECT magic FROM wizard;"
    Column gauge magic
  </Query>

The "Column" configuration option specifies the type name and optional type
instance: Column <type> [<type_instance>]. The number and order of the
"Column" option has to match the columns of the query result.

A query is activated by adding the configuration option "Query <name>" to the
appropriate "<Database>" configuration blocks. A query may be used multiple
times.

Signed-off-by: Sebastian Harl <sh@tokkee.org>
Signed-off-by: Florian Forster <octo@huhu.verplant.org>
src/postgresql.c

index e477348..6a270d8 100644 (file)
        port
 
 typedef struct {
+       char *type;
+       char *type_instance;
+       int   ds_type;
+} c_psql_col_t;
+
+typedef struct {
+       char *name;
+       char *query;
+
+       c_psql_col_t *cols;
+       int           cols_num;
+} c_psql_query_t;
+
+typedef struct {
        PGconn      *conn;
        c_complain_t conn_complaint;
 
        /* user configuration */
+       c_psql_query_t **queries;
+       int              queries_num;
+
        char *host;
        char *port;
        char *database;
@@ -90,9 +107,58 @@ typedef struct {
        char *service;
 } c_psql_database_t;
 
+static c_psql_query_t *queries          = NULL;
+static int             queries_num      = 0;
+
 static c_psql_database_t *databases     = NULL;
 static int                databases_num = 0;
 
+static c_psql_query_t *c_psql_query_new (const char *name)
+{
+       c_psql_query_t *query;
+
+       ++queries_num;
+       if (NULL == (queries = (c_psql_query_t *)realloc (queries,
+                               queries_num * sizeof (*queries)))) {
+               log_err ("Out of memory.");
+               exit (5);
+       }
+       query = queries + queries_num - 1;
+
+       query->name  = sstrdup (name);
+       query->query = NULL;
+
+       query->cols     = NULL;
+       query->cols_num = 0;
+       return query;
+} /* c_psql_query_new */
+
+static void c_psql_query_delete (c_psql_query_t *query)
+{
+       int i;
+
+       sfree (query->name);
+       sfree (query->query);
+
+       for (i = 0; i < query->cols_num; ++i) {
+               sfree (query->cols[i].type);
+               sfree (query->cols[i].type_instance);
+       }
+       sfree (query->cols);
+       query->cols_num = 0;
+       return;
+} /* c_psql_query_delete */
+
+static c_psql_query_t *c_psql_query_get (const char *name)
+{
+       int i;
+
+       for (i = 0; i < queries_num; ++i)
+               if (0 == strcasecmp (name, queries[i].name))
+                       return queries + i;
+       return NULL;
+} /* c_psql_query_get */
+
 static c_psql_database_t *c_psql_database_new (const char *name)
 {
        c_psql_database_t *db;
@@ -111,6 +177,9 @@ static c_psql_database_t *c_psql_database_new (const char *name)
        db->conn_complaint.last     = 0;
        db->conn_complaint.interval = 0;
 
+       db->queries     = NULL;
+       db->queries_num = 0;
+
        db->database   = sstrdup (name);
        db->host       = NULL;
        db->port       = NULL;
@@ -129,6 +198,9 @@ static void c_psql_database_delete (c_psql_database_t *db)
 {
        PQfinish (db->conn);
 
+       sfree (db->queries);
+       db->queries_num = 0;
+
        sfree (db->database);
        sfree (db->host);
        sfree (db->port);
@@ -224,6 +296,58 @@ static int c_psql_check_connection (c_psql_database_t *db)
        return 0;
 } /* c_psql_check_connection */
 
+static int c_psql_exec_query (c_psql_database_t *db, int idx)
+{
+       c_psql_query_t *query;
+       PGresult       *res;
+
+       int rows, cols;
+       int i;
+
+       if (idx >= db->queries_num)
+               return -1;
+
+       query = db->queries[idx];
+
+       res = PQexec (db->conn, query->query);
+
+       if (PGRES_TUPLES_OK != PQresultStatus (res)) {
+               log_err ("Failed to execute SQL query: %s",
+                               PQerrorMessage (db->conn));
+               log_info ("SQL query was: %s", query->query);
+               PQclear (res);
+               return -1;
+       }
+
+       rows = PQntuples (res);
+       if (1 > rows)
+               return 0;
+
+       cols = PQnfields (res);
+       if (query->cols_num != cols) {
+               log_err ("SQL query returned wrong number of fields "
+                               "(expected: %i, got: %i)", query->cols_num, cols);
+               log_info ("SQL query was: %s", query->query);
+               return -1;
+       }
+
+       for (i = 0; i < rows; ++i) {
+               int j;
+
+               for (j = 0; j < cols; ++j) {
+                       c_psql_col_t col = query->cols[j];
+
+                       char *value = PQgetvalue (res, i, j);
+
+                       if (col.ds_type == DS_TYPE_COUNTER)
+                               submit_counter (db, col.type, col.type_instance, value);
+                       else if (col.ds_type == DS_TYPE_GAUGE)
+                               submit_gauge (db, col.type, col.type_instance, value);
+               }
+       }
+       return 0;
+} /* c_psql_exec_query */
+
 static int c_psql_stat_database (c_psql_database_t *db)
 {
        const char *const query =
@@ -368,6 +492,8 @@ static int c_psql_read (void)
        for (i = 0; i < databases_num; ++i) {
                c_psql_database_t *db = databases + i;
 
+               int j;
+
                assert (NULL != db->database);
 
                if (0 != c_psql_check_connection (db))
@@ -377,6 +503,9 @@ static int c_psql_read (void)
                c_psql_stat_user_tables (db);
                c_psql_statio_user_tables (db);
 
+               for (j = 0; j < db->queries_num; ++j)
+                       c_psql_exec_query (db, j);
+
                ++success;
        }
 
@@ -402,6 +531,14 @@ static int c_psql_shutdown (void)
 
        sfree (databases);
        databases_num = 0;
+
+       for (i = 0; i < queries_num; ++i) {
+               c_psql_query_t *query = queries + i;
+               c_psql_query_delete (query);
+       }
+
+       sfree (queries);
+       queries_num = 0;
        return 0;
 } /* c_psql_shutdown */
 
@@ -412,6 +549,33 @@ static int c_psql_init (void)
        if ((NULL == databases) || (0 == databases_num))
                return 0;
 
+       for (i = 0; i < queries_num; ++i) {
+               c_psql_query_t *query = queries + i;
+               int j;
+
+               for (j = 0; j < query->cols_num; ++j) {
+                       c_psql_col_t     *col = query->cols + j;
+                       const data_set_t *ds;
+
+                       ds = plugin_get_ds (col->type);
+                       if (NULL == ds) {
+                               log_err ("Column: Unknown type \"%s\".", col->type);
+                               c_psql_shutdown ();
+                               return -1;
+                       }
+
+                       if (1 != ds->ds_num) {
+                               log_err ("Column: Invalid type \"%s\" - types defining "
+                                               "one data source are supported only (got: %i).",
+                                               col->type, ds->ds_num);
+                               c_psql_shutdown ();
+                               return -1;
+                       }
+
+                       col->ds_type = ds->ds[0].type;
+               }
+       }
+
        for (i = 0; i < databases_num; ++i) {
                c_psql_database_t *db = databases + i;
 
@@ -447,7 +611,7 @@ static int c_psql_init (void)
                                "at server %s%s%s (server version: %d.%d.%d, "
                                "protocol version: %d, pid: %d)",
                                PQdb (db->conn), PQuser (db->conn),
-                               C_PSQL_SOCKET3(server_host, PQport (db->conn)),
+                               C_PSQL_SOCKET3 (server_host, PQport (db->conn)),
                                C_PSQL_SERVER_VERSION3 (server_version),
                                PQprotocolVersion (db->conn), PQbackendPID (db->conn));
        }
@@ -470,6 +634,97 @@ static int config_set (char *name, char **var, const oconfig_item_t *ci)
        return 0;
 } /* config_set */
 
+static int config_set_column (c_psql_query_t *query, const oconfig_item_t *ci)
+{
+       c_psql_col_t *col;
+
+       int i;
+
+       if ((0 != ci->children_num)
+                       || (1 > ci->values_num) || (2 < ci->values_num)) {
+               log_err ("Column expects either one or two arguments.");
+               return 1;
+       }
+
+       for (i = 0; i < ci->values_num; ++i) {
+               if (OCONFIG_TYPE_STRING != ci->values[i].type) {
+                       log_err ("Column expects either one or two string arguments.");
+                       return 1;
+               }
+       }
+
+       ++query->cols_num;
+       if (NULL == (query->cols = (c_psql_col_t *)realloc (query->cols,
+                               query->cols_num * sizeof (*query->cols)))) {
+               log_err ("Out of memory.");
+               exit (5);
+       }
+
+       col = query->cols + query->cols_num - 1;
+
+       col->ds_type = -1;
+
+       col->type = sstrdup (ci->values[0].value.string);
+       col->type_instance = (2 == ci->values_num)
+               ? sstrdup (ci->values[1].value.string) : NULL;
+       return 0;
+} /* config_set_column */
+
+static int config_set_query (c_psql_database_t *db, const oconfig_item_t *ci)
+{
+       c_psql_query_t *query;
+
+       if ((0 != ci->children_num) || (1 != ci->values_num)
+                       || (OCONFIG_TYPE_STRING != ci->values[0].type)) {
+               log_err ("Query expects a single string argument.");
+               return 1;
+       }
+
+       query = c_psql_query_get (ci->values[0].value.string);
+       if (NULL == query) {
+               log_err ("Query \"%s\" not found - please check your configuration.",
+                               ci->values[0].value.string);
+               return 1;
+       }
+
+       ++db->queries_num;
+       if (NULL == (db->queries = (c_psql_query_t **)realloc (db->queries,
+                               db->queries_num * sizeof (*db->queries)))) {
+               log_err ("Out of memory.");
+               exit (5);
+       }
+
+       db->queries[db->queries_num - 1] = query;
+       return 0;
+} /* config_set_query */
+
+static int c_psql_config_query (oconfig_item_t *ci)
+{
+       c_psql_query_t *query;
+
+       int i;
+
+       if ((1 != ci->values_num)
+                       || (OCONFIG_TYPE_STRING != ci->values[0].type)) {
+               log_err ("<Query> expects a single string argument.");
+               return 1;
+       }
+
+       query = c_psql_query_new (ci->values[0].value.string);
+
+       for (i = 0; i < ci->children_num; ++i) {
+               oconfig_item_t *c = ci->children + i;
+
+               if (0 == strcasecmp (c->key, "Query"))
+                       config_set ("Query", &query->query, c);
+               else if (0 == strcasecmp (c->key, "Column"))
+                       config_set_column (query, c);
+               else
+                       log_warn ("Ignoring unknown config key \"%s\".", c->key);
+       }
+       return 0;
+} /* c_psql_config_query */
+
 static int c_psql_config_database (oconfig_item_t *ci)
 {
        c_psql_database_t *db;
@@ -501,6 +756,8 @@ static int c_psql_config_database (oconfig_item_t *ci)
                        config_set ("KRBSrvName", &db->krbsrvname, c);
                else if (0 == strcasecmp (c->key, "Service"))
                        config_set ("Service", &db->service, c);
+               else if (0 == strcasecmp (c->key, "Query"))
+                       config_set_query (db, c);
                else
                        log_warn ("Ignoring unknown config key \"%s\".", c->key);
        }
@@ -514,7 +771,9 @@ static int c_psql_config (oconfig_item_t *ci)
        for (i = 0; i < ci->children_num; ++i) {
                oconfig_item_t *c = ci->children + i;
 
-               if (0 == strcasecmp (c->key, "Database"))
+               if (0 == strcasecmp (c->key, "Query"))
+                       c_psql_config_query (c);
+               else if (0 == strcasecmp (c->key, "Database"))
                        c_psql_config_database (c);
                else
                        log_warn ("Ignoring unknown config key \"%s\".", c->key);