Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Add cardinality stat to each node type. #1221

Merged
merged 37 commits into from
Mar 6, 2017
Merged
Changes from 1 commit
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
97b969a
Add IntFuncGauge to expvar package
desa Feb 22, 2017
f6c3639
Add cardinality stat to AlertNode
desa Feb 22, 2017
fe62376
Add cardinality stat to WindowNode
desa Feb 22, 2017
0e6a4b5
Add cardinality stat to CombineNode
desa Feb 22, 2017
3d99a26
Add cardinality stat to DerivativeNode
desa Feb 22, 2017
09bf19d
Add cardinality count to EvalNode
desa Feb 22, 2017
f3fb481
Add cardinality stat to FlattenNode
desa Feb 22, 2017
bc447fc
Add cardinality count to HTTPOutNode
desa Feb 22, 2017
d677cbf
Add cardinality stat for streaming InfluxQL node
desa Feb 23, 2017
964fe7a
Add cardinality stat to join node
desa Feb 23, 2017
ad96ebd
Add cardinality stat to K8sAutoscale node
desa Feb 23, 2017
90ef5f4
Add cardinality stat to SampleNode
desa Feb 23, 2017
a8d27c9
Add cardinality stat for WhereNode
desa Feb 23, 2017
06a96da
Change IntFuncGauge type signature to use int64
desa Feb 23, 2017
bb3c2e5
Define nodeCardinality stat on all nodes
desa Feb 23, 2017
a4bbdb9
Fix bug that reassigned the nodeCardinality
desa Feb 24, 2017
bf4ee90
Change cardinality stat to working_cardinality
desa Feb 27, 2017
005f88b
Add testStreamerCardinality function
desa Feb 27, 2017
a61965e
Add derivative cardinality test
desa Feb 27, 2017
b7a5bf5
Add cardinality stat to group by node
desa Feb 27, 2017
359fe04
Add cardinality tests for various node types
desa Feb 27, 2017
d169a1c
Fix clock issue with running task in test
desa Feb 27, 2017
abbf09a
Add cardinality tests
desa Feb 27, 2017
0c1d7a4
Add working_cardinality to CHANGELOG
desa Feb 27, 2017
a50e721
Fix data race in setting nodeCardinality.ValueF
desa Feb 27, 2017
3b815bc
Fix k8sAutoscale cardinality test
desa Feb 27, 2017
ed190fa
Remove nodeCardinality and mutex from node struct
desa Feb 28, 2017
55840df
Remove nodeCardinality from node struct
desa Mar 2, 2017
8526f28
Add a getAlertState method that looks up the state
desa Mar 2, 2017
381955d
Change cardinalityMu to statesMu on alert node
desa Mar 2, 2017
b6a7a5e
Change cardinalityMu to expressionsByGroupMu
desa Mar 2, 2017
b2edeb1
Change cardinalityMu to expressionsByGroupMu in ev
desa Mar 2, 2017
7b60c8c
Change cardinalityMu to groupsMu in join node
desa Mar 2, 2017
69ee032
Change cardinalityMu to replicasExprsMu in k8s node
desa Mar 2, 2017
f97f8e4
Change cardinalityMu to countsMu in sample node
desa Mar 2, 2017
32d1502
Change RLock to a Lock in shouldKeep in sample node
desa Mar 2, 2017
e9c0658
Fold getAlertState into conditional
desa Mar 3, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
181 changes: 174 additions & 7 deletions integrations/streamer_test.go
Original file line number Diff line number Diff line change
@@ -8780,7 +8780,7 @@ stream
testStreamerCardinality(t, "TestStream_Cardinality", script, es)
}

func TestStream_InfluxQLCardinality(t *testing.T) {
func TestStream_InfluxQLCardinalityStream(t *testing.T) {

var script = `
stream
@@ -8819,6 +8819,55 @@ stream
testStreamerCardinality(t, "TestStream_Cardinality", script, es)
}

func TestStream_InfluxQLCardinalityBatch(t *testing.T) {

var script = `
stream
|from()
.measurement('cpu')
.groupBy('host','cpu')
|window()
.period(1s)
.every(1s)
|max('usage_user')
.as('max')
`

// Expected Stats
es := map[string]map[string]interface{}{
"stream0": map[string]interface{}{
"avg_exec_time_ns": int64(0),
"errors": int64(0),
"working_cardinality": int64(0),
"collected": int64(90),
"emitted": int64(90),
},
"from1": map[string]interface{}{
"avg_exec_time_ns": int64(0),
"errors": int64(0),
"working_cardinality": int64(0),
"collected": int64(90),
"emitted": int64(90),
},
"window2": map[string]interface{}{
"emitted": int64(81),
"working_cardinality": int64(9),
"avg_exec_time_ns": int64(0),
"errors": int64(0),
"collected": int64(90),
},
"max3": map[string]interface{}{
"emitted": int64(0),
"working_cardinality": int64(0),
"avg_exec_time_ns": int64(0),
"errors": int64(0),
"collected": int64(81),
},
}

testStreamerCardinality(t, "TestStream_Cardinality", script, es)
}

func TestStream_EvalCardinality(t *testing.T) {

var script = `
@@ -8897,6 +8946,53 @@ stream
testStreamerCardinality(t, "TestStream_Cardinality", script, es)
}

func TestStream_GroupByCardinality(t *testing.T) {

var script = `
stream
|from()
.measurement('cpu')
|window()
.period(1s)
.every(1s)
|groupBy('cpu')
`

// Expected Stats
es := map[string]map[string]interface{}{
"stream0": map[string]interface{}{
"avg_exec_time_ns": int64(0),
"errors": int64(0),
"working_cardinality": int64(0),
"collected": int64(90),
"emitted": int64(90),
},
"from1": map[string]interface{}{
"avg_exec_time_ns": int64(0),
"errors": int64(0),
"working_cardinality": int64(0),
"collected": int64(90),
"emitted": int64(90),
},
"window2": map[string]interface{}{
"emitted": int64(9),
"working_cardinality": int64(1),
"avg_exec_time_ns": int64(0),
"errors": int64(0),
"collected": int64(90),
},
"groupby3": map[string]interface{}{
"emitted": int64(0),
"working_cardinality": int64(9),
"avg_exec_time_ns": int64(0),
"errors": int64(0),
"collected": int64(9),
},
}

testStreamerCardinality(t, "TestStream_Cardinality", script, es)
}

func TestStream_AlertCardinality(t *testing.T) {

var script = `
@@ -9076,11 +9172,6 @@ s2|join(s1)
}

func TestStream_CombineCardinality(t *testing.T) {
// TODO: skip for now. Needs special treatment
skip := true
if skip {
return
}

var script = `
var s1 = stream
@@ -9119,11 +9210,87 @@ var s1 = stream
testStreamerCardinality(t, "TestStream_Cardinality", script, es)
}

func TestStream_MixedCardinality(t *testing.T) {

var script = `
stream
|from()
.measurement('cpu')
.groupBy('host','cpu')
|where(lambda: "host" == 'localhost')
|eval(lambda: sigma("usage_user"))
.as('sigma')
|where(lambda: "cpu" == 'cpu-total' OR "cpu" == 'cpu0' OR "cpu" == 'cpu1')
|derivative('sigma')
|alert()
`

// Expected Stats
es := map[string]map[string]interface{}{
"stream0": map[string]interface{}{
"avg_exec_time_ns": int64(0),
"errors": int64(0),
"working_cardinality": int64(0),
"collected": int64(90),
"emitted": int64(90),
},
"from1": map[string]interface{}{
"avg_exec_time_ns": int64(0),
"errors": int64(0),
"working_cardinality": int64(0),
"collected": int64(90),
"emitted": int64(90),
},
"where2": map[string]interface{}{
"avg_exec_time_ns": int64(0),
"errors": int64(0),
"working_cardinality": int64(9),
"collected": int64(90),
"emitted": int64(90),
},
"eval3": map[string]interface{}{
"avg_exec_time_ns": int64(0),
"errors": int64(0),
"working_cardinality": int64(9),
"collected": int64(90),
"emitted": int64(90),
},
"where4": map[string]interface{}{
"avg_exec_time_ns": int64(0),
"errors": int64(0),
"working_cardinality": int64(9),
"collected": int64(90),
"emitted": int64(30),
},
"derivative5": map[string]interface{}{
"avg_exec_time_ns": int64(0),
"errors": int64(0),
"working_cardinality": int64(3),
"collected": int64(30),
"emitted": int64(27),
},
"alert6": map[string]interface{}{
"emitted": int64(0),
"working_cardinality": int64(3),
"avg_exec_time_ns": int64(0),
"errors": int64(0),
"collected": int64(27),
"warns_triggered": int64(0),
"crits_triggered": int64(0),
"alerts_triggered": int64(0),
"oks_triggered": int64(0),
"infos_triggered": int64(0),
},
}

testStreamerCardinality(t, "TestStream_Cardinality", script, es)
}

func testStreamerCardinality(t *testing.T, name, script string, expectedStats map[string]map[string]interface{}) {
clock, et, replayErr, tm := testStreamer(t, name, script, nil)
defer tm.Close()

err := fastForwardTask(clock, et, replayErr, tm, 15*time.Second)
err := fastForwardTask(clock, et, replayErr, tm, 20*time.Second)
if err != nil {
t.Fatalf("Encountered error: %v", err)
}