2 * collectd - src/write_mongodb.c
3 * Copyright (C) 2010-2013 Florian Forster
4 * Copyright (C) 2010 Akkarit Sangpetch
5 * Copyright (C) 2012 Chris Lundquist
6 * Copyright (C) 2017 Saikrishna Arcot
8 * Permission is hereby granted, free of charge, to any person obtaining a
9 * copy of this software and associated documentation files (the "Software"),
10 * to deal in the Software without restriction, including without limitation
11 * the rights to use, copy, modify, merge, publish, distribute, sublicense,
12 * and/or sell copies of the Software, and to permit persons to whom the
13 * Software is furnished to do so, subject to the following conditions:
15 * The above copyright notice and this permission notice shall be included in
16 * all copies or substantial portions of the Software.
18 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
19 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
20 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
21 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
22 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
23 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
24 * DEALINGS IN THE SOFTWARE.
27 * Florian Forster <octo at collectd.org>
28 * Akkarit Sangpetch <asangpet at andrew.cmu.edu>
29 * Chris Lundquist <clundquist at bluebox.net>
30 * Saikrishna Arcot <saiarcot895 at gmail.com>
37 #include "utils_cache.h"
42 char name[DATA_MAX_NAME_LEN];
48 /* Authentication information */
56 mongoc_client_t *client;
57 mongoc_database_t *database;
60 typedef struct wm_node_s wm_node_t;
/*
 * Build the BSON document that represents one value list.
 *
 * The document receives a "timestamp" (vl->time passed through
 * CDTIME_T_TO_MS), the five identifier strings "host", "plugin",
 * "plugin_instance", "type" and "type_instance", and three arrays holding
 * one element per data source: "values", "dstypes" and "dsnames". Array
 * elements are keyed by their decimal index.
 *
 * ds          - data set giving the type and name of every data source.
 * vl          - value list supplying the identifiers, time and values.
 * store_rates - when set, non-gauge values are stored as rates (doubles)
 *               instead of raw 64-bit integers.
 *
 * The finished document is checked with bson_validate().
 * NOTE(review): the return statements and the release of "rates" are outside
 * the visible lines; presumably NULL is returned on failure - confirm.
 */
65 static bson_t *wm_create_bson(const data_set_t *ds, /* {{{ */
66 const value_list_t *vl, _Bool store_rates) {
73 ERROR("write_mongodb plugin: bson_new failed.");
/* Per-data-source rates; read below only on the store_rates branch. */
78 rates = uc_get_rate(ds, vl);
80 ERROR("write_mongodb plugin: uc_get_rate() failed.");
/* Identification of the value list. */
88 BSON_APPEND_DATE_TIME(ret, "timestamp", CDTIME_T_TO_MS(vl->time));
89 BSON_APPEND_UTF8(ret, "host", vl->host);
90 BSON_APPEND_UTF8(ret, "plugin", vl->plugin);
91 BSON_APPEND_UTF8(ret, "plugin_instance", vl->plugin_instance);
92 BSON_APPEND_UTF8(ret, "type", vl->type);
93 BSON_APPEND_UTF8(ret, "type_instance", vl->type_instance);
/*
 * "values": gauges are always appended as doubles. Every other type is
 * appended as the double rate when store_rates is set, otherwise as the raw
 * counter / derive / absolute value in an INT64.
 */
95 BSON_APPEND_ARRAY_BEGIN(ret, "values", &subarray); /* {{{ */
96 for (int i = 0; i < ds->ds_num; i++) {
99 ssnprintf(key, sizeof(key), "%i", i);
101 if (ds->ds[i].type == DS_TYPE_GAUGE)
102 BSON_APPEND_DOUBLE(&subarray, key, vl->values[i].gauge);
103 else if (store_rates)
104 BSON_APPEND_DOUBLE(&subarray, key, (double)rates[i]);
105 else if (ds->ds[i].type == DS_TYPE_COUNTER)
106 BSON_APPEND_INT64(&subarray, key, vl->values[i].counter);
107 else if (ds->ds[i].type == DS_TYPE_DERIVE)
108 BSON_APPEND_INT64(&subarray, key, vl->values[i].derive);
109 else if (ds->ds[i].type == DS_TYPE_ABSOLUTE)
110 BSON_APPEND_INT64(&subarray, key, vl->values[i].absolute);
112 ERROR("write_mongodb plugin: Unknown ds_type %d for index %d",
118 bson_append_array_end(ret, &subarray); /* }}} values */
/*
 * "dstypes": one type name per data source, either the literal "gauge" or
 * the DS_TYPE_TO_STRING() name.
 * NOTE(review): the condition choosing between the two is not visible here;
 * presumably "gauge" is used when rates are stored - confirm.
 */
120 BSON_APPEND_ARRAY_BEGIN(ret, "dstypes", &subarray); /* {{{ */
121 for (int i = 0; i < ds->ds_num; i++) {
124 ssnprintf(key, sizeof(key), "%i", i);
127 BSON_APPEND_UTF8(&subarray, key, "gauge");
129 BSON_APPEND_UTF8(&subarray, key, DS_TYPE_TO_STRING(ds->ds[i].type));
131 bson_append_array_end(ret, &subarray); /* }}} dstypes */
/* "dsnames": the name of each data source, in data-set order. */
133 BSON_APPEND_ARRAY_BEGIN(ret, "dsnames", &subarray); /* {{{ */
134 for (int i = 0; i < ds->ds_num; i++) {
137 ssnprintf(key, sizeof(key), "%i", i);
138 BSON_APPEND_UTF8(&subarray, key, ds->ds[i].name);
140 bson_append_array_end(ret, &subarray); /* }}} dsnames */
/*
 * Sanity check of the finished document with the BSON_VALIDATE_UTF8 flag;
 * error_location is handed to bson_validate() as its out-parameter.
 */
144 size_t error_location;
145 if (!bson_validate(ret, BSON_VALIDATE_UTF8, &error_location)) {
146 ERROR("write_mongodb plugin: Error in generated BSON document "
154 } /* }}} bson *wm_create_bson */
/*
 * Set up the MongoDB client and database handles of a node.
 *
 * A connection URI is assembled in a heap buffer and handed to
 * mongoc_client_new(). When "db", "user" and "passwd" are all set, the URI
 * carries the credentials and names "db" as authSource; the other visible
 * path builds a plain "mongodb://host:port" URI. Afterwards a handle for the
 * database named "collectd" is stored in node->database.
 *
 * wm_write() treats a negative return value as failure.
 * NOTE(review): the return statements, the release of "uri" and the code
 * that updates node->connected are outside the visible lines - confirm.
 */
156 static int wm_initialize(wm_node_t *node) /* {{{ */
160 char const *format_string;
/*
 * Branch taken when the node is already marked as connected; its body is
 * not visible here (presumably an early return - confirm).
 */
162 if (node->connected) {
/*
 * The log message falls back to "localhost" and MONGOC_DEFAULT_PORT when
 * host or port are unset.
 */
166 INFO("write_mongodb plugin: Connecting to [%s]:%i",
167 (node->host != NULL) ? node->host : "localhost",
168 (node->port != 0) ? node->port : MONGOC_DEFAULT_PORT);
/* Authenticated connection: requires database, user and password. */
170 if ((node->db != NULL) && (node->user != NULL) && (node->passwd != NULL)) {
171 format_string = "mongodb://%s:%s@%s:%d/?authSource=%s";
/*
 * Buffer size: the format string plus every inserted string; the "+ 5" and
 * "+ 1" presumably cover the port digits and the terminator.
 * NOTE(review): strlen(node->host) is reached although the log call above
 * allows node->host to be NULL, and the URI below uses node->host and
 * node->port without the "localhost" / MONGOC_DEFAULT_PORT fallbacks -
 * confirm that both are always set before this point.
 */
172 uri_length = strlen(format_string) + strlen(node->user) +
173 strlen(node->passwd) + strlen(node->host) + 5 +
174 strlen(node->db) + 1;
175 if ((uri = calloc(1, uri_length)) == NULL) {
176 ERROR("write_mongodb plugin: Not enough memory to assemble "
177 "authentication string.");
/*
 * NOTE(review): the visible code has not assigned node->client on this path
 * yet; confirm the pointer is NULL or still valid when it is destroyed.
 */
178 mongoc_client_destroy(node->client);
183 ssnprintf(uri, uri_length, format_string, node->user, node->passwd,
184 node->host, node->port, node->db);
186 node->client = mongoc_client_new(uri);
/*
 * NOTE(review): "[%s]%i" lacks the ':' that the other messages put between
 * host and port.
 */
188 ERROR("write_mongodb plugin: Authenticating to [%s]%i for database "
189 "\"%s\" as user \"%s\" failed.",
190 (node->host != NULL) ? node->host : "localhost",
191 (node->port != 0) ? node->port : MONGOC_DEFAULT_PORT, node->db,
/*
 * Connection without credentials: only host and port go into the URI.
 * NOTE(review): the out-of-memory message still says "authentication
 * string", and this path formats with snprintf() while the authenticated
 * path uses ssnprintf() - confirm whether that is intended. The remarks
 * about node->host, node->port and node->client above apply here as well.
 */
198 format_string = "mongodb://%s:%d";
199 uri_length = strlen(format_string) + strlen(node->host) + 5 + 1;
200 if ((uri = calloc(1, uri_length)) == NULL) {
201 ERROR("write_mongodb plugin: Not enough memory to assemble "
202 "authentication string.");
203 mongoc_client_destroy(node->client);
208 snprintf(uri, uri_length, format_string, node->host, node->port);
210 node->client = mongoc_client_new(uri);
212 ERROR("write_mongodb plugin: Connecting to [%s]:%i failed.",
213 (node->host != NULL) ? node->host : "localhost",
214 (node->port != 0) ? node->port : MONGOC_DEFAULT_PORT);
/*
 * The database name is fixed to "collectd"; node->db only appears as the
 * authSource of the authenticated URI above.
 */
222 node->database = mongoc_client_get_database(node->client, "collectd");
223 if (!node->database) {
224 ERROR("write_mongodb plugin: error creating/getting database");
225 mongoc_client_destroy(node->client);
233 } /* }}} int wm_initialize */
/*
 * Write callback: store one value list as a document in MongoDB.
 *
 * The BSON document is created before node->lock is taken. Connection setup
 * through wm_initialize(), the collection lookup and the insert are all
 * followed by an unlock of node->lock on the visible paths. The document is
 * inserted into the collection named after vl->plugin in the "collectd"
 * database.
 *
 * ds - data set belonging to vl.
 * vl - value list to store.
 * ud - user data whose "data" member is the wm_node_t of this callback.
 *
 * On the visible error paths the lock is released and the BSON document is
 * destroyed; after a collection or insert error the database and client
 * handles are destroyed as well.
 * NOTE(review): the status checks and return statements are outside the
 * visible lines - confirm the returned values.
 */
235 static int wm_write(const data_set_t *ds, /* {{{ */
236 const value_list_t *vl, user_data_t *ud) {
237 wm_node_t *node = ud->data;
238 mongoc_collection_t *collection = NULL;
/* Build the document first; node->lock is not held yet. */
243 bson_record = wm_create_bson(ds, vl, node->store_rates);
245 ERROR("write_mongodb plugin: error making insert bson");
/* The lock is taken before the node's connection handles are used. */
249 pthread_mutex_lock(&node->lock);
250 if (wm_initialize(node) < 0) {
251 ERROR("write_mongodb plugin: error making connection to server");
252 pthread_mutex_unlock(&node->lock);
253 bson_destroy(bson_record);
258 mongoc_client_get_collection(node->client, "collectd", vl->plugin);
260 ERROR("write_mongodb plugin: error creating/getting collection");
/*
 * Tear down the database and client handles after the failed lookup,
 * presumably so that a later write reconnects - confirm against the lines
 * not shown here.
 */
261 mongoc_database_destroy(node->database);
262 mongoc_client_destroy(node->client);
263 node->database = NULL;
266 pthread_mutex_unlock(&node->lock);
267 bson_destroy(bson_record);
271 status = mongoc_collection_insert(collection, MONGOC_INSERT_NONE, bson_record,
275 ERROR("write_mongodb plugin: error inserting record: %s", error.message);
/* Same tear-down as above, followed by the release of the collection. */
276 mongoc_database_destroy(node->database);
277 mongoc_client_destroy(node->client);
278 node->database = NULL;
281 pthread_mutex_unlock(&node->lock);
282 bson_destroy(bson_record);
283 mongoc_collection_destroy(collection);
287 /* free our resource as not to leak memory */
288 mongoc_collection_destroy(collection);
290 pthread_mutex_unlock(&node->lock);
292 bson_destroy(bson_record);
295 } /* }}} int wm_write */
/*
 * Release the resources of a node.
 *
 * ptr - the wm_node_t to release, passed as void pointer because the
 *       function is installed as free_func of the write callback's user
 *       data in wm_config_node(), which also calls it directly.
 *
 * The visible lines destroy the database and client handles and clear
 * node->database.
 * NOTE(review): the release of the remaining members and of the node itself
 * is outside the visible lines - confirm.
 */
297 static void wm_config_free(void *ptr) /* {{{ */
299 wm_node_t *node = ptr;
304 mongoc_database_destroy(node->database);
305 mongoc_client_destroy(node->client);
306 node->database = NULL;
312 } /* }}} void wm_config_free */
/*
 * Handle one node block of the configuration.
 *
 * A zero-initialised wm_node_t is allocated, node->name is filled from ci,
 * and every child option is applied. Option keys are compared without
 * regard to case: "Host", "Port", "Timeout", "StoreRates", "Database",
 * "User" and "Password"; any other key produces a warning. The node is then
 * registered as write callback "write_mongodb/<name>" with wm_config_free()
 * as the function that frees the user data.
 *
 * ci - configuration item of the node block.
 *
 * NOTE(review): the status checks, the condition guarding the final
 * wm_config_free() call and the return statements are outside the visible
 * lines - confirm the error handling and the returned values.
 */
314 static int wm_config_node(oconfig_item_t *ci) /* {{{ */
319 node = calloc(1, sizeof(*node));
/* Defaults: rates are stored unless "StoreRates" says otherwise. */
324 node->store_rates = 1;
325 pthread_mutex_init(&node->lock, /* attr = */ NULL);
327 status = cf_util_get_string_buffer(ci, node->name, sizeof(node->name));
/* Apply the child options one by one. */
334 for (int i = 0; i < ci->children_num; i++) {
335 oconfig_item_t *child = ci->children + i;
337 if (strcasecmp("Host", child->key) == 0)
338 status = cf_util_get_string(child, &node->host);
339 else if (strcasecmp("Port", child->key) == 0) {
/*
 * NOTE(review): the result of cf_util_get_port_number() lands in status; the
 * lines that presumably copy it into node->port are not visible.
 */
340 status = cf_util_get_port_number(child);
345 } else if (strcasecmp("Timeout", child->key) == 0)
346 status = cf_util_get_int(child, &node->timeout);
347 else if (strcasecmp("StoreRates", child->key) == 0)
348 status = cf_util_get_boolean(child, &node->store_rates);
349 else if (strcasecmp("Database", child->key) == 0)
350 status = cf_util_get_string(child, &node->db);
351 else if (strcasecmp("User", child->key) == 0)
352 status = cf_util_get_string(child, &node->user);
353 else if (strcasecmp("Password", child->key) == 0)
354 status = cf_util_get_string(child, &node->passwd);
356 WARNING("write_mongodb plugin: Ignoring unknown config option \"%s\".",
361 } /* for (i = 0; i < ci->children_num; i++) */
/*
 * Authentication is all-or-nothing: if at least one of database, user and
 * password is set but not all three, a message is emitted.
 * NOTE(review): "at last" in the message text reads like a typo for
 * "at least".
 */
363 if ((node->db != NULL) || (node->user != NULL) || (node->passwd != NULL)) {
364 if ((node->db == NULL) || (node->user == NULL) || (node->passwd == NULL)) {
366 "write_mongodb plugin: Authentication requires the "
367 "\"Database\", \"User\" and \"Password\" options to be specified, "
368 "but at last one of them is missing. Authentication will NOT be "
/* The callback name embeds the node name: "write_mongodb/<name>". */
377 char cb_name[DATA_MAX_NAME_LEN];
379 ssnprintf(cb_name, sizeof(cb_name), "write_mongodb/%s", node->name);
382 plugin_register_write(cb_name, wm_write,
384 .data = node, .free_func = wm_config_free,
386 INFO("write_mongodb plugin: registered write plugin %s %d", cb_name,
391 wm_config_free(node);
394 } /* }}} int wm_config_node */
/*
 * Top-level configuration callback, registered for "write_mongodb" in
 * module_register().
 *
 * Every child whose key equals "Node" (compared without regard to case) is
 * passed to wm_config_node(); the value returned by that call is not used.
 * Any other key only produces a warning.
 *
 * ci - configuration item of the plugin block.
 *
 * NOTE(review): the return statement is outside the visible lines.
 */
396 static int wm_config(oconfig_item_t *ci) /* {{{ */
398 for (int i = 0; i < ci->children_num; i++) {
399 oconfig_item_t *child = ci->children + i;
401 if (strcasecmp("Node", child->key) == 0)
402 wm_config_node(child);
404 WARNING("write_mongodb plugin: Ignoring unknown "
405 "configuration option \"%s\" at top level.",
410 } /* }}} int wm_config */
412 void module_register(void) {
413 plugin_register_complex_config("write_mongodb", wm_config);