From 9dad090759fe0f7137262f26498b6c1c3dfb8c29 Mon Sep 17 00:00:00 2001 From: Taylor Cramer Date: Mon, 24 Jul 2017 10:36:27 -0700 Subject: [PATCH] Add metadata support to GRPC plugin --- proto/types.proto | 13 ++++- src/daemon/utils_cache.c | 14 +++++ src/daemon/utils_cache.h | 2 + src/grpc.cc | 130 +++++++++++++++++++++++++++++++++++++++++++++-- 4 files changed, 154 insertions(+), 5 deletions(-) diff --git a/proto/types.proto b/proto/types.proto index 952c5418..fde3afaf 100644 --- a/proto/types.proto +++ b/proto/types.proto @@ -38,6 +38,16 @@ message Identifier { string type_instance = 5; } +message MetadataValue { + oneof value { + string string_value = 1; + int64 int64_value = 2; + uint64 uint64_value = 3; + double double_value = 4; + bool bool_value = 5; + }; +} + message Value { oneof value { uint64 counter = 1; @@ -56,4 +66,5 @@ message ValueList { Identifier identifier = 4; repeated string ds_names = 5; -} + map meta_data = 6; +} \ No newline at end of file diff --git a/src/daemon/utils_cache.c b/src/daemon/utils_cache.c index 7d6e8fd6..3534c3d5 100644 --- a/src/daemon/utils_cache.c +++ b/src/daemon/utils_cache.c @@ -913,6 +913,20 @@ int uc_iterator_get_interval(uc_iter_t *iter, cdtime_t *ret_interval) { return 0; } /* int uc_iterator_get_name */ +int uc_iterator_get_meta(uc_iter_t *iter, meta_data_t **ret_meta) { + if ((iter == NULL) || (iter->entry == NULL) || (ret_meta == NULL)) + return -1; + + if (iter->entry->meta == NULL) { + *ret_meta = NULL; + return 0; + } + + *ret_meta = meta_data_clone(iter->entry->meta); + + return 0; +} /* int uc_iterator_get_meta */ + /* * Meta data interface */ diff --git a/src/daemon/utils_cache.h b/src/daemon/utils_cache.h index 8ba7133a..08c2f10a 100644 --- a/src/daemon/utils_cache.h +++ b/src/daemon/utils_cache.h @@ -106,6 +106,8 @@ int uc_iterator_get_values(uc_iter_t *iter, value_t **ret_values, size_t *ret_num); /* Return the interval of the value at the current position. */ int uc_iterator_get_interval(uc_iter_t *iter, cdtime_t *ret_interval); +/* Return the metadata for the value at the current position. */ +int uc_iterator_get_meta(uc_iter_t *iter, meta_data_t **ret_meta); /* * Meta data interface diff --git a/src/grpc.cc b/src/grpc.cc index 2f16dbcb..d465ffe0 100644 --- a/src/grpc.cc +++ b/src/grpc.cc @@ -170,9 +170,87 @@ static grpc::Status marshal_value_list(const value_list_t *vl, msg->set_allocated_time(new google::protobuf::Timestamp(t)); msg->set_allocated_interval(new google::protobuf::Duration(d)); + msg->clear_meta_data(); + if (vl->meta != nullptr) { + char **meta_data_keys = nullptr; + int meta_data_keys_len = meta_data_toc(vl->meta, &meta_data_keys); + if (meta_data_keys_len < 0) { + return grpc::Status(grpc::StatusCode::INTERNAL, + grpc::string("error getting metadata keys")); + } + + for (int i = 0; i < meta_data_keys_len; i++) { + char *key = meta_data_keys[i]; + int md_type = meta_data_type(vl->meta, key); + + collectd::types::MetadataValue md_value; + md_value.Clear(); + + switch (md_type) { + case MD_TYPE_STRING: + char *md_string; + if (meta_data_get_string(vl->meta, key, &md_string) != 0 || md_string == nullptr) { + strarray_free(meta_data_keys, meta_data_keys_len); + return grpc::Status(grpc::StatusCode::INTERNAL, + grpc::string("missing metadata")); + } + md_value.set_string_value(md_string); + free(md_string); + break; + case MD_TYPE_SIGNED_INT: + int64_t int64_value; + if (meta_data_get_signed_int(vl->meta, key, &int64_value) != 0) { + strarray_free(meta_data_keys, meta_data_keys_len); + return grpc::Status(grpc::StatusCode::INTERNAL, + grpc::string("missing metadata")); + } + md_value.set_int64_value(int64_value); + break; + case MD_TYPE_UNSIGNED_INT: + uint64_t uint64_value; + if (meta_data_get_unsigned_int(vl->meta, key, &uint64_value) != 0) { + strarray_free(meta_data_keys, meta_data_keys_len); + return grpc::Status(grpc::StatusCode::INTERNAL, + grpc::string("missing metadata")); + } + md_value.set_uint64_value(uint64_value); + break; + case MD_TYPE_DOUBLE: + double double_value; + if (meta_data_get_double(vl->meta, key, &double_value) != 0) { + strarray_free(meta_data_keys, meta_data_keys_len); + return grpc::Status(grpc::StatusCode::INTERNAL, + grpc::string("missing metadata")); + } + md_value.set_double_value(double_value); + break; + case MD_TYPE_BOOLEAN: + bool bool_value; + if (meta_data_get_boolean(vl->meta, key, &bool_value) != 0) { + strarray_free(meta_data_keys, meta_data_keys_len); + return grpc::Status(grpc::StatusCode::INTERNAL, + grpc::string("missing metadata")); + } + md_value.set_bool_value(bool_value); + break; + default: + strarray_free(meta_data_keys, meta_data_keys_len); + return grpc::Status(grpc::StatusCode::INTERNAL, + grpc::string("unknown metadata type")); + } + + (*msg->mutable_meta_data())[grpc::string(key)] = md_value; + + if (meta_data_keys != nullptr) { + strarray_free(meta_data_keys, meta_data_keys_len); + } + } + } + for (size_t i = 0; i < vl->values_len; ++i) { auto v = msg->add_values(); - switch (ds->ds[i].type) { + int value_type = ds->ds[i].type; + switch (value_type) { case DS_TYPE_COUNTER: v->set_counter(vl->values[i].counter); break; @@ -186,6 +264,7 @@ static grpc::Status marshal_value_list(const value_list_t *vl, v->set_absolute(vl->values[i].absolute); break; default: + ERROR("grpc: invalid value type (%d)", value_type); return grpc::Status(grpc::StatusCode::INTERNAL, grpc::string("unknown value type")); } @@ -207,6 +286,42 @@ static grpc::Status unmarshal_value_list(const collectd::types::ValueList &msg, if (!status.ok()) return status; + vl->meta = meta_data_create(); + if (vl->meta == nullptr) { + return grpc::Status(grpc::StatusCode::RESOURCE_EXHAUSTED, + grpc::string("failed to metadata list")); + } + for (auto kv: msg.meta_data()) { + auto k = kv.first.c_str(); + auto v = kv.second; + + // The meta_data collection individually allocates copies of the keys and + // string values for each entry, so it's safe for us to pass a reference + // to our short-lived strings. + + switch (v.value_case()) { + case collectd::types::MetadataValue::ValueCase::kStringValue: + meta_data_add_string(vl->meta, k, v.string_value().c_str()); + break; + case collectd::types::MetadataValue::ValueCase::kInt64Value: + meta_data_add_signed_int(vl->meta, k, v.int64_value()); + break; + case collectd::types::MetadataValue::ValueCase::kUint64Value: + meta_data_add_unsigned_int(vl->meta, k, v.uint64_value()); + break; + case collectd::types::MetadataValue::ValueCase::kDoubleValue: + meta_data_add_double(vl->meta, k, v.double_value()); + break; + case collectd::types::MetadataValue::ValueCase::kBoolValue: + meta_data_add_boolean(vl->meta, k, v.bool_value()); + break; + default: + meta_data_destroy(vl->meta); + return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, + grpc::string("Metadata of unknown type")); + } + } + value_t *values = NULL; size_t values_len = 0; @@ -249,8 +364,11 @@ static grpc::Status unmarshal_value_list(const collectd::types::ValueList &msg, if (status.ok()) { vl->values = values; vl->values_len = values_len; - } else if (values) { - free(values); + } else { + meta_data_destroy(vl->meta); + if (values) { + free(values); + } } return status; @@ -280,6 +398,7 @@ public: auto vl = value_lists.front(); value_lists.pop(); sfree(vl.values); + meta_data_destroy(vl.meta); } return status; @@ -328,7 +447,6 @@ private: if (!ident_matches(&vl, match)) continue; - if (uc_iterator_get_time(iter, &vl.time) < 0) { status = grpc::Status(grpc::StatusCode::INTERNAL, @@ -346,6 +464,10 @@ private: grpc::string("failed to retrieve values")); break; } + if (uc_iterator_get_meta(iter, &vl.meta) < 0) { + status = grpc::Status(grpc::StatusCode::INTERNAL, + grpc::string("failed to retrieve value metadata")); + } value_lists->push(vl); } // while (uc_iterator_next(iter, &name) == 0) -- 2.11.0