#define log_err(...) ERROR ("postgresql: " __VA_ARGS__)
#define log_warn(...) WARNING ("postgresql: " __VA_ARGS__)
#define log_info(...) INFO ("postgresql: " __VA_ARGS__)
+#define log_debug(...) DEBUG ("postgresql: " __VA_ARGS__)
#ifndef C_PSQL_DEFAULT_CONF
# define C_PSQL_DEFAULT_CONF PKGDATADIR "/postgresql_default.conf"
cdtime_t interval;
+ /* writer "caching" settings */
+ cdtime_t commit_interval;
+ cdtime_t next_commit;
+
char *host;
char *port;
char *database;
db->interval = 0;
+ db->commit_interval = 0;
+ db->next_commit = 0;
+
db->database = sstrdup (name);
db->host = NULL;
db->port = NULL;
sfree (db->writers);
db->writers_num = 0;
+ /* wait for the lock to be released by the last writer */
+ pthread_mutex_lock (&db->db_lock);
+ pthread_mutex_unlock (&db->db_lock);
+
pthread_mutex_destroy (&db->db_lock);
sfree (db->database);
return string;
} /* values_to_sqlarray */
+static int c_psql_begin (c_psql_database_t *db)
+{
+ PGresult *r = PQexec (db->conn, "BEGIN");
+
+ int status = 1;
+
+ if (r != NULL) {
+ if (PGRES_COMMAND_OK == PQresultStatus (r)) {
+ db->next_commit = cdtime() + db->commit_interval;
+ status = 0;
+ }
+ else
+ log_warn ("Failed to initiate ('BEGIN') transaction: %s",
+ PQerrorMessage (db->conn));
+ PQclear (r);
+ }
+ return status;
+} /* c_psql_begin */
+
+static int c_psql_commit (c_psql_database_t *db)
+{
+ PGresult *r = PQexec (db->conn, "COMMIT");
+
+ int status = 1;
+
+ if (r != NULL) {
+ if (PGRES_COMMAND_OK == PQresultStatus (r)) {
+ db->next_commit = cdtime () + db->commit_interval;
+ log_debug ("Successfully committed transaction.");
+ status = 0;
+ }
+ else
+ log_warn ("Failed to commit transaction: %s",
+ PQerrorMessage (db->conn));
+ PQclear (r);
+ }
+ return status;
+} /* c_psql_commit */
+
static int c_psql_write (const data_set_t *ds, const value_list_t *vl,
user_data_t *ud)
{
return -1;
}
+ if ((db->commit_interval > 0)
+ && (db->next_commit == 0))
+ c_psql_begin (db);
+
for (i = 0; i < db->writers_num; ++i) {
c_psql_writer_t *writer;
PGresult *res;
if (values_type_to_sqlarray (ds,
values_type_str, sizeof (values_type_str),
- writer->store_rates) == NULL)
+ writer->store_rates) == NULL) {
+ pthread_mutex_unlock (&db->db_lock);
return -1;
+ }
if (values_to_sqlarray (ds, vl,
values_str, sizeof (values_str),
- writer->store_rates) == NULL)
+ writer->store_rates) == NULL) {
+ pthread_mutex_unlock (&db->db_lock);
return -1;
+ }
params[7] = values_type_str;
params[8] = values_str;
success = 1;
}
+ if ((db->next_commit > 0)
+ && (cdtime () > db->next_commit))
+ if (c_psql_commit (db) == 0)
+ c_psql_begin (db);
+
pthread_mutex_unlock (&db->db_lock);
if (! success)
&db->writers, &db->writers_num);
else if (0 == strcasecmp (c->key, "Interval"))
cf_util_get_cdtime (c, &db->interval);
+ else if (strcasecmp ("CommitInterval", c->key) == 0)
+ cf_util_get_cdtime (c, &db->commit_interval);
else
log_warn ("Ignoring unknown config key \"%s\".", c->key);
}
if (db->writers_num > 0) {
plugin_register_write (cb_name, c_psql_write, &ud);
}
+ else if (db->commit_interval > 0) {
+ log_warn ("Database '%s': You do not have any writers assigned to "
+ "this database connection. Setting 'CommitInterval' does "
+ "not have any effect.", db->database);
+ }
return 0;
} /* c_psql_config_database */