* "doubleValue": number,
* }
*/
-static int format_gcm_typed_value(yajl_gen gen, int ds_type,
- value_t v) /* {{{ */
-{
- char integer[21];
+static int format_typed_value(yajl_gen gen, int ds_type, value_t v,
+ int64_t start_value) {
+ char integer[32];
yajl_gen_map_open(gen);
- if (ds_type == DS_TYPE_GAUGE) {
+
+ switch (ds_type) {
+ case DS_TYPE_GAUGE: {
int status = json_string(gen, "doubleValue");
if (status != 0)
return status;
status = (int)yajl_gen_double(gen, (double)v.gauge);
if (status != yajl_gen_status_ok)
return status;
- } else {
- switch (ds_type) {
- case DS_TYPE_COUNTER:
- snprintf(integer, sizeof(integer), "%llu", v.counter);
- break;
- case DS_TYPE_DERIVE:
- snprintf(integer, sizeof(integer), "%" PRIi64, v.derive);
- break;
- case DS_TYPE_ABSOLUTE:
- snprintf(integer, sizeof(integer), "%" PRIu64, v.derive);
- break;
- default:
- ERROR("format_gcm_typed_value: unknown value type %d.", ds_type);
- return EINVAL;
- }
- int status = json_string(gen, "int64Value") || json_string(gen, integer);
- if (status != 0) {
- return status;
- }
+ yajl_gen_map_close(gen);
+ return 0;
+ }
+ case DS_TYPE_DERIVE: {
+ derive_t diff = v.derive - (derive_t)start_value;
+ snprintf(integer, sizeof(integer), "%" PRIi64, diff);
+ break;
+ }
+ case DS_TYPE_COUNTER: {
+ counter_t diff = counter_diff((counter_t)start_value, v.counter);
+ snprintf(integer, sizeof(integer), "%llu", diff);
+ break;
+ }
+ case DS_TYPE_ABSOLUTE: {
+ snprintf(integer, sizeof(integer), "%" PRIu64, v.absolute);
+ break;
+ }
+ default: {
+ ERROR("format_typed_value: unknown value type %d.", ds_type);
+ return EINVAL;
+ }
}
- yajl_gen_map_close(gen);
+ int status = json_string(gen, "int64Value") || json_string(gen, integer);
+ if (status != 0) {
+ return status;
+ }
+
+ yajl_gen_map_close(gen);
return 0;
-} /* }}} int format_gcm_typed_value */
+} /* }}} int format_typed_value */
/* MetricKind
*
* )
*/
static int format_metric_kind(yajl_gen gen, int ds_type) {
- return json_string(gen, (ds_type == DS_TYPE_GAUGE) ? "GAUGE" : "CUMULATIVE");
+ switch (ds_type) {
+ case DS_TYPE_GAUGE:
+ case DS_TYPE_ABSOLUTE:
+ return json_string(gen, "GAUGE");
+ case DS_TYPE_COUNTER:
+ case DS_TYPE_DERIVE:
+ return json_string(gen, "CUMULATIVE");
+ default:
+ ERROR("format_metric_kind: unknown value type %d.", ds_type);
+ return EINVAL;
+ }
}
/* ValueType
* }
*/
static int format_time_interval(yajl_gen gen, int ds_type,
- value_list_t const *vl) {
+ value_list_t const *vl, cdtime_t start_time) {
/* {{{ */
yajl_gen_map_open(gen);
if (status != 0)
return status;
- if (ds_type != DS_TYPE_GAUGE) {
- cdtime_t start_time = 0;
- if (uc_meta_data_get_unsigned_int(vl, "gcm:start_time", &start_time) != 0) {
- start_time = vl->time;
- uc_meta_data_add_unsigned_int(vl, "gcm:start_time", start_time);
- }
-
+ if ((ds_type == DS_TYPE_DERIVE) || (ds_type == DS_TYPE_COUNTER)) {
int status = json_string(gen, "startTime") || json_time(gen, start_time);
if (status != 0)
return status;
return 0;
} /* }}} int format_time_interval */
+/* read_cumulative_state reads the start time and start value of cumulative
+ * (i.e. DERIVE or COUNTER) metrics from the cache. If a metric is seen for the
+ * first time, or when a DERIVE metric is reset, the start time is (re)set to
+ * vl->time. */
+static int read_cumulative_state(data_set_t const *ds, value_list_t const *vl,
+ int ds_index, cdtime_t *ret_start_time,
+ int64_t *ret_start_value) {
+ int ds_type = ds->ds[ds_index].type;
+ if ((ds_type != DS_TYPE_DERIVE) && (ds_type != DS_TYPE_COUNTER)) {
+ return 0;
+ }
+
+ char start_value_key[DATA_MAX_NAME_LEN];
+ snprintf(start_value_key, sizeof(start_value_key),
+ "stackdriver:start_value[%d]", ds_index);
+
+ int status =
+ uc_meta_data_get_signed_int(vl, start_value_key, ret_start_value);
+ if ((status == 0) && ((ds_type != DS_TYPE_DERIVE) ||
+ (*ret_start_value <= vl->values[ds_index].derive))) {
+ return uc_meta_data_get_unsigned_int(vl, "stackdriver:start_time",
+ ret_start_time);
+ }
+
+ if (ds_type == DS_TYPE_DERIVE) {
+ *ret_start_value = vl->values[ds_index].derive;
+ } else {
+ *ret_start_value = (int64_t)vl->values[ds_index].counter;
+ }
+ *ret_start_time = vl->time;
+
+ status = uc_meta_data_add_signed_int(vl, start_value_key, *ret_start_value);
+ if (status != 0) {
+ return status;
+ }
+ return uc_meta_data_add_unsigned_int(vl, "stackdriver:start_time",
+ *ret_start_time);
+} /* int read_cumulative_state */
+
/* Point
*
* {
* }
*/
static int format_point(yajl_gen gen, data_set_t const *ds,
- value_list_t const *vl, int ds_index) {
+ value_list_t const *vl, int ds_index,
+ cdtime_t start_time, int64_t start_value) {
/* {{{ */
yajl_gen_map_open(gen);
int ds_type = ds->ds[ds_index].type;
- int status = json_string(gen, "interval") ||
- format_time_interval(gen, ds_type, vl) ||
- json_string(gen, "value") ||
- format_gcm_typed_value(gen, ds_type, vl->values[ds_index]);
+ int status =
+ json_string(gen, "interval") ||
+ format_time_interval(gen, ds_type, vl, start_time) ||
+ json_string(gen, "value") ||
+ format_typed_value(gen, ds_type, vl->values[ds_index], start_value);
if (status != 0)
return status;
* ],
* }
*/
+/* format_time_series formats a TimeSeries object. Returns EAGAIN when a
+ * cumulative metric is seen for the first time and cannot be sent to
+ * Stackdriver due to lack of state. */
static int format_time_series(yajl_gen gen, data_set_t const *ds,
value_list_t const *vl, int ds_index,
sd_resource_t *res) {
- /* {{{ */
- yajl_gen_map_open(gen);
-
int ds_type = ds->ds[ds_index].type;
+ cdtime_t start_time = 0;
+ int64_t start_value = 0;
int status =
- json_string(gen, "metric") || format_metric(gen, ds, vl, ds_index) ||
- json_string(gen, "resource") || format_gcm_resource(gen, res) ||
- json_string(gen, "metricKind") || format_metric_kind(gen, ds_type) ||
- json_string(gen, "valueType") || format_value_type(gen, ds_type) ||
- json_string(gen, "points");
+ read_cumulative_state(ds, vl, ds_index, &start_time, &start_value);
+ if (status != 0) {
+ return status;
+ }
+ if (start_time == vl->time) {
+ /* for cumulative metrics, the interval must not be zero. */
+ return EAGAIN;
+ }
+
+ yajl_gen_map_open(gen);
+
+ status = json_string(gen, "metric") || format_metric(gen, ds, vl, ds_index) ||
+ json_string(gen, "resource") || format_gcm_resource(gen, res) ||
+ json_string(gen, "metricKind") || format_metric_kind(gen, ds_type) ||
+ json_string(gen, "valueType") || format_value_type(gen, ds_type) ||
+ json_string(gen, "points");
if (status != 0)
return status;
yajl_gen_array_open(gen);
- if ((status = format_point(gen, ds, vl, ds_index)) != 0) {
+
+ status = format_point(gen, ds, vl, ds_index, start_time, start_value);
+ if (status != 0)
return status;
- }
- yajl_gen_array_close(gen);
+ yajl_gen_array_close(gen);
yajl_gen_map_close(gen);
return 0;
} /* }}} int format_time_series */
return EEXIST;
}
+ _Bool staged = 0;
for (size_t i = 0; i < ds->ds_num; i++) {
int status = format_time_series(out->gen, ds, vl, i, out->res);
+ if (status == EAGAIN) {
+ /* first instance of a cumulative metric */
+ continue;
+ }
if (status != 0) {
ERROR("sd_output_add: format_time_series failed with status %d.", status);
return status;
}
+ staged = 1;
}
- c_avl_insert(out->staged, strdup(key), NULL);
+ if (staged) {
+ c_avl_insert(out->staged, strdup(key), NULL);
+ }
size_t json_buffer_size = 0;
yajl_gen_get_buf(out->gen, &(unsigned char const *){NULL}, &json_buffer_size);