Skip to content

Commit

Permalink
feat: evaluate and store "now" in execution deps for tableFind (take …
Browse files Browse the repository at this point in the history
…2) (#2811)

Added "now" to the execution dependencies. These are the dependencies that are
necessary to start an execution pipeline from the evaluation step, as is done
by tableFind. When the interpreter encounters the now option (a function) it is
evaluated and the result is stored in the execution dependencies, making it
available for tableFind to use.

We also inject some default execution dependencies for the repl to use, making
tableFind and friends work properly in the repl.
  • Loading branch information
adrian-thurston authored Jun 18, 2020
1 parent bb1cab9 commit 63ca1cc
Show file tree
Hide file tree
Showing 15 changed files with 1,804 additions and 47 deletions.
3 changes: 0 additions & 3 deletions compile.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,6 @@ const (
tableKindKey = "kind"
tableParentsKey = "parents"
tableSpecKey = "spec"

NowOption = "now"
nowPkg = "universe"
)

type CreateOperationSpec func(args Arguments, a *Administration) (OperationSpec, error)
Expand Down
8 changes: 6 additions & 2 deletions internal/spec/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/influxdata/flux"
"github.com/influxdata/flux/interpreter"
"github.com/influxdata/flux/lang/execdeps"
"github.com/opentracing/opentracing-go"
)

Expand Down Expand Up @@ -135,6 +136,9 @@ func FromScript(ctx context.Context, runtime flux.Runtime, now time.Time, script
}
s.Finish()

deps := execdeps.NewExecutionDependencies(nil, &now, nil)
ctx = deps.Inject(ctx)

s, cctx := opentracing.StartSpanFromContext(ctx, "eval")
sideEffects, scope, err := runtime.Eval(cctx, astPkg, flux.SetNowOption(now))
if err != nil {
Expand All @@ -144,9 +148,9 @@ func FromScript(ctx context.Context, runtime flux.Runtime, now time.Time, script

s, cctx = opentracing.StartSpanFromContext(ctx, "compile")
defer s.Finish()
nowOpt, ok := scope.Lookup(flux.NowOption)
nowOpt, ok := scope.Lookup(interpreter.NowOption)
if !ok {
return nil, fmt.Errorf("%q option not set", flux.NowOption)
return nil, fmt.Errorf("%q option not set", interpreter.NowOption)
}
nowTime, err := nowOpt.Function().Call(ctx, nil)
if err != nil {
Expand Down
37 changes: 36 additions & 1 deletion interpreter/interpreter.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,16 @@ import (
"github.com/influxdata/flux/ast"
"github.com/influxdata/flux/codes"
"github.com/influxdata/flux/internal/errors"
"github.com/influxdata/flux/lang/execdeps"
"github.com/influxdata/flux/semantic"
"github.com/influxdata/flux/values"
)

const PackageMain = "main"
const (
PackageMain = "main"
NowPkg = "universe"
NowOption = "now"
)

type Interpreter struct {
sideEffects []SideEffect // a list of the side effects occurred during the last call to `Eval`.
Expand Down Expand Up @@ -157,6 +162,31 @@ func (itrp *Interpreter) doStatement(ctx context.Context, stmt semantic.Statemen
return nil, nil
}

// If the option is "now", evaluate the function and store in the execution
// dependencies.
func (irtp *Interpreter) evaluateNowOption(ctx context.Context, name string, init values.Value) {
if name != NowOption {
return
}
if !execdeps.HaveExecutionDependencies(ctx) {
return
}

// Evaluate now.
nowTime, err := init.Function().Call(ctx, nil)
if err != nil {
return
}
now := nowTime.Time().Time()

// Stash in the execution dependencies. The deps use a pointer and we
// overwrite the dest of the pointer. Overwritng the pointer would have no
// effect as context changes are passed down only.
deps := execdeps.GetExecutionDependencies(ctx)
*deps.Now = now
deps.Inject(ctx)
}

func (itrp *Interpreter) doOptionStatement(ctx context.Context, s *semantic.OptionStatement, scope values.Scope) (values.Value, error) {
switch a := s.Assignment.(type) {
case *semantic.NativeVariableAssignment:
Expand All @@ -165,6 +195,11 @@ func (itrp *Interpreter) doOptionStatement(ctx context.Context, s *semantic.Opti
return nil, err
}

// Some functions require access to now from the execution dependencies
// (eg tableFind). For those cases we immediately evaluate and store it
// in the execution deps.
itrp.evaluateNowOption(ctx, a.Identifier.Name, init)

// Retrieve an option with the name from the scope.
// If it exists and is an option, then set the option
// as it is from the prelude.
Expand Down
17 changes: 9 additions & 8 deletions lang/compiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"github.com/influxdata/flux/execute"
"github.com/influxdata/flux/internal/errors"
"github.com/influxdata/flux/internal/spec"
"github.com/influxdata/flux/interpreter"
"github.com/influxdata/flux/lang/execdeps"
"github.com/influxdata/flux/memory"
"github.com/influxdata/flux/plan"
"github.com/influxdata/flux/semantic"
Expand Down Expand Up @@ -357,13 +359,12 @@ func (p *AstProgram) getSpec(ctx context.Context, runtime flux.Runtime, alloc *m
p.Ast = extern
p.opts.extern = nil
}
// The program must inject execution dependencies to make it available
// to function calls during the evaluation phase (see `tableFind`).
deps := ExecutionDependencies{
Allocator: alloc,
Logger: p.Logger,
}

// The program must inject execution dependencies to make it available to
// function calls during the evaluation phase (see `tableFind`).
deps := execdeps.NewExecutionDependencies(alloc, &p.Now, p.Logger)
ctx = deps.Inject(ctx)

s, cctx := opentracing.StartSpanFromContext(ctx, "eval")
sideEffects, scope, err := runtime.Eval(cctx, p.Ast, flux.SetNowOption(p.Now))
if err != nil {
Expand All @@ -373,9 +374,9 @@ func (p *AstProgram) getSpec(ctx context.Context, runtime flux.Runtime, alloc *m

s, cctx = opentracing.StartSpanFromContext(ctx, "compile")
defer s.Finish()
nowOpt, ok := scope.Lookup(flux.NowOption)
nowOpt, ok := scope.Lookup(interpreter.NowOption)
if !ok {
return nil, nil, fmt.Errorf("%q option not set", flux.NowOption)
return nil, nil, fmt.Errorf("%q option not set", interpreter.NowOption)
}
nowTime, err := nowOpt.Function().Call(ctx, nil)
if err != nil {
Expand Down
31 changes: 29 additions & 2 deletions lang/dependencies.go → lang/execdeps/dependencies.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package lang
package execdeps

import (
"context"
"time"

"github.com/influxdata/flux/memory"
"go.uber.org/zap"
Expand All @@ -14,8 +15,12 @@ const executionDependenciesKey key = iota
// ExecutionDependencies represents the dependencies that a function call
// executed by the Interpreter needs in order to trigger the execution of a flux.Program
type ExecutionDependencies struct {
// Must be set
Allocator *memory.Allocator
Logger *zap.Logger
Now *time.Time

// Allowed to be nil
Logger *zap.Logger
}

func (d ExecutionDependencies) Inject(ctx context.Context) context.Context {
Expand All @@ -29,3 +34,25 @@ func HaveExecutionDependencies(ctx context.Context) bool {
func GetExecutionDependencies(ctx context.Context) ExecutionDependencies {
return ctx.Value(executionDependenciesKey).(ExecutionDependencies)
}

// Create some execution dependencies. Any arg may be nil, this will choose
// some suitable defaults.
func NewExecutionDependencies(allocator *memory.Allocator, now *time.Time, logger *zap.Logger) ExecutionDependencies {
if allocator == nil {
allocator = new(memory.Allocator)
}

if now == nil {
nowVar := time.Now()
now = &nowVar
}
return ExecutionDependencies{
Allocator: allocator,
Now: now,
Logger: logger,
}
}

func DefaultExecutionDependencies() ExecutionDependencies {
return NewExecutionDependencies(nil, nil, nil)
}
12 changes: 0 additions & 12 deletions lang/langtest/dependencies.go

This file was deleted.

1 change: 1 addition & 0 deletions libflux/go/libflux/buildinfo.gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,7 @@ var sourceHashes = map[string]string{
"stdlib/universe/string_max_test.flux": "78c162ba2901856e0091f7990018b4d6e31b9fd34f7152002fcca1c2c155abf7",
"stdlib/universe/string_sort_test.flux": "3ee25884c1b5b02dd36d153929714dedfced7176d7e2f54ebcdfb5797bd8bbde",
"stdlib/universe/sum_test.flux": "dbc5ae649610f6ffdbe8f9ea3f2b31fc1a2bf1e95bad579a07f39a838c8c5980",
"stdlib/universe/table_fns_test.flux": "9b7c90f0e8df57ceda8a36c08f24d28d18507c9bd99016b27537f978012d49d2",
"stdlib/universe/tail_offset_test.flux": "37e4ac52a8d6c7bb300e5ddd4901486fc466a9ac10579ae3fe465b2d1cafdeb9",
"stdlib/universe/tail_test.flux": "42b7af0693a4a51ba59c4d4e35444a55526006098ae546b998c594b39f87c768",
"stdlib/universe/task_per_line_test.flux": "b2b1342712b508e8978ba51270c202aa81a6667356e30da2085a3bc546cbc82a",
Expand Down
5 changes: 5 additions & 0 deletions repl/repl.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/influxdata/flux/execute"
"github.com/influxdata/flux/internal/spec"
"github.com/influxdata/flux/interpreter"
"github.com/influxdata/flux/lang/execdeps"
"github.com/influxdata/flux/libflux/go/libflux"
"github.com/influxdata/flux/memory"
"github.com/influxdata/flux/runtime"
Expand Down Expand Up @@ -158,6 +159,10 @@ func (r *REPL) Eval(t string) ([]interpreter.SideEffect, error) {
if err != nil {
return nil, err
}

deps := execdeps.DefaultExecutionDependencies()
r.ctx = deps.Inject(r.ctx)

return r.itrp.Eval(r.ctx, pkg, r.scope, r.importer)
}

Expand Down
4 changes: 2 additions & 2 deletions runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func SetOption(pkg, name string, fn func(r Runtime) values.Value) ScopeMutator {

// SetNowOption returns a ScopeMutator that sets the `now` option to the given time.
func SetNowOption(now time.Time) ScopeMutator {
return SetOption(nowPkg, NowOption, generateNowFunc(now))
return SetOption(interpreter.NowPkg, interpreter.NowOption, generateNowFunc(now))
}

func generateNowFunc(now time.Time) func(r Runtime) values.Value {
Expand All @@ -82,6 +82,6 @@ func generateNowFunc(now time.Time) func(r Runtime) values.Value {
call := func(ctx context.Context, args values.Object) (values.Value, error) {
return timeVal, nil
}
return values.NewFunction(NowOption, ftype, call, false)
return values.NewFunction(interpreter.NowOption, ftype, call, false)
}
}
5 changes: 3 additions & 2 deletions stdlib/experimental/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/influxdata/flux/internal/errors"
"github.com/influxdata/flux/interpreter"
"github.com/influxdata/flux/lang"
"github.com/influxdata/flux/lang/execdeps"
"github.com/influxdata/flux/runtime"
"github.com/influxdata/flux/values"
)
Expand Down Expand Up @@ -43,10 +44,10 @@ func chainCall(ctx context.Context, args values.Object) (values.Value, error) {
return nil, errors.Wrap(err, codes.Inherit, "error in table object compilation")
}

if !lang.HaveExecutionDependencies(ctx) {
if !execdeps.HaveExecutionDependencies(ctx) {
return nil, errors.New(codes.Internal, "no execution context for chain to use")
}
deps := lang.GetExecutionDependencies(ctx)
deps := execdeps.GetExecutionDependencies(ctx)

if program, ok := program.(lang.LoggingProgram); ok {
program.SetLogger(deps.Logger)
Expand Down
4 changes: 2 additions & 2 deletions stdlib/experimental/chain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"testing"

"github.com/influxdata/flux/dependencies/dependenciestest"
"github.com/influxdata/flux/lang/langtest"
"github.com/influxdata/flux/lang/execdeps"
"github.com/influxdata/flux/runtime"
"github.com/influxdata/flux/stdlib/experimental"
"github.com/influxdata/flux/values"
Expand Down Expand Up @@ -60,7 +60,7 @@ func makeArgs(first values.Value, second values.Value) values.Object {

func TestChain(t *testing.T) {
context := dependenciestest.Default().Inject(context.Background())
context = langtest.DefaultExecutionDependencies().Inject(context)
context = execdeps.DefaultExecutionDependencies().Inject(context)
_, scope, err := runtime.Eval(context, table1)
if err != nil {
t.Fatalf("unexpected error: %s", err)
Expand Down
Loading

0 comments on commit 63ca1cc

Please sign in to comment.