postgresql plugin: Added support for "flushing" data.
authorSebastian Harl <sh@tokkee.org>
Thu, 15 Nov 2012 12:29:20 +0000 (13:29 +0100)
committerSebastian Harl <sh@tokkee.org>
Thu, 15 Nov 2012 12:29:20 +0000 (13:29 +0100)
This may be used to commit a PostgreSQL writer's transaction. Two different
types of flush callbacks are registered:

 - postgresql: Flush *all* databases.
 - postgresql-<database>: Flush the database named '<database>' only.

src/postgresql.c

index ef7a5ec..9f4894c 100644 (file)
@@ -934,10 +934,41 @@ static int c_psql_write (const data_set_t *ds, const value_list_t *vl,
        return 0;
 } /* c_psql_write */
 
+/* We cannot flush single identifiers as all we do is to commit the currently
+ * running transaction, thus making sure that all written data is actually
+ * visible to everybody. */
+static int c_psql_flush (cdtime_t timeout,
+               __attribute__((unused)) const char *ident,
+               user_data_t *ud)
+{
+       c_psql_database_t *dbs = databases;
+       size_t dbs_num = databases_num;
+       size_t i;
+
+       if ((ud != NULL) && (ud->data != NULL)) {
+               dbs = ud->data;
+               dbs_num = 1;
+       }
+
+       for (i = 0; i < dbs_num; ++i) {
+               c_psql_database_t *db = dbs + i;
+
+               /* don't commit if the timeout is larger than the regular commit
+                * interval as in that case all requested data has already been
+                * committed */
+               if ((db->next_commit > 0) && (db->commit_interval > timeout))
+                       if (c_psql_commit (db) == 0)
+                               c_psql_begin (db);
+       }
+       return 0;
+} /* c_psql_flush */
+
 static int c_psql_shutdown (void)
 {
        size_t i = 0;
 
+       _Bool had_flush = 0;
+
        plugin_unregister_read_group ("postgresql");
 
        for (i = 0; i < databases_num; ++i) {
@@ -947,6 +978,13 @@ static int c_psql_shutdown (void)
                        char cb_name[DATA_MAX_NAME_LEN];
                        ssnprintf (cb_name, sizeof (cb_name), "postgresql-%s",
                                        db->database);
+
+                       if (! had_flush) {
+                               plugin_unregister_flush ("postgresql");
+                               had_flush = 1;
+                       }
+
+                       plugin_unregister_flush (cb_name);
                        plugin_unregister_write (cb_name);
                }
        }
@@ -1127,6 +1165,8 @@ static int c_psql_config_database (oconfig_item_t *ci)
        struct timespec cb_interval = { 0, 0 };
        user_data_t ud;
 
+       static _Bool have_flush = 0;
+
        int i;
 
        if ((1 != ci->values_num)
@@ -1223,6 +1263,17 @@ static int c_psql_config_database (oconfig_item_t *ci)
        if (db->writers_num > 0) {
                ++db->ref_cnt;
                plugin_register_write (cb_name, c_psql_write, &ud);
+
+               if (! have_flush) {
+                       /* flush all */
+                       plugin_register_flush ("postgresql",
+                                       c_psql_flush, /* user data = */ NULL);
+                       have_flush = 1;
+               }
+
+               /* flush this connection only */
+               ++db->ref_cnt;
+               plugin_register_flush (cb_name, c_psql_flush, &ud);
        }
        else if (db->commit_interval > 0) {
                log_warn ("Database '%s': You do not have any writers assigned to "