
Commit

feat: run flux tasks with built-in flux engine (#2555)
lesam authored May 25, 2021
1 parent 6ceb6ec commit 5c162cd
Showing 9 changed files with 277 additions and 76 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -7,6 +7,7 @@
- [#2512](https://github.com/influxdata/kapacitor/pull/2512): Pull in auth code from Kapacitor Enterprise.
- [#2493](https://github.com/influxdata/kapacitor/pull/2493): Route kafka alerts to partitions by ID, and allow for configuring the hashing strategy.
- [#2544](https://github.com/influxdata/kapacitor/pull/2544): flux tasks skeleton in Kapacitor
- [#2555](https://github.com/influxdata/kapacitor/pull/2555): run flux tasks with built-in flux engine

## v1.5.9 [2021-04-01]

4 changes: 4 additions & 0 deletions cmd/kapacitord/run/command.go
@@ -11,6 +11,7 @@ import (
"strconv"

"github.com/BurntSushi/toml"
"github.com/influxdata/flux/fluxinit"
"github.com/influxdata/kapacitor/server"
"github.com/influxdata/kapacitor/services/diagnostic"
)
@@ -122,6 +123,9 @@ func (cmd *Command) Run(args ...string) error {
}
cmd.pidfile = options.PIDFile

// run flux initialization
fluxinit.FluxInit()

// Create server from config and start it.
buildInfo := server.BuildInfo{Version: cmd.Version, Commit: cmd.Commit, Branch: cmd.Branch, Platform: cmd.Platform}
s, err := server.New(config, buildInfo, cmd.diagService)
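The functional change in command.go is the fluxinit.FluxInit() call, which performs the one-time, process-wide initialization of the Flux runtime and standard library that the embedded engine needs before any task script is compiled. A minimal sketch of the same pattern (only the fluxinit call is taken from this diff; the surrounding main is illustrative):

```go
package main

import (
	"log"

	"github.com/influxdata/flux/fluxinit"
)

func main() {
	// One-time, process-wide initialization of the embedded Flux engine.
	// Kapacitor calls this in cmd/kapacitord before server.New, and
	// server_test.go calls it from init() for the same reason: Flux scripts
	// cannot be compiled or executed until the runtime is initialized.
	fluxinit.FluxInit()

	log.Println("flux runtime initialized; safe to start the server and run flux tasks")
}
```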
4 changes: 2 additions & 2 deletions go.mod
@@ -34,7 +34,7 @@ require (
github.com/hashicorp/go-uuid v1.0.2 // indirect
github.com/hashicorp/golang-lru v0.5.4 // indirect
github.com/influxdata/cron v0.0.0-20201006132531-4bb0a200dcbe
github.com/influxdata/flux v0.114.1
github.com/influxdata/flux v0.116.1-0.20210519190248-4347b978c91a
github.com/influxdata/httprouter v1.3.1-0.20191122104820-ee83e2772f69
github.com/influxdata/influxdb v1.8.4
github.com/influxdata/influxdb/v2 v2.0.1-alpha.10.0.20210507184756-dc72dc3f0c07
@@ -61,7 +61,7 @@ require (
github.com/shurcooL/go v0.0.0-20170331015642-20b4b0a35211 // indirect
github.com/shurcooL/markdownfmt v0.0.0-20170214213350-10aae0a270ab
github.com/stretchr/testify v1.7.0
github.com/uber/jaeger-client-go v2.25.0+incompatible
github.com/uber/jaeger-client-go v2.28.0+incompatible
go.etcd.io/bbolt v1.3.5
go.uber.org/zap v1.14.1
golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad
52 changes: 52 additions & 0 deletions go.sum

Large diffs are not rendered by default.

103 changes: 103 additions & 0 deletions server/server_test.go
@@ -9,6 +9,7 @@ import (
"io"
"io/ioutil"
"math/rand"
"net"
"net/http"
"net/http/httptest"
"net/mail"
@@ -28,6 +29,7 @@ import (
"github.com/davecgh/go-spew/spew"
"github.com/dgrijalva/jwt-go"
"github.com/google/go-cmp/cmp"
"github.com/influxdata/flux/fluxinit"
iclient "github.com/influxdata/influxdb/client/v2"
"github.com/influxdata/influxdb/influxql"
imodels "github.com/influxdata/influxdb/models"
@@ -80,6 +82,8 @@ import (
"github.com/influxdata/kapacitor/services/zenoss/zenosstest"
"github.com/k-sone/snmpgo"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/crypto/bcrypt"
)

@@ -88,6 +92,7 @@ var udfDir string
func init() {
dir, _ := os.Getwd()
udfDir = filepath.Clean(filepath.Join(dir, "../udf"))
fluxinit.FluxInit()
}

func mustHash(hash []byte, err error) string {
@@ -13335,6 +13340,104 @@ func TestLogSessions_HeaderGzip(t *testing.T) {
t.Fatalf("expected: %v, got: %v\n", exp, got)
return
}
}

func TestFluxTasks_Basic(t *testing.T) {
conf := NewConfig()
conf.FluxTask.Enabled = true
conf.FluxTask.TaskRunInfluxDB = "none"
s := OpenServer(conf)
cli := Client(s)
defer s.Close()

// Check we can query empty tasks
u := cli.BaseURL()
basePath := "kapacitor/v1/api/v2/tasks"
query := func(method string, path string, body string) string {
u.Path = path
t.Log("Querying: ", method, u.String())
req, err := http.NewRequest(method, u.String(), bytes.NewBufferString(body))
require.NoError(t, err)
resp, err := http.DefaultClient.Do(req)
require.NoError(t, err)
defer resp.Body.Close()
buf := bytes.Buffer{}
_, err = io.Copy(&buf, resp.Body)
require.NoError(t, err)
return strings.Trim(buf.String(), "\n")
}

assert.Equal(t, `{"links":{"self":"/kapacitor/v1/api/v2/tasks?limit=100"},"tasks":[]}`, query("GET", basePath, ""))

// Start a simple server. Its handler blocks until the requestStarted and requestStopped channels are signaled.
// This lets us create a task that uses the flux-native http.Post and assert that
// flux is configured properly
listener, err := net.Listen("tcp", "localhost:0")
require.NoError(t, err)
port := listener.Addr().(*net.TCPAddr).Port
server := &http.Server{}
requestStarted := make(chan struct{})
requestStopped := make(chan struct{})
go func() {
server.Handler = http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
<-requestStarted
<-requestStopped
})
server.Serve(listener)
}()
defer server.Close()

// create a task, assert on the important parts of it
fluxScript := fmt.Sprintf(`import "http"
option task = {concurrency: 1, name:"poster", every:1s}
http.post(url: "http://localhost:%d")
`, port)
task := fmt.Sprintf(`{"status": "active", "description": "simple post", "flux": %q}`, fluxScript)
createResp := make(map[string]interface{})
require.NoError(t, json.Unmarshal([]byte(query("POST", basePath, task)), &createResp))
id := createResp["id"].(string)
assert.Equal(t, 16, len(id))
assert.Contains(t, createResp, "orgID")
assert.Equal(t, "", createResp["orgID"])
assert.Equal(t, "poster", createResp["name"])
logPath := fmt.Sprintf("/kapacitor/v1/api/v2/tasks/%s/logs", id)
runPath := fmt.Sprintf("/kapacitor/v1/api/v2/tasks/%s/runs", id)
selfPath := fmt.Sprintf("/kapacitor/v1/api/v2/tasks/%s", id)
assert.Equal(t, map[string]interface{}{
"logs": logPath,
"runs": runPath,
"self": selfPath,
}, createResp["links"])

t.Log("waiting for request")
requestStarted <- struct{}{}

// Request is now started (it hit our test server with a post) but can't finish - time to assert that we have runs and logs
logResp := make(map[string]interface{})
require.NoError(t, json.Unmarshal([]byte(query("GET", logPath, task)), &logResp))
expectMessage := "Started task from script:"
assert.Equal(t, expectMessage, logResp["events"].([]interface{})[0].(map[string]interface{})["message"].(string)[:len(expectMessage)])

runResp := make(map[string]interface{})
require.NoError(t, json.Unmarshal([]byte(query("GET", runPath, task)), &runResp))
assert.Equal(t, map[string]interface{}{
"task": selfPath,
"self": runPath,
}, runResp["links"])

// stop blocking the server
close(requestStarted)
close(requestStopped)

// Assert that we can really update a task
query("PATCH", selfPath, `{"every": "10m"}`)
getResp := make(map[string]interface{})
require.NoError(t, json.Unmarshal([]byte(query("GET", selfPath, task)), &getResp))
assert.Equal(t, getResp["every"], "10m")

// Assert that when we delete a task it goes away
query("DELETE", selfPath, "")
assert.Equal(t, `{"links":{"self":"/kapacitor/v1/api/v2/tasks?limit=100"},"tasks":[]}`, query("GET", basePath, ""))

}

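TestFluxTasks_Basic above doubles as documentation for the new HTTP surface: tasks live under /kapacitor/v1/api/v2/tasks, are created with a POST carrying "status", "description", and "flux", and each task exposes /logs, /runs, PATCH, and DELETE. A rough usage sketch against a running Kapacitor follows; the host/port and the example Flux script are assumptions, while the path and body shape come from the test:

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"net/http"
)

func main() {
	// Assumed default Kapacitor bind address; adjust for your deployment.
	url := "http://localhost:9092/kapacitor/v1/api/v2/tasks"

	// Body shape mirrors the test: status, description, and the Flux script.
	task := `{
	  "status": "active",
	  "description": "simple post",
	  "flux": "import \"http\"\noption task = {concurrency: 1, name: \"poster\", every: 1m}\nhttp.post(url: \"http://example.com\")"
	}`

	resp, err := http.Post(url, "application/json", bytes.NewBufferString(task))
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	body, _ := io.ReadAll(resp.Body)
	// On success the response carries the generated task id and links to
	// .../tasks/<id>/logs and .../tasks/<id>/runs, as asserted in the test;
	// PATCH and DELETE against .../tasks/<id> update and remove the task.
	fmt.Println(resp.Status, string(body))
}
```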
85 changes: 29 additions & 56 deletions services/fluxtask/service.go
@@ -2,10 +2,8 @@ package fluxtask

import (
"context"
"fmt"
"time"

"github.com/influxdata/flux"
"github.com/influxdata/influxdb/v2/kit/platform"
"github.com/influxdata/kapacitor/services/httpd"
"github.com/influxdata/kapacitor/services/influxdb"
@@ -16,6 +14,7 @@ import (
"github.com/influxdata/kapacitor/task/backend/executor"
"github.com/influxdata/kapacitor/task/backend/middleware"
"github.com/influxdata/kapacitor/task/backend/scheduler"
"github.com/influxdata/kapacitor/task/fluxlocal"
"github.com/influxdata/kapacitor/task/http"
"github.com/influxdata/kapacitor/task/kv"
"github.com/influxdata/kapacitor/task/taskmodel"
@@ -50,24 +49,35 @@ func (s *Service) Open() error {
OrgID: s.config.TaskRunOrgID,
Measurement: s.config.TaskRunMeasurement,
}
cli, err := s.InfluxDBService.NewNamedClient(s.config.TaskRunInfluxDB)
if err != nil {
return err
}
combinedTaskService := backend.NewAnalyticalStorage(
s.logger.With(zap.String("service", "fluxtask-analytical-store")),
s.kvService,
s.kvService,
cli,
dataDestination,
)
var (
taskService taskmodel.TaskService = s.kvService
taskControlService backend.TaskControlService = s.kvService
)

if s.config.TaskRunInfluxDB != "none" {
cli, err := s.InfluxDBService.NewNamedClient(s.config.TaskRunInfluxDB)
if err != nil {
return err
}
combinedTaskService := backend.NewAnalyticalStorage(
s.logger.With(zap.String("service", "fluxtask-analytical-store")),
taskService,
taskControlService,
cli,
dataDestination,
)
taskService = combinedTaskService
taskControlService = combinedTaskService
}
// TODO: register metrics returned here?
executor, _ := executor.NewExecutor(
s.logger.With(zap.String("service", "fluxtask-executor")),
&DummyQueryService{logger: s.logger.With(zap.String("service", "fluxtask-dummy-query-executor"))},
combinedTaskService,
combinedTaskService,
fluxlocal.NewFluxQueryer(s.config.Secrets, s.logger.With(zap.String("service", "flux-local-querier"))),
taskService,
taskControlService,
)
var err error
schLogger := s.logger.With(zap.String("service", "fluxtask-scheduler"))
//TODO: register metrics returned here?
s.scheduler, _, err = scheduler.NewScheduler(
@@ -89,11 +99,11 @@ func (s *Service) Open() error {
coordLogger,
s.scheduler,
executor)
fluxTaskService := middleware.New(combinedTaskService, taskCoord)
taskService = middleware.New(taskService, taskCoord)
if err := backend.TaskNotifyCoordinatorOfExisting(
context.Background(),
fluxTaskService,
combinedTaskService,
taskService,
taskControlService,
taskCoord,
func(ctx context.Context, taskID platform.ID, runID platform.ID) error {
_, err := executor.ResumeCurrentRun(ctx, taskID, runID)
@@ -102,7 +112,7 @@
coordLogger); err != nil {
s.logger.Error("Failed to resume existing flux tasks", zap.Error(err))
}
if err := http.AddTaskServiceRoutes(s.HTTPDService.Handler, s.logger, fluxTaskService); err != nil {
if err := http.AddTaskServiceRoutes(s.HTTPDService.Handler, s.logger, taskService); err != nil {
s.logger.Fatal("Could not add task service routes", zap.Error(err))
}
}
@@ -125,40 +135,3 @@ func (s *Service) Close() error {
}
return nil
}

// TODO: remove
type NoResultIterator struct{}

func (n NoResultIterator) More() bool {
return false
}

func (n NoResultIterator) Next() flux.Result {
panic("should not be called since More is always false")
}

func (n NoResultIterator) Release() {
}

func (n NoResultIterator) Err() error {
return nil
}

func (n NoResultIterator) Statistics() flux.Statistics {
return flux.Statistics{}
}

var _ flux.ResultIterator = &NoResultIterator{}

type DummyQueryService struct {
logger *zap.Logger
}

func (d *DummyQueryService) Query(ctx context.Context, compiler flux.Compiler) (flux.ResultIterator, error) {
d.logger.Info(fmt.Sprintf("DummyQueryService called with compiler of type %T", compiler))
time.Sleep(30 * time.Second)
d.logger.Info("DummyQueryService is done sleeping")
return new(NoResultIterator), nil
}

var _ taskmodel.QueryService = &DummyQueryService{}
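The DummyQueryService removed above is a good map of what the executor actually requires: a taskmodel.QueryService whose Query method turns a flux.Compiler into a flux.ResultIterator. The commit swaps that stub for fluxlocal.NewFluxQueryer, which satisfies the same contract with the built-in Flux engine. Purely to illustrate the interface shape (signatures copied from the deleted code; flux.NewSliceResultIterator is assumed as an empty-iterator helper), a no-op implementation looks like:

```go
package fluxtask

import (
	"context"

	"github.com/influxdata/flux"
	"github.com/influxdata/kapacitor/task/taskmodel"
)

// noopQueryer has the same shape as the removed DummyQueryService: the task
// executor only needs Query to produce a flux.ResultIterator for a compiler.
// fluxlocal.NewFluxQueryer does this with the real in-process engine; this
// stub just returns an empty iterator.
type noopQueryer struct{}

func (noopQueryer) Query(ctx context.Context, compiler flux.Compiler) (flux.ResultIterator, error) {
	return flux.NewSliceResultIterator(nil), nil
}

// Compile-time check mirroring the one on the removed DummyQueryService.
var _ taskmodel.QueryService = noopQueryer{}
```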
8 changes: 8 additions & 0 deletions task/config.go
@@ -9,6 +9,7 @@ type Config struct {
// TaskRunInfluxDB is the name of the influxdb instance finished
// task runs and logs are written to.
// Leaving it blank will write to Kapacitor's default influxdb instance.
// Setting it to 'none' will disable task logging.
TaskRunInfluxDB string `toml:"task-run-influxdb"`

// TaskRunBucket is the bucket (or influxdb 1.x database) to use for saving
@@ -31,6 +32,10 @@ type Config struct {
// and logs.
// The default is "runs"
TaskRunMeasurement string `toml:"task-run-measurement"`

// Secrets is the kapacitor provider for secrets as described at
// https://docs.influxdata.com/influxdb/v2.0/security/secrets/
Secrets map[string]string `toml:"secrets"`
}

func NewConfig() Config {
@@ -43,6 +48,9 @@ func (c Config) Validate() error {
if !c.Enabled {
return nil
}
if c.TaskRunInfluxDB == "none" {
return nil
}
if len(c.TaskRunOrgID) > 0 && len(c.TaskRunOrg) > 0 {
return fmt.Errorf("only one of task-run-org and task-run-orgid should be set")
}
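Two configuration changes matter for the new engine: task-run-influxdb now accepts the sentinel "none" (skip writing run and log records to InfluxDB, and skip the org/bucket validation), and the new Secrets map is handed to the local Flux queryer so task scripts can read secrets. A hedged sketch of setting this up programmatically, assuming the struct above lives in package task at github.com/influxdata/kapacitor/task (field names come from the diff; the secret key and value are made up):

```go
package main

import (
	"fmt"

	"github.com/influxdata/kapacitor/task"
)

func main() {
	cfg := task.NewConfig()
	cfg.Enabled = true

	// "none" disables the analytical store entirely; an empty string keeps
	// the default of writing runs/logs to Kapacitor's default InfluxDB.
	cfg.TaskRunInfluxDB = "none"

	// Secrets made available to Flux scripts via the secrets provider,
	// e.g. import "influxdata/influxdb/secrets" and secrets.get(key: "MY_TOKEN").
	cfg.Secrets = map[string]string{"MY_TOKEN": "example-value"}

	// With TaskRunInfluxDB == "none", Validate returns early and the
	// task-run-org / task-run-orgid settings are not required.
	if err := cfg.Validate(); err != nil {
		fmt.Println("invalid fluxtask config:", err)
		return
	}
	fmt.Println("fluxtask config OK")
}
```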