2 * collectd - src/write_mongodb.c
3 * Copyright (C) 2010-2013 Florian Forster
4 * Copyright (C) 2010 Akkarit Sangpetch
5 * Copyright (C) 2012 Chris Lundquist
6 * Copyright (C) 2017 Saikrishna Arcot
8 * Permission is hereby granted, free of charge, to any person obtaining a
9 * copy of this software and associated documentation files (the "Software"),
10 * to deal in the Software without restriction, including without limitation
11 * the rights to use, copy, modify, merge, publish, distribute, sublicense,
12 * and/or sell copies of the Software, and to permit persons to whom the
13 * Software is furnished to do so, subject to the following conditions:
15 * The above copyright notice and this permission notice shall be included in
16 * all copies or substantial portions of the Software.
18 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
19 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
20 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
21 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
22 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
23 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
24 * DEALINGS IN THE SOFTWARE.
27 * Florian Forster <octo at collectd.org>
28 * Akkarit Sangpetch <asangpet at andrew.cmu.edu>
29 * Chris Lundquist <clundquist at bluebox.net>
30 * Saikrishna Arcot <saiarcot895 at gmail.com>
37 #include "utils_cache.h"
/* NOTE(review): only some members of the node struct are visible in this
 * excerpt. The functions below also access host, port, timeout, db, user,
 * passwd, store_rates, connected and lock through wm_node_t. */
/* Node name: passed to cf_util_get_string_buffer() in wm_config_node() and
 * used there to build the name of the registered write callback. */
42 char name[DATA_MAX_NAME_LEN];
48 /* Authentication information */
/* NOTE(review): the members introduced by the heading above are not visible
 * in this excerpt. */
/* Client handle; assigned from mongoc_client_new() in wm_initialize(). */
56 mongoc_client_t *client;
/* Database handle; assigned from mongoc_client_get_database() in
 * wm_initialize(). */
57 mongoc_database_t *database;
/* Type name used for the node by every function below. */
60 typedef struct wm_node_s wm_node_t;
/*
 * Build the BSON document that represents one value list.
 *
 * The visible code appends the fields timestamp, host, plugin,
 * plugin_instance, type and type_instance taken from vl, followed by three
 * arrays (values, dstypes, dsnames) holding one element per data source of
 * ds, each keyed by its decimal index. The finished document is checked with
 * bson_validate() using BSON_VALIDATE_UTF8.
 *
 * ds:          data set; supplies ds_num and the type and name of each source.
 * vl:          value list; supplies time, identifier strings and values.
 * store_rates: when set, non-gauge values are appended as doubles taken from
 *              rates[] instead of as 64-bit integers.
 *
 * Returns a bson_t pointer.
 * NOTE(review): the local declarations, the return statements and the cleanup
 * on the error paths are on lines not visible in this excerpt.
 */
65 static bson_t *wm_create_bson(const data_set_t *ds, /* {{{ */
66 const value_list_t *vl, _Bool store_rates) {
73 ERROR("write_mongodb plugin: bson_new failed.");
/* Rates for the data sources; read below only in the store_rates branch. */
78 rates = uc_get_rate(ds, vl);
80 ERROR("write_mongodb plugin: uc_get_rate() failed.");
/* Identification fields; vl->time goes through CDTIME_T_TO_MS() first. */
88 BSON_APPEND_DATE_TIME(ret, "timestamp", CDTIME_T_TO_MS(vl->time));
89 BSON_APPEND_UTF8(ret, "host", vl->host);
90 BSON_APPEND_UTF8(ret, "plugin", vl->plugin);
91 BSON_APPEND_UTF8(ret, "plugin_instance", vl->plugin_instance);
92 BSON_APPEND_UTF8(ret, "type", vl->type);
93 BSON_APPEND_UTF8(ret, "type_instance", vl->type_instance);
/* values array: a gauge is always appended as a double; any other type is
 * appended as a double from rates[] when store_rates is set, and otherwise
 * as a 64-bit integer from the matching member of the value. */
95 BSON_APPEND_ARRAY_BEGIN(ret, "values", &subarray); /* {{{ */
96 for (int i = 0; i < ds->ds_num; i++) {
/* Array elements are keyed by their index printed as a decimal string. */
99 ssnprintf(key, sizeof(key), "%i", i);
101 if (ds->ds[i].type == DS_TYPE_GAUGE)
102 BSON_APPEND_DOUBLE(&subarray, key, vl->values[i].gauge);
103 else if (store_rates)
104 BSON_APPEND_DOUBLE(&subarray, key, (double)rates[i]);
105 else if (ds->ds[i].type == DS_TYPE_COUNTER)
106 BSON_APPEND_INT64(&subarray, key, vl->values[i].counter);
107 else if (ds->ds[i].type == DS_TYPE_DERIVE)
108 BSON_APPEND_INT64(&subarray, key, vl->values[i].derive);
109 else if (ds->ds[i].type == DS_TYPE_ABSOLUTE)
110 BSON_APPEND_INT64(&subarray, key, vl->values[i].absolute);
/* Reports a ds type that none of the branches above matched. */
112 ERROR("write_mongodb plugin: Unknown ds_type %d for index %d",
118 bson_append_array_end(ret, &subarray); /* }}} values */
/* dstypes array: one type string per data source.
 * NOTE(review): the condition that selects between the gauge literal and
 * DS_TYPE_TO_STRING() is on a line not visible in this excerpt. */
120 BSON_APPEND_ARRAY_BEGIN(ret, "dstypes", &subarray); /* {{{ */
121 for (int i = 0; i < ds->ds_num; i++) {
124 ssnprintf(key, sizeof(key), "%i", i);
127 BSON_APPEND_UTF8(&subarray, key, "gauge");
129 BSON_APPEND_UTF8(&subarray, key, DS_TYPE_TO_STRING(ds->ds[i].type));
131 bson_append_array_end(ret, &subarray); /* }}} dstypes */
/* dsnames array: the name of each data source, in the same order. */
133 BSON_APPEND_ARRAY_BEGIN(ret, "dsnames", &subarray); /* {{{ */
134 for (int i = 0; i < ds->ds_num; i++) {
137 ssnprintf(key, sizeof(key), "%i", i);
138 BSON_APPEND_UTF8(&subarray, key, ds->ds[i].name);
140 bson_append_array_end(ret, &subarray); /* }}} dsnames */
/* Validate the finished document; on failure the offset that bson_validate()
 * stored in error_location is included in the log message. */
144 size_t error_location;
145 if (!bson_validate(ret, BSON_VALIDATE_UTF8, &error_location)) {
146 ERROR("write_mongodb plugin: Error in generated BSON document "
147 "at byte %zu", error_location);
153 } /* }}} bson *wm_create_bson */
/*
 * Set up the MongoDB client and database handles of a node.
 *
 * When db, user and passwd are all set, a URI with credentials and an
 * authSource parameter is formatted; otherwise a plain host:port URI is
 * formatted. The URI is handed to mongoc_client_new() and the database named
 * collectd is then requested from the resulting client.
 *
 * node: the node to connect; its client and database members are assigned.
 *
 * wm_write() treats a negative return value as failure.
 * NOTE(review): the return statements, the updates of node->connected and the
 * release of uri are on lines not visible in this excerpt; confirm uri is
 * freed on every path.
 *
 * NOTE(review): the log messages substitute localhost and MONGOC_DEFAULT_PORT
 * when node->host is NULL or node->port is 0, but the URI is built from
 * node->host and node->port directly, including strlen(node->host). Confirm
 * that host cannot be NULL when this function runs.
 */
155 static int wm_initialize(wm_node_t *node) /* {{{ */
159 char const *format_string;
/* Presumably an early exit when the node is already connected; the body of
 * this check is not visible here. */
161 if (node->connected) {
165 INFO("write_mongodb plugin: Connecting to [%s]:%i",
166 (node->host != NULL) ? node->host : "localhost",
167 (node->port != 0) ? node->port : MONGOC_DEFAULT_PORT);
/* Authenticated connection: requires all three credentials. */
169 if ((node->db != NULL) && (node->user != NULL) && (node->passwd != NULL)) {
170 format_string = "mongodb://%s:%s@%s:%d/?authSource=%s";
/* Buffer size: the format string itself plus every substituted string, plus
 * 5 and 1 (presumably port digits and the terminating NUL). */
171 uri_length = strlen(format_string) + strlen(node->user) +
172 strlen(node->passwd) + strlen(node->host) + 5 +
173 strlen(node->db) + 1;
174 if ((uri = calloc(sizeof(char), uri_length)) == NULL) {
175 ERROR("write_mongodb plugin: Not enough memory to assemble "
176 "authentication string.");
/* NOTE(review): this call has not created a client yet at this point;
 * node->client holds whatever an earlier call left there. */
177 mongoc_client_destroy(node->client);
/* NOTE(review): ssnprintf() here, plain snprintf() in the other branch. */
182 ssnprintf(uri, uri_length, format_string, node->user, node->passwd,
183 node->host, node->port, node->db);
185 node->client = mongoc_client_new(uri);
/* NOTE(review): this message prints host and port without the colon that the
 * other messages in this function put between them. */
187 ERROR("write_mongodb plugin: Authenticating to [%s]%i for database "
188 "\"%s\" as user \"%s\" failed.",
189 (node->host != NULL) ? node->host : "localhost",
190 (node->port != 0) ? node->port : MONGOC_DEFAULT_PORT, node->db,
/* Unauthenticated connection: URI carries host and port only. */
197 format_string = "mongodb://%s:%d";
198 uri_length = strlen(format_string) + strlen(node->host) + 5 + 1;
199 if ((uri = calloc(sizeof(char), uri_length)) == NULL) {
/* NOTE(review): the text mentions an authentication string although this is
 * the branch without credentials. */
200 ERROR("write_mongodb plugin: Not enough memory to assemble "
201 "authentication string.");
202 mongoc_client_destroy(node->client);
207 snprintf(uri, uri_length, format_string, node->host, node->port);
209 node->client = mongoc_client_new(uri);
211 ERROR("write_mongodb plugin: Connecting to [%s]:%i failed.",
212 (node->host != NULL) ? node->host : "localhost",
213 (node->port != 0) ? node->port : MONGOC_DEFAULT_PORT);
/* The database name is fixed to collectd; in the visible code node->db is
 * only used as the authSource value and in a log message. */
221 node->database = mongoc_client_get_database(node->client, "collectd");
222 if (!node->database) {
223 ERROR("write_mongodb plugin: error creating/getting database");
/* Without a database handle the freshly created client is dropped again. */
224 mongoc_client_destroy(node->client);
232 } /* }}} int wm_initialize */
/*
 * Write callback: store one value list in MongoDB.
 *
 * Builds the BSON record with wm_create_bson(), then, while holding
 * node->lock, makes sure the node is set up via wm_initialize(), obtains the
 * collection named after vl->plugin in the collectd database and inserts the
 * record with mongoc_collection_insert().
 *
 * ds: data set describing the values.
 * vl: value list to store.
 * ud: user data; ud->data is the node passed to plugin_register_write().
 *
 * NOTE(review): the return statements are on lines not visible in this
 * excerpt.
 */
234 static int wm_write(const data_set_t *ds, /* {{{ */
235 const value_list_t *vl, user_data_t *ud) {
236 wm_node_t *node = ud->data;
237 mongoc_collection_t *collection = NULL;
/* The record is built before the lock is taken. */
242 bson_record = wm_create_bson(ds, vl, node->store_rates);
244 ERROR("write_mongodb plugin: error making insert bson");
/* Everything that touches the node handles happens under node->lock. */
248 pthread_mutex_lock(&node->lock);
249 if (wm_initialize(node) < 0) {
250 ERROR("write_mongodb plugin: error making connection to server");
251 pthread_mutex_unlock(&node->lock);
/* NOTE(review): the record is released with bson_free() throughout this
 * function. libbson documents bson_destroy() as the counterpart of
 * bson_new(); confirm which one matches the allocation in wm_create_bson(). */
252 bson_free(bson_record);
/* The collection is looked up by plugin name inside the collectd database. */
257 mongoc_client_get_collection(node->client, "collectd", vl->plugin);
259 ERROR("write_mongodb plugin: error creating/getting collection");
/* Tear down both handles; presumably so that a later call reconnects. */
260 mongoc_database_destroy(node->database);
261 mongoc_client_destroy(node->client);
262 node->database = NULL;
265 pthread_mutex_unlock(&node->lock);
266 bson_free(bson_record);
270 status = mongoc_collection_insert(collection, MONGOC_INSERT_NONE, bson_record,
274 ERROR("write_mongodb plugin: error inserting record: %s", error.message);
275 mongoc_database_destroy(node->database);
276 mongoc_client_destroy(node->client);
277 node->database = NULL;
280 pthread_mutex_unlock(&node->lock);
281 bson_free(bson_record);
/* NOTE(review): on this path the collection is destroyed after the client it
 * came from; the mongoc documentation asks for collections to be released
 * before their client. Confirm the intended order. */
282 mongoc_collection_destroy(collection);
286 /* free our resource as not to leak memory */
287 mongoc_collection_destroy(collection);
289 pthread_mutex_unlock(&node->lock);
291 bson_free(bson_record);
294 } /* }}} int wm_write */
/*
 * Release the MongoDB handles of a node.
 *
 * Installed as the free_func of the user data registered in wm_config_node()
 * and also called directly from there.
 *
 * ptr: the node to release, received as an untyped pointer.
 *
 * NOTE(review): only the handle teardown is visible in this excerpt; any NULL
 * check and the freeing of the node and its strings would be on lines not
 * shown. Confirm against the full file.
 */
296 static void wm_config_free(void *ptr) /* {{{ */
298 wm_node_t *node = ptr;
/* The database handle is destroyed before the client it was obtained from. */
303 mongoc_database_destroy(node->database);
304 mongoc_client_destroy(node->client);
305 node->database = NULL;
311 } /* }}} void wm_config_free */
/*
 * Parse one node configuration block and register a write callback for it.
 *
 * A zero-initialised node is allocated, its name is read from the block
 * itself, and each child option is matched case-insensitively against Host,
 * Port, Timeout, StoreRates, Database, User and Password. Any other key only
 * produces a warning. The node is then registered with
 * plugin_register_write() under a name built from the write_mongodb prefix
 * and the node name.
 *
 * ci: the configuration block to parse.
 *
 * NOTE(review): the allocation check, the handling of status inside the
 * loop, the conditions around registration and cleanup, and the return
 * statement are on lines not visible in this excerpt.
 */
313 static int wm_config_node(oconfig_item_t *ci) /* {{{ */
318 node = calloc(1, sizeof(*node));
/* Default; the StoreRates option below can overwrite it. */
323 node->store_rates = 1;
324 pthread_mutex_init(&node->lock, /* attr = */ NULL);
326 status = cf_util_get_string_buffer(ci, node->name, sizeof(node->name));
333 for (int i = 0; i < ci->children_num; i++) {
334 oconfig_item_t *child = ci->children + i;
336 if (strcasecmp("Host", child->key) == 0)
337 status = cf_util_get_string(child, &node->host);
338 else if (strcasecmp("Port", child->key) == 0) {
/* The helper result lands in status; how it is transferred to the node is
 * on lines not visible in this excerpt. */
339 status = cf_util_get_port_number(child);
344 } else if (strcasecmp("Timeout", child->key) == 0)
345 status = cf_util_get_int(child, &node->timeout);
346 else if (strcasecmp("StoreRates", child->key) == 0)
347 status = cf_util_get_boolean(child, &node->store_rates);
348 else if (strcasecmp("Database", child->key) == 0)
349 status = cf_util_get_string(child, &node->db);
350 else if (strcasecmp("User", child->key) == 0)
351 status = cf_util_get_string(child, &node->user);
352 else if (strcasecmp("Password", child->key) == 0)
353 status = cf_util_get_string(child, &node->passwd);
355 WARNING("write_mongodb plugin: Ignoring unknown config option \"%s\".",
360 } /* for (i = 0; i < ci->children_num; i++) */
/* Detect a partial credential set: at least one of Database, User and
 * Password is given, but not all three.
 * NOTE(review): the warning text below reads at last one where at least one
 * is meant; it is a runtime string and is left unchanged here. */
362 if ((node->db != NULL) || (node->user != NULL) || (node->passwd != NULL)) {
363 if ((node->db == NULL) || (node->user == NULL) || (node->passwd == NULL)) {
365 "write_mongodb plugin: Authentication requires the "
366 "\"Database\", \"User\" and \"Password\" options to be specified, "
367 "but at last one of them is missing. Authentication will NOT be "
376 char cb_name[DATA_MAX_NAME_LEN];
378 ssnprintf(cb_name, sizeof(cb_name), "write_mongodb/%s", node->name);
/* The node travels to wm_write() as user data, with wm_config_free() set as
 * its free function. */
380 status = plugin_register_write(
381 cb_name, wm_write, &(user_data_t){
382 .data = node, .free_func = wm_config_free,
384 INFO("write_mongodb plugin: registered write plugin %s %d", cb_name,
/* NOTE(review): the condition guarding this cleanup is not visible here. */
389 wm_config_free(node);
392 } /* }}} int wm_config_node */
/*
 * Top-level configuration callback of the plugin.
 *
 * Walks the children of ci: every Node block (matched case-insensitively) is
 * passed to wm_config_node(); any other key only produces a warning.
 *
 * ci: the plugin configuration block.
 *
 * The result of wm_config_node() is not used here.
 * NOTE(review): the return statement is on a line not visible in this
 * excerpt.
 */
394 static int wm_config(oconfig_item_t *ci) /* {{{ */
396 for (int i = 0; i < ci->children_num; i++) {
397 oconfig_item_t *child = ci->children + i;
399 if (strcasecmp("Node", child->key) == 0)
400 wm_config_node(child);
402 WARNING("write_mongodb plugin: Ignoring unknown "
403 "configuration option \"%s\" at top level.",
408 } /* }}} int wm_config */
/*
 * Plugin entry point: registers wm_config() as the complex configuration
 * callback under the plugin name write_mongodb.
 */
410 void module_register(void) {
411 plugin_register_complex_config("write_mongodb", wm_config);