amqp plugin: Let the user chose the delivery method.
authorFlorian Forster <octo@leeloo.lan.home.verplant.org>
Wed, 4 Aug 2010 20:44:15 +0000 (22:44 +0200)
committerFlorian Forster <octo@leeloo.lan.home.verplant.org>
Wed, 4 Aug 2010 20:44:15 +0000 (22:44 +0200)
Use "volatile" by default, but enable the user to switch to "persistent" if
it is important that no value is lost.

src/amqp.c

index ecbf338..5f44dfc 100644 (file)
 #include <amqp.h>
 #include <amqp_framing.h>
 
+/* Defines for the delivery mode. I have no idea why they're not defined by the
+ * library.. */
+#define AMQP_DM_VOLATILE   1
+#define AMQP_DM_PERSISTENT 2
+
 static int  port;
 static char *host       = NULL;
 static char *vhost      = NULL;
@@ -47,6 +52,7 @@ static char *user       = NULL;
 static char *password   = NULL;
 static char *exchange   = NULL;
 static char *routingkey = NULL;
+static uint8_t delivery_mode = AMQP_DM_VOLATILE;
 
 static amqp_connection_state_t amqp_conn = NULL;
 static pthread_mutex_t amqp_conn_lock = PTHREAD_MUTEX_INITIALIZER;
@@ -59,7 +65,8 @@ static const char *config_keys[] =
     "User",
     "Password",
     "Exchange",
-    "RoutingKey"
+    "RoutingKey",
+    "Persistent"
 };
 
 static int config_keys_num = STATIC_ARRAY_SIZE(config_keys);
@@ -101,6 +108,14 @@ static int config(const char *key, const char *value)
         return (config_set(&exchange, value));
     else if (strcasecmp(key, "routingkey") == 0)
         return (config_set(&routingkey, value));
+    else if (strcasecmp ("Persistent", key) == 0)
+    {
+        if (IS_TRUE (value))
+            delivery_mode = AMQP_DM_PERSISTENT;
+        else
+            delivery_mode = AMQP_DM_VOLATILE;
+        return (0);
+    }
     return (-1);
 }
 
@@ -169,7 +184,7 @@ static int amqp_write_locked (const char *buffer)
 
     props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;
     props.content_type = amqp_cstring_bytes("application/json");
-    props.delivery_mode = 2; /* persistent delivery mode */
+    props.delivery_mode = delivery_mode;
 
     status = amqp_basic_publish(amqp_conn,
                 /* channel = */ 1,