--- /dev/null
+GOROOT = /usr/lib/go
+
+include $(GOROOT)/src/Make.inc
+
+TARG = ots_timeseries
+GOFILES = ots_timeseries.go\
+
+include $(GOROOT)/src/Make.pkg
+++ /dev/null
-package main
-
-import (
- "fmt"
- "os"
-)
-
-type OTS_DataPoint struct {
- TimeStamp float64
- Rate float64
-}
-
-type OTS_Data struct {
- TSData []OTS_DataPoint
-}
-
-/* Functions for the sort interface. */
-func (obj *OTS_Data) Len () int {
- return (len (obj.TSData))
-}
-
-func (obj *OTS_Data) Less (i, j int) bool {
- if obj.TSData[i].TimeStamp < obj.TSData[j].TimeStamp {
- return true
- }
- return false
-}
-
-func (obj *OTS_Data) Swap (i, j int) {
- tmp := obj.TSData[i]
- obj.TSData[i] = obj.TSData[j]
- obj.TSData[j] = tmp
-}
-
-func Fmod64 (a float64, b float64) float64 {
- tmp := int (a / b)
- return b * float64 (tmp)
-}
-
-func (obj *OTS_Data) Write (name string) os.Error {
- fd, err := os.OpenFile(name, os.O_WRONLY, 0666)
- if err != nil {
- return err
- }
-
- for i := 0; i < len (obj.TSData); i++ {
- data_point := obj.TSData[i]
- str := fmt.Sprintf ("%.3f,%g\n", data_point.TimeStamp, data_point.Rate)
-
- fd.WriteString (str)
- }
-
- fd.Close ()
- return nil
-}
-
-func ReadFile (name string) (obj *OTS_Data, err os.Error) {
-}
-
-func (raw_data *OTS_Data) Consolidate (interval float64) *OTS_Data {
- if interval <= 0.0 {
- return nil
- }
-
- ts_raw_first := raw_data.TSData[0].TimeStamp
- ts_raw_last := ts_raw_first
-
- /* Determine the first and last data point.
- * XXX: In the future, this should be a sorted list! */
- for i := 1; i < len (raw_data.TSData); i++ {
- data_point := raw_data.TSData[i]
-
- if ts_raw_first > data_point.TimeStamp {
- ts_raw_first = data_point.TimeStamp
- }
-
- if ts_raw_last < data_point.TimeStamp {
- ts_raw_last = data_point.TimeStamp
- }
- }
-
- fmt.Printf ("ts_raw_first = %g; ts_raw_last = %g;\n",
- ts_raw_first, ts_raw_last)
-
- /* Determine the timespan the consolidated data will span. */
- ts_csl_first := Fmod64 (ts_raw_first, interval)
- ts_csl_last := Fmod64 (ts_raw_last, interval)
- if ts_csl_last < ts_raw_last {
- ts_csl_last += interval
- }
-
- fmt.Printf ("ts_csl_first = %g; ts_csl_last = %g;\n",
- ts_csl_first, ts_csl_last)
-
- intervals_num := int ((ts_csl_last - ts_csl_first) / interval)
- fmt.Printf ("Got a %gs timespan (%d intervals).\n",
- ts_csl_last - ts_csl_first, intervals_num)
-
- /* Allocate return structure */
- ret_data := new (OTS_Data)
- ret_data.TSData = make ([]OTS_DataPoint, intervals_num)
-
- /* FIXME: This is currently a O(n^2) algorithm. It should instead be a O(n)
- * algorithm. This is possible if raw_data is sorted (which, obviously, is a
- * O(n log(n)) task). */
- for i := 0; i < intervals_num; i++ {
- ts := ts_csl_first + (float64 (i) * interval)
- sum := 0.0
- num := 0.0
-
- fmt.Printf ("Building data for interval %g.\n", ts)
-
- ret_data.TSData[i].TimeStamp = ts
-
- for j := 0; j < len (raw_data.TSData); j++ {
- data_point := raw_data.TSData[j]
-
- if ((data_point.TimeStamp < ts) || (data_point.TimeStamp >= (ts + interval))) {
- continue
- }
-
- sum += data_point.Rate
- num += 1.0
- }
-
- /* TODO: Be more clever about how this consolidated rate is computed. */
- if num > 0.0 {
- ret_data.TSData[i].Rate = sum / num
- }
- }
-
- return ret_data
-}
-
-func (obj *OTS_Data) Print () {
- for i := 0; i < len (obj.TSData); i++ {
- data_point := obj.TSData[i]
- fmt.Printf ("[%g] %g\n", data_point.TimeStamp, data_point.Rate)
- }
-} /* Print () */
-
-func main () {
- var data_points []OTS_DataPoint
- var raw_data *OTS_Data
- var new_data *OTS_Data
-
- data_points = []OTS_DataPoint {
- {0.0, 1.0},
- {1.0, 2.0},
- {2.0, 5.0},
- {3.0, 8.0},
- {4.0, 0.0},
- {5.0, 3.0}}
-
- raw_data = new (OTS_Data)
- raw_data.TSData = data_points
-
- new_data = raw_data.Consolidate (2.0)
-
- new_data.Print()
-}
--- /dev/null
+package otsdb
+
+import (
+ "fmt"
+ "math"
+ "os"
+ "sort"
+)
+
+type OTS_DataPoint struct {
+ TimeStamp float64
+ Rate float64
+}
+
+type OTS_TimeSeries struct {
+ DataPoints []OTS_DataPoint
+}
+
+/* Functions for the sort interface. */
+func (obj *OTS_TimeSeries) Len () int {
+ return (len (obj.DataPoints))
+}
+
+func (obj *OTS_TimeSeries) Less (i, j int) bool {
+ if obj.DataPoints[i].TimeStamp < obj.DataPoints[j].TimeStamp {
+ return true
+ }
+ return false
+}
+
+func (obj *OTS_TimeSeries) Swap (i, j int) {
+ tmp := obj.DataPoints[i]
+ obj.DataPoints[i] = obj.DataPoints[j]
+ obj.DataPoints[j] = tmp
+}
+
+func Fmod64 (a float64, b float64) float64 {
+ tmp := int (a / b)
+ return b * float64 (tmp)
+}
+
+func (obj *OTS_TimeSeries) Write (name string) os.Error {
+ fd, err := os.OpenFile (name, os.O_WRONLY, 0666)
+ if err != nil {
+ return err
+ }
+
+ for i := 0; i < len (obj.DataPoints); i++ {
+ data_point := obj.DataPoints[i]
+ str := fmt.Sprintf ("%.3f,%g\n", data_point.TimeStamp, data_point.Rate)
+
+ fd.WriteString (str)
+ }
+
+ fd.Close ()
+ return nil
+}
+
+func ReadFile (name string) (obj *OTS_TimeSeries, err os.Error) {
+ fd, err := os.Open (name)
+ if err != nil {
+ return nil, err
+ }
+
+ /* dp_list := make ([]OTS_DataPoint, intervals_num */
+ obj = new (OTS_TimeSeries)
+
+ for ;; {
+ var timestamp float64
+ var rate float64
+
+ status, err := fmt.Fscanln (fd, "%f,%f", ×tamp, &rate)
+ if err != nil {
+ break
+ } else if status != 2 {
+ continue
+ }
+
+ fmt.Printf ("timestamp = %.3f; rate = %g;\n", timestamp, rate)
+
+ obj.DataPoints = append (obj.DataPoints, OTS_DataPoint{timestamp, rate})
+ }
+
+ fd.Close ()
+ return obj, nil
+}
+
+func (obj *OTS_TimeSeries) ConsolidatePointAverage (ts_start, ts_end float64) OTS_DataPoint {
+ var dp OTS_DataPoint
+
+ if ts_start > ts_end {
+ tmp := ts_end
+ ts_end = ts_start
+ ts_start = tmp
+ }
+
+ dp.TimeStamp = ts_end
+ dp.Rate = math.NaN ()
+
+ if len (obj.DataPoints) < 1 {
+ /* The object contains no data. */
+ return dp
+ } else if ts_start > obj.DataPoints[len (obj.DataPoints) - 1].TimeStamp {
+ /* The timespan is after all the data in the object. */
+ return dp
+ } else if ts_end < obj.DataPoints[0].TimeStamp {
+ /* The timespan is before all the data in the object. */
+ return dp
+ }
+
+ /* Find the first rate _after_ the start of the interval. */
+ idx_start := sort.Search (len (obj.DataPoints), func (i int) bool {
+ if obj.DataPoints[i].TimeStamp > ts_start {
+ return true
+ }
+ return false
+ })
+
+ /* The start is outside of the range of the timestamp. With the above checks
+ * this means that the start is _before_ the data in the object. We can thus
+ * use the first elements in the slice. */
+ if idx_start >= len (obj.DataPoints) {
+ idx_start = 0
+ }
+
+ /* There is no data points _between_ ts_start and ts_end. Return the first
+ * measured rate _after_ the desired timespan as the rate of the timespan. */
+ if obj.DataPoints[idx_start].TimeStamp >= ts_end {
+ dp.Rate = obj.DataPoints[idx_start].Rate
+ return dp
+ }
+
+ var timespan_len float64 = 0.0
+ var timespan_sum float64 = 0.0
+ for i := idx_start; i < len (obj.DataPoints); i++ {
+ dp_ts_start := ts_start
+ if (i > 0) && (dp_ts_start < obj.DataPoints[i - 1].TimeStamp) {
+ dp_ts_start = obj.DataPoints[i - 1].TimeStamp
+ }
+
+ dp_ts_end := obj.DataPoints[i].TimeStamp
+ if dp_ts_end > ts_end {
+ dp_ts_end = ts_end
+ }
+
+ dp_ts_diff := dp_ts_end - dp_ts_start
+ /* assert dp_ts_diff > 0.0 */
+ timespan_len += dp_ts_diff
+ timespan_sum += dp_ts_diff * obj.DataPoints[i].Rate
+
+ if obj.DataPoints[i].TimeStamp >= ts_end {
+ break;
+ }
+ } /* for i */
+
+ dp.Rate = timespan_sum / timespan_len
+ return dp
+} /* ConsolidatePointAverage */
+
+func (obj *OTS_TimeSeries) ConsolidateAverage (interval float64) *OTS_TimeSeries {
+ if interval <= 0.0 {
+ return nil
+ }
+
+ ts_raw_first := obj.DataPoints[0].TimeStamp
+ ts_raw_last := obj.DataPoints[len (obj.DataPoints) - 1].TimeStamp
+
+ fmt.Printf ("ts_raw_first = %g; ts_raw_last = %g;\n",
+ ts_raw_first, ts_raw_last)
+
+ /* Determine the timespan the consolidated data will span. */
+ ts_csl_first := Fmod64 (ts_raw_first, interval)
+ ts_csl_last := Fmod64 (ts_raw_last, interval)
+ if ts_csl_first < ts_raw_first {
+ ts_csl_first += interval
+ }
+
+ fmt.Printf ("ts_csl_first = %g; ts_csl_last = %g;\n",
+ ts_csl_first, ts_csl_last)
+
+ intervals_num := int ((ts_csl_last - ts_csl_first) / interval)
+ fmt.Printf ("Got a %gs timespan (%d intervals).\n",
+ ts_csl_last - ts_csl_first, intervals_num)
+
+ /* Allocate return structure */
+ ret_data := new (OTS_TimeSeries)
+ ret_data.DataPoints = make ([]OTS_DataPoint, intervals_num)
+
+ /* FIXME: This is currently a O(n log(n)) algorithm. It should instead be a O(n)
+ * algorithm. This is possible since obj is sorted. The problem is that
+ * ConsolidatePointAverage() does a binary search when we actually know where
+ * to go in the array. */
+ for i := 0; i < intervals_num; i++ {
+ ts := ts_csl_first + (float64 (i + 1) * interval)
+
+ fmt.Printf ("Building data for interval ]%g-%g].\n", ts - interval, ts)
+
+ ret_data.DataPoints[i] = obj.ConsolidatePointAverage (ts - interval, ts)
+ }
+
+ return ret_data
+} /* ConsolidateAverage */
+
+func (obj *OTS_TimeSeries) Print () {
+ for i := 0; i < len (obj.DataPoints); i++ {
+ data_point := obj.DataPoints[i]
+ fmt.Printf ("[%g] %g\n", data_point.TimeStamp, data_point.Rate)
+ }
+} /* Print () */
+
+/* vim: set syntax=go sw=2 sts=2 et : */
--- /dev/null
+package otsdb
+
+import (
+ "math"
+ "testing"
+)
+
+type consolidatePointAverageTest struct {
+ tsStart float64
+ tsEnd float64
+ rate float64
+}
+
+var consolidatePointAverageTestData = []OTS_DataPoint {
+ { 0.0, 0.0},
+ { 10.0, 2.0},
+ { 20.0, 4.0},
+ { 30.0, 8.0},
+ { 40.0, 16.0},
+ { 50.0, 32.0},
+ { 60.0, 64.0},
+ { 70.0, 96.0},
+ { 80.0, 96.0},
+ { 90.0, 0.0},
+ {100.0, 0.0},
+ {110.0, 0.0},
+ {120.0, 0.0},
+}
+
+var consolidatePointAverageTests = []consolidatePointAverageTest {
+ /* Timespan borders align with datapoints. This is the easiest case. */
+ consolidatePointAverageTest{40.0, 60.0, 48.0},
+ consolidatePointAverageTest{40.0, 50.0, 32.0},
+ /* Timespan borders between datapoints. */
+ consolidatePointAverageTest{35.0, 45.0, 24.0},
+ consolidatePointAverageTest{ 7.0, 27.0, 5.1},
+ consolidatePointAverageTest{17.0, 42.0, 12.64},
+ /* No datapoints within timespan. */
+ consolidatePointAverageTest{23.0, 28.0, 8.0},
+ /* Beginning before first datapoint */
+ consolidatePointAverageTest{-8.0, 24.0, 2.875},
+ /* End after last datapoint */
+ consolidatePointAverageTest{60.0, 180.0, 32.0},
+ /* Start and end inversed */
+ consolidatePointAverageTest{27.0, 7.0, 5.1},
+}
+
+func FloatsDiffer (a, b float64) bool {
+ if math.Fabs (a - b) > 1.0e-6 {
+ return true
+ }
+ return false
+}
+
+func TestConsolidatePointAverage (t *testing.T) {
+ obj := new (OTS_TimeSeries)
+ obj.DataPoints = consolidatePointAverageTestData
+
+ for i := 0; i < len (consolidatePointAverageTests); i++ {
+ testCase := consolidatePointAverageTests[i]
+
+ dp := obj.ConsolidatePointAverage (testCase.tsStart, testCase.tsEnd);
+ if FloatsDiffer (dp.Rate, testCase.rate) {
+ t.Errorf ("ConsolidatePointAverage (%g, %g) failed: Expected %g, got %g",
+ testCase.tsStart, testCase.tsEnd, testCase.rate, dp.Rate);
+ }
+ }
+}
+
+/* vim: set syntax=go sw=2 sts=2 et : */