2 * collectd - src/write_mongodb.c
3 * Copyright (C) 2010-2013 Florian Forster
4 * Copyright (C) 2010 Akkarit Sangpetch
5 * Copyright (C) 2012 Chris Lundquist
6 * Copyright (C) 2017 Saikrishna Arcot
8 * Permission is hereby granted, free of charge, to any person obtaining a
9 * copy of this software and associated documentation files (the "Software"),
10 * to deal in the Software without restriction, including without limitation
11 * the rights to use, copy, modify, merge, publish, distribute, sublicense,
12 * and/or sell copies of the Software, and to permit persons to whom the
13 * Software is furnished to do so, subject to the following conditions:
15 * The above copyright notice and this permission notice shall be included in
16 * all copies or substantial portions of the Software.
18 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
19 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
20 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
21 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
22 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
23 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
24 * DEALINGS IN THE SOFTWARE.
27 * Florian Forster <octo at collectd.org>
28 * Akkarit Sangpetch <asangpet at andrew.cmu.edu>
29 * Chris Lundquist <clundquist at bluebox.net>
30 * Saikrishna Arcot <saiarcot895 at gmail.com>
37 #include "utils_cache.h"
42 char name[DATA_MAX_NAME_LEN];
48 /* Authentication information */
56 mongoc_client_t *client;
57 mongoc_database_t *database;
60 typedef struct wm_node_s wm_node_t;
65 static bson_t *wm_create_bson(const data_set_t *ds, /* {{{ */
66 const value_list_t *vl, bool store_rates) {
73 ERROR("write_mongodb plugin: bson_new failed.");
78 rates = uc_get_rate(ds, vl);
80 ERROR("write_mongodb plugin: uc_get_rate() failed.");
88 BSON_APPEND_DATE_TIME(ret, "timestamp", CDTIME_T_TO_MS(vl->time));
89 BSON_APPEND_UTF8(ret, "host", vl->host);
90 BSON_APPEND_UTF8(ret, "plugin", vl->plugin);
91 BSON_APPEND_UTF8(ret, "plugin_instance", vl->plugin_instance);
92 BSON_APPEND_UTF8(ret, "type", vl->type);
93 BSON_APPEND_UTF8(ret, "type_instance", vl->type_instance);
95 BSON_APPEND_ARRAY_BEGIN(ret, "values", &subarray); /* {{{ */
96 for (size_t i = 0; i < ds->ds_num; i++) {
99 snprintf(key, sizeof(key), "%" PRIsz, i);
101 if (ds->ds[i].type == DS_TYPE_GAUGE)
102 BSON_APPEND_DOUBLE(&subarray, key, vl->values[i].gauge);
103 else if (store_rates)
104 BSON_APPEND_DOUBLE(&subarray, key, (double)rates[i]);
105 else if (ds->ds[i].type == DS_TYPE_COUNTER)
106 BSON_APPEND_INT64(&subarray, key, vl->values[i].counter);
107 else if (ds->ds[i].type == DS_TYPE_DERIVE)
108 BSON_APPEND_INT64(&subarray, key, vl->values[i].derive);
109 else if (ds->ds[i].type == DS_TYPE_ABSOLUTE)
110 BSON_APPEND_INT64(&subarray, key, vl->values[i].absolute);
112 ERROR("write_mongodb plugin: Unknown ds_type %d for index %" PRIsz,
118 bson_append_array_end(ret, &subarray); /* }}} values */
120 BSON_APPEND_ARRAY_BEGIN(ret, "dstypes", &subarray); /* {{{ */
121 for (size_t i = 0; i < ds->ds_num; i++) {
124 snprintf(key, sizeof(key), "%" PRIsz, i);
127 BSON_APPEND_UTF8(&subarray, key, "gauge");
129 BSON_APPEND_UTF8(&subarray, key, DS_TYPE_TO_STRING(ds->ds[i].type));
131 bson_append_array_end(ret, &subarray); /* }}} dstypes */
133 BSON_APPEND_ARRAY_BEGIN(ret, "dsnames", &subarray); /* {{{ */
134 for (size_t i = 0; i < ds->ds_num; i++) {
137 snprintf(key, sizeof(key), "%" PRIsz, i);
138 BSON_APPEND_UTF8(&subarray, key, ds->ds[i].name);
140 bson_append_array_end(ret, &subarray); /* }}} dsnames */
144 size_t error_location;
145 if (!bson_validate(ret, BSON_VALIDATE_UTF8, &error_location)) {
146 ERROR("write_mongodb plugin: Error in generated BSON document "
154 } /* }}} bson *wm_create_bson */
156 static int wm_initialize(wm_node_t *node) /* {{{ */
163 INFO("write_mongodb plugin: Connecting to [%s]:%d", node->host, node->port);
165 if ((node->db != NULL) && (node->user != NULL) && (node->passwd != NULL)) {
166 uri = ssnprintf_alloc("mongodb://%s:%s@%s:%d/?authSource=%s", node->user,
167 node->passwd, node->host, node->port, node->db);
169 ERROR("write_mongodb plugin: Not enough memory to assemble "
170 "authentication string.");
171 mongoc_client_destroy(node->client);
173 node->connected = false;
177 node->client = mongoc_client_new(uri);
179 ERROR("write_mongodb plugin: Authenticating to [%s]:%d for database "
180 "\"%s\" as user \"%s\" failed.",
181 node->host, node->port, node->db, node->user);
182 node->connected = false;
187 uri = ssnprintf_alloc("mongodb://%s:%d", node->host, node->port);
189 ERROR("write_mongodb plugin: Not enough memory to assemble "
190 "authentication string.");
191 mongoc_client_destroy(node->client);
193 node->connected = false;
197 node->client = mongoc_client_new(uri);
199 ERROR("write_mongodb plugin: Connecting to [%s]:%d failed.", node->host,
201 node->connected = false;
208 node->database = mongoc_client_get_database(node->client, "collectd");
209 if (!node->database) {
210 ERROR("write_mongodb plugin: error creating/getting database");
211 mongoc_client_destroy(node->client);
213 node->connected = false;
217 node->connected = true;
219 } /* }}} int wm_initialize */
221 static int wm_write(const data_set_t *ds, /* {{{ */
222 const value_list_t *vl, user_data_t *ud) {
223 wm_node_t *node = ud->data;
224 mongoc_collection_t *collection = NULL;
229 bson_record = wm_create_bson(ds, vl, node->store_rates);
231 ERROR("write_mongodb plugin: error making insert bson");
235 pthread_mutex_lock(&node->lock);
236 if (wm_initialize(node) < 0) {
237 ERROR("write_mongodb plugin: error making connection to server");
238 pthread_mutex_unlock(&node->lock);
239 bson_destroy(bson_record);
244 mongoc_client_get_collection(node->client, "collectd", vl->plugin);
246 ERROR("write_mongodb plugin: error creating/getting collection");
247 mongoc_database_destroy(node->database);
248 mongoc_client_destroy(node->client);
249 node->database = NULL;
251 node->connected = false;
252 pthread_mutex_unlock(&node->lock);
253 bson_destroy(bson_record);
257 status = mongoc_collection_insert(collection, MONGOC_INSERT_NONE, bson_record,
261 ERROR("write_mongodb plugin: error inserting record: %s", error.message);
262 mongoc_database_destroy(node->database);
263 mongoc_client_destroy(node->client);
264 node->database = NULL;
266 node->connected = false;
267 pthread_mutex_unlock(&node->lock);
268 bson_destroy(bson_record);
269 mongoc_collection_destroy(collection);
273 /* free our resource as not to leak memory */
274 mongoc_collection_destroy(collection);
276 pthread_mutex_unlock(&node->lock);
278 bson_destroy(bson_record);
281 } /* }}} int wm_write */
283 static void wm_config_free(void *ptr) /* {{{ */
285 wm_node_t *node = ptr;
290 mongoc_database_destroy(node->database);
291 mongoc_client_destroy(node->client);
292 node->database = NULL;
294 node->connected = false;
298 } /* }}} void wm_config_free */
300 static int wm_config_node(oconfig_item_t *ci) /* {{{ */
305 node = calloc(1, sizeof(*node));
309 node->host = strdup("localhost");
310 if (node->host == NULL) {
314 node->port = MONGOC_DEFAULT_PORT;
315 node->store_rates = true;
316 pthread_mutex_init(&node->lock, /* attr = */ NULL);
318 status = cf_util_get_string_buffer(ci, node->name, sizeof(node->name));
326 for (int i = 0; i < ci->children_num; i++) {
327 oconfig_item_t *child = ci->children + i;
329 if (strcasecmp("Host", child->key) == 0)
330 status = cf_util_get_string(child, &node->host);
331 else if (strcasecmp("Port", child->key) == 0) {
332 status = cf_util_get_port_number(child);
337 } else if (strcasecmp("Timeout", child->key) == 0)
338 status = cf_util_get_int(child, &node->timeout);
339 else if (strcasecmp("StoreRates", child->key) == 0)
340 status = cf_util_get_boolean(child, &node->store_rates);
341 else if (strcasecmp("Database", child->key) == 0)
342 status = cf_util_get_string(child, &node->db);
343 else if (strcasecmp("User", child->key) == 0)
344 status = cf_util_get_string(child, &node->user);
345 else if (strcasecmp("Password", child->key) == 0)
346 status = cf_util_get_string(child, &node->passwd);
348 WARNING("write_mongodb plugin: Ignoring unknown config option \"%s\".",
353 } /* for (i = 0; i < ci->children_num; i++) */
355 if ((node->db != NULL) || (node->user != NULL) || (node->passwd != NULL)) {
356 if ((node->db == NULL) || (node->user == NULL) || (node->passwd == NULL)) {
358 "write_mongodb plugin: Authentication requires the "
359 "\"Database\", \"User\" and \"Password\" options to be specified, "
360 "but at last one of them is missing. Authentication will NOT be "
369 char cb_name[sizeof("write_mongodb/") + DATA_MAX_NAME_LEN];
371 snprintf(cb_name, sizeof(cb_name), "write_mongodb/%s", node->name);
374 plugin_register_write(cb_name, wm_write,
376 .data = node, .free_func = wm_config_free,
378 INFO("write_mongodb plugin: registered write plugin %s %d", cb_name,
383 wm_config_free(node);
386 } /* }}} int wm_config_node */
388 static int wm_config(oconfig_item_t *ci) /* {{{ */
390 for (int i = 0; i < ci->children_num; i++) {
391 oconfig_item_t *child = ci->children + i;
393 if (strcasecmp("Node", child->key) == 0)
394 wm_config_node(child);
396 WARNING("write_mongodb plugin: Ignoring unknown "
397 "configuration option \"%s\" at top level.",
402 } /* }}} int wm_config */
404 void module_register(void) {
405 plugin_register_complex_config("write_mongodb", wm_config);