Skip to content

Commit

Permalink
feat(lambda): rand function for tick lambdas (#2680)
Browse files Browse the repository at this point in the history
* feat(lambda): rand function for tick lambdas
  • Loading branch information
docmerlin authored Jun 15, 2022
1 parent 0bfa4f3 commit b69fc92
Show file tree
Hide file tree
Showing 7 changed files with 139 additions and 2 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ require (
github.com/uber/jaeger-client-go v2.28.0+incompatible
github.com/urfave/cli/v2 v2.3.0
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c
github.com/zeebo/mwc v0.0.4
go.etcd.io/bbolt v1.3.5
go.uber.org/zap v1.15.0
golang.org/x/crypto v0.0.0-20220214200702-86341886e292
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1267,6 +1267,10 @@ github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de
github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ=
github.com/zeebo/assert v1.3.0/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0=
github.com/zeebo/mwc v0.0.4 h1:9dNXNLtUB4lUXoXgyhy3YrKoV0OD7oRiu907YMS0nl0=
github.com/zeebo/mwc v0.0.4/go.mod h1:qNHfgp/ZCpQNcJHwKcO5EP3VgaBrW6DPohsK4QfyxxE=
go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
go.etcd.io/bbolt v1.3.5 h1:XAzx9gjCb0Rxj7EoqcClPD1d5ZBxZJk0jbuoPHenBt0=
Expand Down
2 changes: 1 addition & 1 deletion integrations/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func compareResults(exp, got models.Result) (bool, string) {
return false, fmt.Sprintf("unexpected series columns: i: %d \nexp %v \ngot %v", i, exp.Series[i].Columns, got.Series[i].Columns)
}
if !reflect.DeepEqual(exp.Series[i].Values, got.Series[i].Values) {
return false, fmt.Sprintf("unexpected series values: i: %d \nexp %v \ngot %v", i, exp.Series[i].Values, got.Series[i].Values)
return false, fmt.Sprintf("unexpected series values: i: %d \n %s", i, cmp.Diff(exp.Series[i].Values, got.Series[i].Values))
}
}
return true, ""
Expand Down
41 changes: 41 additions & 0 deletions integrations/streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,13 @@ import (
"github.com/influxdata/kapacitor/services/victorops/victoropstest"
"github.com/influxdata/kapacitor/services/zenoss"
"github.com/influxdata/kapacitor/services/zenoss/zenosstest"
"github.com/influxdata/kapacitor/tick/stateful"
"github.com/influxdata/kapacitor/udf"
"github.com/influxdata/kapacitor/udf/agent"
udf_test "github.com/influxdata/kapacitor/udf/test"
"github.com/influxdata/wlog"
"github.com/k-sone/snmpgo"
"github.com/zeebo/mwc"
)

var diagService *diagnostic.Service
Expand Down Expand Up @@ -11644,6 +11646,45 @@ stream
testStreamerWithOutput(t, "TestStream_EvalNow", script, time.Second, expectedOutput, false, nil)
}

func TestStream_EvalRand(t *testing.T) {
var script = `
stream
|from()
.measurement('data')
|eval(lambda: rand(0), lambda: rand(20), lambda: rand(1), lambda: rand())
.as('rand0', 'rand20','rand1','rand')
|httpOut('TestStream_EvalRand')
`
// make the rng deterministic for the test
tempRand := stateful.NewRand
stateful.NewRand = func() *stateful.Rand {
r := mwc.T{}
r.Seed(time.Unix(0, 0).UTC().Unix())
return (*stateful.Rand)(&r)
}
defer func() { stateful.NewRand = tempRand }()
expectedOutput := models.Result{
Series: models.Rows{
{
Name: "data",
Tags: map[string]string{"owner": "ownerA"},
Columns: []string{"time", "rand", "rand0", "rand1", "rand20"},
Values: [][]interface{}{
{
time.Date(1971, 1, 1, 0, 0, 0, 0, time.UTC),
float64(3131420824542496000), // this is due to a limitation of JSON, which httpOut uses
float64(0),
float64(0),
float64(3),
},
},
},
},
}

testStreamerWithOutput(t, "TestStream_EvalRand", script, time.Second, expectedOutput, false, nil)
}

func TestStream_Autoscale(t *testing.T) {
testCases := map[string]struct {
script string
Expand Down
3 changes: 3 additions & 0 deletions integrations/testdata/TestStream_EvalRand.srpl
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
dbname
rpname
data,owner=ownerA value=10i 0000000001
45 changes: 44 additions & 1 deletion tick/stateful/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,14 @@ import (
"strings"
"time"

"github.com/dustin/go-humanize"
humanize "github.com/dustin/go-humanize"
"github.com/influxdata/influxdb/influxql"
"github.com/influxdata/kapacitor/tick/ast"
"github.com/zeebo/mwc"
)

var ErrNotInt = errors.New("value is not an int")

// maxArgs is used to specify the largest number of arguments that a
// builtin function can accept.
// Increment this value if you create a builtin function with more than
Expand Down Expand Up @@ -248,6 +251,7 @@ func NewFunctions() Funcs {
funcs["sigma"] = &sigma{}
funcs["count"] = &count{}
funcs["spread"] = &spread{min: math.Inf(+1), max: math.Inf(-1)}
funcs["rand"] = NewRand()

return funcs
}
Expand Down Expand Up @@ -1085,6 +1089,45 @@ func (c *count) Signature() map[Domain]ast.ValueType {
return countFuncSignature
}

//NewRand creates a new rng. We do it this way so we can override it so tests are deterministic.
var NewRand = func() *Rand {
return (*Rand)(mwc.Rand())
}

// rand gives you a random number. It needs to be initalized before being run
type Rand mwc.T

func (c *Rand) Reset() {}

// returns a random number.
func (c *Rand) Call(args ...interface{}) (v interface{}, err error) {
if len(args) > 1 {
return 0, errors.New("rand expects zero or one argument")
}
if len(args) == 1 {
n, ok := args[0].(int64)
if !ok {
fmt.Println(reflect.TypeOf(args[0]).Name())
return nil, ErrNotInt
}
return int64(((*mwc.T)(c).Uint64n(uint64(n)) & ((1 << 63) - 1))), nil
}
return int64(((*mwc.T)(c).Uint64() & ((1 << 63) - 1))), nil
}

var randFuncSignature = map[Domain]ast.ValueType{}

// Initialize Count Function Signature
func init() {
d := Domain{}
randFuncSignature[d] = ast.TInt
randFuncSignature[Domain{ast.TInt}] = ast.TInt
}

func (c *Rand) Signature() map[Domain]ast.ValueType {
return randFuncSignature
}

type sigma struct {
mean float64
variance float64
Expand Down
45 changes: 45 additions & 0 deletions tick/stateful/functions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"regexp"
"testing"
"time"

"github.com/zeebo/mwc"
)

func Test_Bool(t *testing.T) {
Expand Down Expand Up @@ -913,3 +915,46 @@ func Test_StatelessFuncs(t *testing.T) {
}

}

func Test_Rand_zeros(t *testing.T) {
f := NewRand()
// seed with a known value to force determinism.
(*mwc.T)(f).Seed(time.Unix(0, 0).Unix())
testCases := []struct {
args []interface{}
exp int64
err error
}{
{args: []interface{}{int64(0)}, exp: 0},
{args: []interface{}{int64(1)}, exp: 0},
{args: []interface{}{int64(10)}, exp: 3},
{args: []interface{}{int64(10)}, exp: 2},
{args: []interface{}{int64(10)}, exp: 7},
{args: []interface{}{int64(10)}, exp: 9},
{args: []interface{}{int64(10)}, exp: 3},
{args: []interface{}{int64(10)}, exp: 8},
{args: []interface{}{int64(10)}, exp: 7},
{args: []interface{}{}, exp: 7383185112808722380},
}
for i, tc := range testCases {
result, err := f.Call(tc.args...)
if tc.err != nil {
if err == nil {
t.Errorf("expected error from rand(%v) got nil exp %s", tc.args, tc.err)
} else if got, exp := err.Error(), tc.err.Error(); got != exp {
t.Errorf("unexpected error from rand(%v) got %s exp %s", tc.args, got, exp)
}
continue
} else if err != nil {
t.Errorf("unexpected error from rand(%v) %s", tc.args, err)
continue
}
res, ok := result.(int64)
if !ok {
t.Errorf("expected int64 result from rand(%v) %s", tc.args, err)
}
if res != tc.exp {
t.Errorf("expected %v as a result of the %vth run of rand (when given unix zero date as set seed) but got %v", tc.exp, i, res)
}
}
}

0 comments on commit b69fc92

Please sign in to comment.