From be11bac52175df9369e7443845bea3e607ae0024 Mon Sep 17 00:00:00 2001 From: koolay Date: Wed, 19 Aug 2020 03:21:09 +0800 Subject: [PATCH] cqrs --- .gitignore | 1 + .vscode/launch.json | 23 ++++++ cmd/agent.go | 63 ++++++++++----- cmd/worker.go | 26 +++--- go.mod | 7 +- go.sum | 65 --------------- pkg/agent/command.go | 34 ++++++++ pkg/agent/publish.go | 31 -------- pkg/agent/streaming_server.go | 65 --------------- pkg/lineage/event.go | 30 +++++++ pkg/message/cqrs.go | 146 ++++++++++++++++++++++++++++++++++ pkg/message/pubsub.go | 24 ------ pkg/proto/sqlcmd.go | 39 +++++++++ pkg/proto/sqlcmd.proto | 4 + pkg/store/event.go | 31 ++++++++ pkg/store/loki.go | 14 ++++ pkg/store/soar.go | 17 ++++ pkg/worker/serve.go | 51 ------------ 18 files changed, 394 insertions(+), 277 deletions(-) create mode 100644 .vscode/launch.json create mode 100644 pkg/agent/command.go delete mode 100644 pkg/agent/publish.go delete mode 100644 pkg/agent/streaming_server.go create mode 100644 pkg/lineage/event.go create mode 100644 pkg/message/cqrs.go delete mode 100644 pkg/message/pubsub.go create mode 100644 pkg/proto/sqlcmd.go create mode 100644 pkg/proto/sqlcmd.proto create mode 100644 pkg/store/event.go create mode 100644 pkg/store/loki.go create mode 100644 pkg/store/soar.go delete mode 100644 pkg/worker/serve.go diff --git a/.gitignore b/.gitignore index 6b42c6c..598598f 100644 --- a/.gitignore +++ b/.gitignore @@ -21,3 +21,4 @@ app.config .DS_Store *.pprof vendor/ +__debug_bin diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 0000000..858680d --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,23 @@ +{ + // 使用 IntelliSense 了解相关属性。 + // 悬停以查看现有属性的描述。 + // 欲了解更多信息,请访问: https://go.microsoft.com/fwlink/?linkid=830387 + "version": "0.2.0", + "configurations": [ + { + "name": "Launch", + "type": "go", + "request": "launch", + "mode": "debug", + "remotePath": "", + "port": 2345, + "host": "127.0.0.1", + "program": "${workspaceRoot}/main.go", + "args": [ + "agent", + ], + "showLog": true + } + + ] +} \ No newline at end of file diff --git a/cmd/agent.go b/cmd/agent.go index 73a4387..6a0b464 100644 --- a/cmd/agent.go +++ b/cmd/agent.go @@ -1,18 +1,17 @@ package cmd import ( - "context" - "fmt" "log" "os" "os/signal" "time" + "github.com/ThreeDotsLabs/watermill/components/cqrs" "github.com/koolay/sqlboss/pkg/agent" - "github.com/koolay/sqlboss/pkg/conf" + "github.com/koolay/sqlboss/pkg/lineage" "github.com/koolay/sqlboss/pkg/message" - "github.com/koolay/sqlboss/pkg/worker" - "github.com/sirupsen/logrus" + "github.com/koolay/sqlboss/pkg/proto" + "github.com/koolay/sqlboss/pkg/store" cli "gopkg.in/urfave/cli.v2" ) @@ -27,6 +26,11 @@ func newAgentCmd() *cli.Command { Value: defaultConfigFileFolder, Usage: "path of config file", }, + &cli.StringFlag{ + Name: "log-level", + Value: "INFO", + Usage: "log level", + }, }, Action: serveAction, } @@ -45,20 +49,48 @@ func serveAction(c *cli.Context) error { log.Println(cfg) log.Println("start worker") + ctx := c.Context - go startWorker(context.Background(), cfg, logger) + cqrsMarshaler := cqrs.JSONMarshaler{} + cqrsServer, err := message.NewCQRSServer(logger.WithContext(ctx), cqrsMarshaler) + if err != nil { + return err + } + + if err = cqrsServer.Setup([]message.CommandHandlerGenerator{ + func(cb *cqrs.CommandBus, eb *cqrs.EventBus) cqrs.CommandHandler { + return agent.NewSQLCommandHandler(eb) + }, + }, + + []message.EventHandlerGenerator{ + func(cb *cqrs.CommandBus, eb *cqrs.EventBus) cqrs.EventHandler { + return store.StoreOnSQLEventHandler{} + }, + func(cb *cqrs.CommandBus, eb *cqrs.EventBus) cqrs.EventHandler { + return lineage.LineageOnSQLEventHandler{} + }, + }); err != nil { + return err + } + + commandBus := cqrsServer.GetCommandBus() go func() { - pub, err := agent.NewPublisher(cfg, message.NewPubSub()) - if err != nil { - log.Fatal(err) + if serr := cqrsServer.Start(); serr != nil { + logger.Fatal(serr) } + }() + go func() { count := 0 + time.Sleep(1 * time.Second) for { log.Println("publish message") count++ - if perr := pub.Publish(cfg.Stream.Topic, []byte(fmt.Sprintf("hello, %d", count))); perr != nil { + + data := &proto.SqlCommand{} + if perr := commandBus.Send(ctx, data); perr != nil { logger.WithError(perr).Error("failed to pushlish") } @@ -73,14 +105,3 @@ func serveAction(c *cli.Context) error { log.Println("Shutdown Service ...") return nil } - -func startWorker(ctx context.Context, cfg *conf.Config, logger *logrus.Logger) { - wk := worker.NewWorker(cfg, message.NewPubSub(), logger) - if err := wk.Setup(); err != nil { - log.Fatal(err) - } - - if err := wk.Run(ctx); err != nil { - log.Fatal(err) - } -} diff --git a/cmd/worker.go b/cmd/worker.go index d696f54..16748b1 100644 --- a/cmd/worker.go +++ b/cmd/worker.go @@ -5,8 +5,6 @@ import ( "os" "os/signal" - "github.com/koolay/sqlboss/pkg/message" - "github.com/koolay/sqlboss/pkg/worker" cli "gopkg.in/urfave/cli.v2" ) @@ -32,20 +30,20 @@ func newWorkerCmd() *cli.Command { } func handleWorkerCmd(c *cli.Context) error { - cfg, err := loadConfig(c) - if err != nil { - return err - } + // cfg, err := loadConfig(c) + // if err != nil { + // return err + // } - logger := newLogger(c.String("log-level")) - wk := worker.NewWorker(cfg, message.NewPubSub(), logger) - if err := wk.Setup(); err != nil { - return err - } + // logger := newLogger(c.String("log-level")) + // wk := worker.NewWorker(cfg, message.NewPubSub(), logger) + // if err := wk.Setup(); err != nil { + // return err + // } - if err := wk.Run(c.Context); err != nil { - return err - } + // if err := wk.Run(c.Context); err != nil { + // return err + // } quit := make(chan os.Signal, 1) signal.Notify(quit, os.Interrupt) diff --git a/go.mod b/go.mod index cfbc39c..2246386 100644 --- a/go.mod +++ b/go.mod @@ -7,15 +7,11 @@ require ( github.com/bwmarrin/snowflake v0.3.0 github.com/go-sql-driver/mysql v1.5.0 github.com/gofrs/uuid v3.3.0+incompatible + github.com/gogo/protobuf v1.3.1 // indirect github.com/google/go-cmp v0.5.0 github.com/gopherjs/gopherjs v0.0.0-20181103185306-d547d1d9531e // indirect - github.com/hashicorp/golang-lru v0.5.1 // indirect github.com/imdario/mergo v0.3.11 github.com/kr/pretty v0.2.0 // indirect - github.com/nats-io/jwt v1.0.1 // indirect - github.com/nats-io/nats-server/v2 v2.1.7 - github.com/nats-io/nats-streaming-server v0.18.0 - github.com/nats-io/nkeys v0.2.0 // indirect github.com/oklog/ulid v1.3.1 github.com/pingcap/errors v0.11.4 github.com/pkg/errors v0.9.1 @@ -24,7 +20,6 @@ require ( github.com/smartystreets/assertions v0.0.0-20190116191733-b6c0e53d7304 // indirect github.com/smartystreets/goconvey v1.6.4 // indirect github.com/stretchr/testify v1.4.0 // indirect - golang.org/x/crypto v0.0.0-20200728195943-123391ffb6de // indirect golang.org/x/sys v0.0.0-20200814200057-3d37ad5750ed // indirect google.golang.org/protobuf v1.25.0 // indirect gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect diff --git a/go.sum b/go.sum index 80c0ce6..7cb6d49 100644 --- a/go.sum +++ b/go.sum @@ -1,22 +1,16 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= -github.com/DataDog/datadog-go v2.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ= github.com/ThreeDotsLabs/watermill v1.1.1 h1:+9NXqWQvplzxBru2CIInvVOZeKUnM+Nysg42fInl5sY= github.com/ThreeDotsLabs/watermill v1.1.1/go.mod h1:Qd1xNFxolCAHCzcMrm6RnjW0manbvN+DJVWc1MWRFlI= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= -github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878 h1:EFSB7Zo9Eg91v7MJPVsifUysc/wPdN+NOnVe6bWbdBM= -github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878/go.mod h1:3AMJUQhVx52RsWOnlkpikZr01T/yAVN2gn0861vByNg= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= -github.com/boltdb/bolt v1.3.1/go.mod h1:clJnj/oiGkjum5o1McbSZDSLxVThjynRyGBgiAx27Ps= github.com/bwmarrin/snowflake v0.3.0 h1:xm67bEhkKh6ij1790JB83OujPR5CzNe8QuQqAgISZN0= github.com/bwmarrin/snowflake v0.3.0/go.mod h1:NdZxfVWX+oR6y2K0o6qAYv6gIOP9rjG0/E9WsDpxqwE= github.com/cenkalti/backoff/v3 v3.0.0 h1:ske+9nBpD9qZsTBoF41nW5L+AIuFBKMeze18XQ3eG1c= github.com/cenkalti/backoff/v3 v3.0.0/go.mod h1:cIeZDE3IrqwwJl6VUwCN6trj1oXrTS4rc0ij+ULvLYs= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= -github.com/circonus-labs/circonus-gometrics v2.3.1+incompatible/go.mod h1:nmEj6Dob7S7YxXgwXpfOuvO54S+tGdZdw9fuRZt25Ag= -github.com/circonus-labs/circonusllhist v0.1.3/go.mod h1:kMXHVDlOchFAehlya5ePtbp5jckzBHf4XRpQvBOLI+I= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= @@ -64,24 +58,8 @@ github.com/gopherjs/gopherjs v0.0.0-20181103185306-d547d1d9531e h1:JKmoR8x90Iww1 github.com/gopherjs/gopherjs v0.0.0-20181103185306-d547d1d9531e/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA= github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= -github.com/hashicorp/go-cleanhttp v0.5.0/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80= -github.com/hashicorp/go-hclog v0.9.1 h1:9PZfAcVEvez4yhLH2TBU64/h/z4xlFI80cWXRrxuKuM= -github.com/hashicorp/go-hclog v0.9.1/go.mod h1:5CU+agLiy3J7N7QjHK5d05KxGsuXiQLrjA0H7acj2lQ= -github.com/hashicorp/go-immutable-radix v1.0.0 h1:AKDB1HM5PWEA7i4nhcpwOrO2byshxBjXVn/J/3+z5/0= -github.com/hashicorp/go-immutable-radix v1.0.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60= -github.com/hashicorp/go-msgpack v0.5.5 h1:i9R9JSrqIz0QVLz3sz+i3YJdT7TTSLcfLLzJi9aZTuI= -github.com/hashicorp/go-msgpack v0.5.5/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM= github.com/hashicorp/go-multierror v1.0.0 h1:iVjPR7a6H0tWELX5NxNe7bYopibicUzc7uPribsnS6o= github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk= -github.com/hashicorp/go-retryablehttp v0.5.3/go.mod h1:9B5zBasrRhHXnJnui7y6sL7es7NDiJgTc6Er0maI1Xs= -github.com/hashicorp/go-uuid v1.0.0 h1:RS8zrF7PhGwyNPOtxSClXXj9HA8feRnJzgnI1RJCSnM= -github.com/hashicorp/go-uuid v1.0.0/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= -github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= -github.com/hashicorp/golang-lru v0.5.1 h1:0hERBMJE1eitiLkihrMvRVBYAkpHzc/J3QdDN+dAcgU= -github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= -github.com/hashicorp/raft v1.1.2 h1:oxEL5DDeurYxLd3UbcY/hccgSPhLLpiBZ1YxtWEq59c= -github.com/hashicorp/raft v1.1.2/go.mod h1:vPAJM8Asw6u8LxC3eJCUZmRP/E4QmUGE1R7g7k8sG/8= -github.com/hashicorp/raft-boltdb v0.0.0-20171010151810-6e5ba93211ea/go.mod h1:pNv7Wc3ycL6F5oOWn+tPGo2gWD4a5X+yp/ntwdKLjRk= github.com/imdario/mergo v0.3.11 h1:3tnifQM4i+fbajXKBHXWEH+KvNHqojZ778UH75j3bGA= github.com/imdario/mergo v0.3.11/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA= github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= @@ -100,37 +78,14 @@ github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfn github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= -github.com/lib/pq v1.7.0 h1:h93mCPfUSkaul3Ka/VG8uZdmW1uMHDGxzu0NWHuJmHY= -github.com/lib/pq v1.7.0/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/lithammer/shortuuid/v3 v3.0.4 h1:uj4xhotfY92Y1Oa6n6HUiFn87CdoEHYUlTy0+IgbLrs= github.com/lithammer/shortuuid/v3 v3.0.4/go.mod h1:RviRjexKqIzx/7r1peoAITm6m7gnif/h+0zmolKJjzw= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= -github.com/nats-io/jwt v0.3.2 h1:+RB5hMpXUUA2dfxuhBTEkMOrYmM+gKIZYS1KjSostMI= -github.com/nats-io/jwt v0.3.2/go.mod h1:/euKqTS1ZD+zzjYrY7pseZrTtWQSjujC7xjPc8wL6eU= -github.com/nats-io/jwt v1.0.1 h1:71ivoESdfT2K/qDiw5YwX/3W9/dR7c+m83xiGOj/EZ4= -github.com/nats-io/jwt v1.0.1/go.mod h1:n3cvmLfBfnpV4JJRN7lRYCyZnw48ksGsbThGXEk4w9M= -github.com/nats-io/nats-server/v2 v2.1.7 h1:jCoQwDvRYJy3OpOTHeYfvIPLP46BMeDmH7XEJg/r42I= -github.com/nats-io/nats-server/v2 v2.1.7/go.mod h1:rbRrRE/Iv93O/rUvZ9dh4NfT0Cm9HWjW/BqOWLGgYiE= -github.com/nats-io/nats-streaming-server v0.18.0 h1:+RDozeN9scwCm0Wc2fYlvGcP144hvxvSOtxZ8FE21ME= -github.com/nats-io/nats-streaming-server v0.18.0/go.mod h1:Y9Aiif2oANuoKazQrs4wXtF3jqt6p97ODQg68lR5TnY= -github.com/nats-io/nats.go v1.10.0 h1:L8qnKaofSfNFbXg0C5F71LdjPRnmQwSsA4ukmkt1TvY= -github.com/nats-io/nats.go v1.10.0/go.mod h1:AjGArbfyR50+afOUotNX2Xs5SYHf+CoOa5HH1eEl2HE= -github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w= -github.com/nats-io/nkeys v0.1.4 h1:aEsHIssIk6ETN5m2/MD8Y4B2X7FfXrBAUdkyRvbVYzA= -github.com/nats-io/nkeys v0.1.4/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s= -github.com/nats-io/nkeys v0.2.0 h1:WXKF7diOaPU9cJdLD7nuzwasQy9vT1tBqzXZZf3AMJM= -github.com/nats-io/nkeys v0.2.0/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s= -github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= -github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= -github.com/nats-io/stan.go v0.7.0 h1:sMVHD9RkxPOl6PJfDVBQd+gbxWkApeYl6GrH+10msO4= -github.com/nats-io/stan.go v0.7.0/go.mod h1:Ci6mUIpGQTjl++MqK2XzkWI/0vF+Bl72uScx7ejSYmU= github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4= github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U= -github.com/pascaldekloe/goe v0.1.0 h1:cBOtyMzM9HTpWjXfbbunk26uA6nG3a8n06Wieeh0MwY= -github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pingcap/check v0.0.0-20190102082844-67f458068fc8 h1:USx2/E1bX46VG32FIw034Au6seQ2fY9NEILmNh/UlQg= github.com/pingcap/check v0.0.0-20190102082844-67f458068fc8/go.mod h1:B1+S9LNcuMyLH/4HMTViQOJevkGiik3wW2AN9zb2fNQ= github.com/pingcap/errors v0.11.0/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8= @@ -145,18 +100,13 @@ github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= -github.com/prometheus/client_golang v0.9.2/go.mod h1:OsXs2jCmiKlQ1lTBmv21f2mNfw4xf/QclQDMrYNZzcM= github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo= github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= -github.com/prometheus/common v0.0.0-20181126121408-4724e9255275/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro= github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4= github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= -github.com/prometheus/procfs v0.0.0-20181204211112-1dc9a6cbc91a/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= -github.com/prometheus/procfs v0.0.8 h1:+fpWZdT24pJBiqJdAwYBjPSk+5YmQzYNPYzQsdzLkt8= -github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+GxbHq6oeK9A= github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww= github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24/go.mod h1:M+9NzErvs504Cn4c5DxATwIqPbtswREoFCre64PpcG4= @@ -183,16 +133,8 @@ github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXf github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= -github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM= -go.etcd.io/bbolt v1.3.5 h1:XAzx9gjCb0Rxj7EoqcClPD1d5ZBxZJk0jbuoPHenBt0= -go.etcd.io/bbolt v1.3.5/go.mod h1:G5EMThwa9y8QZGBClrRx5EY+Yw9kAhnjy3bSjsnlVTQ= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= -golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= -golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59 h1:3zb4D3T4G8jdExgVU/95+vQXfpEPiMdCaZgmGVxjNHM= -golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.0.0-20200728195943-123391ffb6de h1:ikNHVSjEfnvz6sxdSPCaPt572qowuyMDMJLLm3Db3ig= -golang.org/x/crypto v0.0.0-20200728195943-123391ffb6de/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= @@ -200,26 +142,19 @@ golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHl golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= -golang.org/x/net v0.0.0-20181201002055-351d144fa1fc/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= -golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190422165155-953cdadca894 h1:Cz4ceDQGXuKRnVBDTS23GTn/pU5OE2C0WrNTOYK1Uuc= golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20190523142557-0e01d883c5c5/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200814200057-3d37ad5750ed h1:J22ig1FUekjjkmZUM7pTKixYm8DvrYsvrBZdunYeIuQ= golang.org/x/sys v0.0.0-20200814200057-3d37ad5750ed/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= diff --git a/pkg/agent/command.go b/pkg/agent/command.go new file mode 100644 index 0000000..ffd9e58 --- /dev/null +++ b/pkg/agent/command.go @@ -0,0 +1,34 @@ +package agent + +import ( + "context" + "fmt" + + "github.com/ThreeDotsLabs/watermill/components/cqrs" + "github.com/koolay/sqlboss/pkg/proto" +) + +// SQLCommandHandler 处理sql命令事件 +type SQLCommandHandler struct { + eventBus *cqrs.EventBus + // sql query -> send event +} + +func NewSQLCommandHandler(eventBus *cqrs.EventBus) SQLCommandHandler { + return SQLCommandHandler{eventBus: eventBus} +} + +func (b SQLCommandHandler) HandlerName() string { + return "SQLCommandHandler" +} + +// NewCommand returns type of command which this handle should handle. It must be a pointer. +func (b SQLCommandHandler) NewCommand() interface{} { + return &proto.SqlCommand{} +} + +func (b SQLCommandHandler) Handle(ctx context.Context, c interface{}) error { + cmd := c.(*proto.SqlCommand) + fmt.Println("received sql command", cmd) + return b.eventBus.Publish(ctx, cmd) +} diff --git a/pkg/agent/publish.go b/pkg/agent/publish.go deleted file mode 100644 index b75297c..0000000 --- a/pkg/agent/publish.go +++ /dev/null @@ -1,31 +0,0 @@ -package agent - -import ( - "github.com/ThreeDotsLabs/watermill" - "github.com/pkg/errors" - - "github.com/ThreeDotsLabs/watermill/message" - "github.com/ThreeDotsLabs/watermill/pubsub/gochannel" - "github.com/koolay/sqlboss/pkg/conf" -) - -type Publisher struct { - cfg *conf.Config - pubSub *gochannel.GoChannel -} - -func NewPublisher(cfg *conf.Config, pubSub *gochannel.GoChannel) (*Publisher, error) { - return &Publisher{ - pubSub: pubSub, - cfg: cfg, - }, nil -} - -func (p *Publisher) Publish(topic string, data []byte) error { - msg := message.NewMessage(watermill.NewUUID(), data) - if err := p.pubSub.Publish(topic, msg); err != nil { - return errors.Wrap(err, "failed to publish message") - } - - return nil -} diff --git a/pkg/agent/streaming_server.go b/pkg/agent/streaming_server.go deleted file mode 100644 index e487327..0000000 --- a/pkg/agent/streaming_server.go +++ /dev/null @@ -1,65 +0,0 @@ -package agent - -import ( - "fmt" - "time" - - natsd "github.com/nats-io/nats-server/v2/server" - stand "github.com/nats-io/nats-streaming-server/server" -) - -type StreamingServerConfig struct { - ClusterID string - Name string - Host string - Port int - Debug bool - MaxConn int -} - -func DefaultStreamingServerConfig() *StreamingServerConfig { - return &StreamingServerConfig{ - ClusterID: "local", - Name: "defaultServer", - Host: "127.0.0.1", - Port: 4222, - Debug: false, - } -} - -type StreamingServer struct { - serverCfg *StreamingServerConfig -} - -func NewStreamingServer(serverCfg *StreamingServerConfig) *StreamingServer { - return &StreamingServer{ - serverCfg: serverCfg, - } -} - -func (s *StreamingServer) Start() error { - fmt.Printf("nats-streaming-server version %s, ", stand.VERSION) - sOpts := stand.GetDefaultOptions() - // Force the streaming server to setup its own signal handler - sOpts.HandleSignals = true - // // override the NoSigs for NATS since Streaming has its own signal handler - // nOpts.NoSigs = true - // Without this option set to true, the logger is not configured. - sOpts.EnableLogging = true - sOpts.ClientHBInterval = 3 * time.Second - sOpts.ClientHBTimeout = 60 * time.Second - sOpts.ID = s.serverCfg.ClusterID - nOpts := &natsd.Options{ - ServerName: s.serverCfg.Name, - Host: s.serverCfg.Host, - Port: s.serverCfg.Port, - Debug: s.serverCfg.Debug, - MaxConn: s.serverCfg.MaxConn, - } - // This will invoke RunServerWithOpts but on Windows, may run it as a service. - if _, err := stand.Run(sOpts, nOpts); err != nil { - return err - } - - return nil -} diff --git a/pkg/lineage/event.go b/pkg/lineage/event.go new file mode 100644 index 0000000..a8d0def --- /dev/null +++ b/pkg/lineage/event.go @@ -0,0 +1,30 @@ +package lineage + +import ( + "context" + "fmt" + + "github.com/koolay/sqlboss/pkg/proto" +) + +// LineageOnSQLEventHandler sql命令事件处理, 血缘分析 +type LineageOnSQLEventHandler struct { + // storager Storager +} + +func (s LineageOnSQLEventHandler) HandlerName() string { + // this name is passed to EventsSubscriberConstructor and used to generate queue name + return "LineageOnSQLEventHandler" +} + +func (LineageOnSQLEventHandler) NewEvent() interface{} { + return &proto.SqlCommand{} +} + +func (s LineageOnSQLEventHandler) Handle(ctx context.Context, event interface{}) error { + data := event.(*proto.SqlCommand) + // 解析 event.SQL, 生成血缘RDF数据 + fmt.Println("received sql command") + fmt.Println("LineageOnSQLEventHandler", data) + return nil +} diff --git a/pkg/message/cqrs.go b/pkg/message/cqrs.go new file mode 100644 index 0000000..105ecf4 --- /dev/null +++ b/pkg/message/cqrs.go @@ -0,0 +1,146 @@ +package message + +import ( + "context" + + "github.com/ThreeDotsLabs/watermill" + "github.com/ThreeDotsLabs/watermill/components/cqrs" + "github.com/ThreeDotsLabs/watermill/message" + "github.com/ThreeDotsLabs/watermill/message/router/middleware" + "github.com/ThreeDotsLabs/watermill/pubsub/gochannel" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" +) + +type CommandBus interface { + Send(ctx context.Context, cmd interface{}) error +} + +type EventBus interface { + Publish(ctx context.Context, event interface{}) error +} + +type CommandHandlerGenerator func(cb *cqrs.CommandBus, eb *cqrs.EventBus) cqrs.CommandHandler +type EventHandlerGenerator func(cb *cqrs.CommandBus, eb *cqrs.EventBus) cqrs.EventHandler + +type CQRSServer struct { + router *message.Router + commandsPublisher message.Publisher + commandsSubscriber message.Subscriber + eventsPublisher message.Publisher + eventsSubscriber message.Subscriber + cqrsMarshaler cqrs.CommandEventMarshaler + + commandBus CommandBus + eventBus EventBus + nlog watermill.LoggerAdapter +} + +func NewCQRSServer(logger *logrus.Entry, cqrsMarshaler cqrs.CommandEventMarshaler) (*CQRSServer, error) { + nlog := &NatsLog{logger: logger} + // CQRS is built on messages router. + // Detailed documentation: https://watermill.io/docs/messages-router/ + router, err := message.NewRouter(message.RouterConfig{}, nlog) + if err != nil { + return nil, err + } + + commandsPubSub := gochannel.NewGoChannel( + gochannel.Config{BlockPublishUntilSubscriberAck: true}, + nlog, + ) + + eventsPubSub := gochannel.NewGoChannel( + gochannel.Config{BlockPublishUntilSubscriberAck: true}, + nlog, + ) + + return &CQRSServer{ + router: router, + commandsPublisher: commandsPubSub, + commandsSubscriber: commandsPubSub, + eventsPublisher: eventsPubSub, + eventsSubscriber: eventsPubSub, + cqrsMarshaler: cqrsMarshaler, + nlog: nlog, + }, nil +} + +func (s *CQRSServer) Setup(commandHandlerGenerators []CommandHandlerGenerator, + eventHandlerGenerators []EventHandlerGenerator) error { + + // Simple middleware which will recover panics from event or command handlers. + // More about router middlewares you can find in the documentation: + // https://watermill.io/docs/messages-router/#middleware + // + // List of available middlewares you can find in message/router/middleware. + s.router.AddMiddleware(middleware.Recoverer) + + // cqrs.Facade is facade for Command and Event buses and processors. + // You can use facade, or create buses and processors manually (you can inspire with cqrs.NewFacade) + cqrsFacade, err := cqrs.NewFacade(cqrs.FacadeConfig{ + GenerateCommandsTopic: func(commandName string) string { + // we are using queue RabbitMQ config, so we need to have topic per command type + return commandName + }, + CommandHandlers: func(cb *cqrs.CommandBus, eb *cqrs.EventBus) []cqrs.CommandHandler { + handlers := []cqrs.CommandHandler{} + for _, gen := range commandHandlerGenerators { + handlers = append(handlers, gen(cb, eb)) + } + + return handlers + }, + CommandsPublisher: s.commandsPublisher, + CommandsSubscriberConstructor: func(handlerName string) (message.Subscriber, error) { + // we can reuse subscriber, because all commands have separated topics + return s.commandsSubscriber, nil + }, + GenerateEventsTopic: func(eventName string) string { + // because we are using PubSub RabbitMQ config, we can use one topic for all events + return "events" + + // we can also use topic per event type + // return eventName + }, + EventHandlers: func(cb *cqrs.CommandBus, eb *cqrs.EventBus) []cqrs.EventHandler { + handlers := []cqrs.EventHandler{} + for _, gen := range eventHandlerGenerators { + handlers = append(handlers, gen(cb, eb)) + } + + return handlers + }, + EventsPublisher: s.eventsPublisher, + EventsSubscriberConstructor: func(handlerName string) (message.Subscriber, error) { + return s.eventsSubscriber, nil + }, + Router: s.router, + CommandEventMarshaler: s.cqrsMarshaler, + Logger: s.nlog, + }) + + if err != nil { + return err + } + + s.commandBus = cqrsFacade.CommandBus() + s.eventBus = cqrsFacade.EventBus() + + return nil +} + +func (s *CQRSServer) GetCommandBus() CommandBus { + return s.commandBus +} + +func (s *CQRSServer) GetEventBus() EventBus { + return s.eventBus +} + +func (s *CQRSServer) Start() error { + if err := s.router.Run(context.Background()); err != nil { + return errors.Wrap(err, "failed to run cqrs server") + } + return nil +} diff --git a/pkg/message/pubsub.go b/pkg/message/pubsub.go deleted file mode 100644 index a567454..0000000 --- a/pkg/message/pubsub.go +++ /dev/null @@ -1,24 +0,0 @@ -package message - -import ( - "sync" - - "github.com/ThreeDotsLabs/watermill" - "github.com/ThreeDotsLabs/watermill/pubsub/gochannel" -) - -var ( - pubSub *gochannel.GoChannel - pubSubOnce sync.Once -) - -func NewPubSub() *gochannel.GoChannel { - pubSubOnce.Do(func() { - pubSub = gochannel.NewGoChannel( - gochannel.Config{}, - watermill.NewStdLogger(false, false), - ) - }) - - return pubSub -} diff --git a/pkg/proto/sqlcmd.go b/pkg/proto/sqlcmd.go new file mode 100644 index 0000000..b8cfb03 --- /dev/null +++ b/pkg/proto/sqlcmd.go @@ -0,0 +1,39 @@ +package proto + +// LogMessage sql日志存储内容定义 +// 应该是经过sql解析后的内容 +type LogMessage struct { + // Env 环境, dev,test,prod + Env string `json:"env"` + // App 应用名称 + App string `json:"app"` + Database string `json:"database"` + // SQL 原始sql + SQL string `json:"sql"` + // SqlFingerprint sql指纹,用来标识sql,排除sql中值的变化 + SqlFingerprint string `json:"sql_fingerprint"` + // User db用户名 + User string `json:"user"` + Table string `json:"table"` + Fields []string `json:"fields"` + // PerformanceScore 性能分数 + PerformanceScore float32 `json:"performance_score"` + // Duration sql执行间隔(秒),精确到毫秒 + Duration int `json:"duration"` + // 接收到sql的时间戳表示 + Occtime int64 `json:"occtime"` +} + +// SqlCommand sql命令 +type SqlCommand struct { + // App 应用名称 + App string `json:"app"` + // Env 环境, dev,test,prod + Env string `json:"env"` + Database string `json:"database"` + SQL string `json:"sql"` + // 接收到sql的时间戳表示 + Occtime int64 `json:"occtime"` + // User db用户名 + User string `json:"user"` +} diff --git a/pkg/proto/sqlcmd.proto b/pkg/proto/sqlcmd.proto new file mode 100644 index 0000000..8c419ac --- /dev/null +++ b/pkg/proto/sqlcmd.proto @@ -0,0 +1,4 @@ +syntax = "proto3"; +package main; + +import "google/protobuf/timestamp.proto"; diff --git a/pkg/store/event.go b/pkg/store/event.go new file mode 100644 index 0000000..f2609a3 --- /dev/null +++ b/pkg/store/event.go @@ -0,0 +1,31 @@ +package store + +import ( + "context" + "fmt" + + "github.com/koolay/sqlboss/pkg/proto" +) + +// StoreOnSQLEventHandler sql命令事件处理, 日志存储 +type StoreOnSQLEventHandler struct { + // storager Storager +} + +func (s StoreOnSQLEventHandler) HandlerName() string { + // this name is passed to EventsSubscriberConstructor and used to generate queue name + return "StoreOnSQLEventHandler" +} + +func (StoreOnSQLEventHandler) NewEvent() interface{} { + return &proto.SqlCommand{} +} + +func (s StoreOnSQLEventHandler) Handle(ctx context.Context, event interface{}) error { + data := event.(*proto.SqlCommand) + // 解析 event.SQL, 用soar分析sql性能, 生成message对象, 并存储 + // s.storager.Insert() + fmt.Println("received sql command") + fmt.Println("StoreOnSQLEventHandler", data) + return nil +} diff --git a/pkg/store/loki.go b/pkg/store/loki.go new file mode 100644 index 0000000..f474662 --- /dev/null +++ b/pkg/store/loki.go @@ -0,0 +1,14 @@ +package store + +import "github.com/koolay/sqlboss/pkg/proto" + +type Storager interface { + Insert(data *proto.LogMessage) error +} + +type LokiStorager struct { +} + +func (lk LokiStorager) Insert(data *proto.LogMessage) error { + return nil +} diff --git a/pkg/store/soar.go b/pkg/store/soar.go new file mode 100644 index 0000000..c0519d5 --- /dev/null +++ b/pkg/store/soar.go @@ -0,0 +1,17 @@ +package store + +// Soar 性能分析 +type Soar struct { +} + +type SoarResult struct { + Score float32 + Tips string +} + +func (s Soar) Analyst(sql string) (*SoarResult, error) { + return &SoarResult{ + Score: 90.0, + Tips: "", + }, nil +} diff --git a/pkg/worker/serve.go b/pkg/worker/serve.go deleted file mode 100644 index 5f8e780..0000000 --- a/pkg/worker/serve.go +++ /dev/null @@ -1,51 +0,0 @@ -package worker - -import ( - "context" - "log" - - "github.com/koolay/sqlboss/pkg/conf" - "github.com/sirupsen/logrus" - - "github.com/ThreeDotsLabs/watermill/message" - "github.com/ThreeDotsLabs/watermill/pubsub/gochannel" -) - -type Worker struct { - cfg *conf.Config - pubsub *gochannel.GoChannel - logger *logrus.Logger -} - -func NewWorker(cfg *conf.Config, pubsub *gochannel.GoChannel, logger *logrus.Logger) *Worker { - return &Worker{ - cfg: cfg, - logger: logger, - pubsub: pubsub, - } -} - -func (w *Worker) Setup() error { - return nil -} - -func (w *Worker) Run(ctx context.Context) error { - log.Println("start consume", w.cfg.Stream.Topic) - messages, err := w.pubsub.Subscribe(ctx, w.cfg.Stream.Topic) - if err != nil { - return err - } - - go w.process(messages) - return nil -} - -func (w *Worker) process(messages <-chan *message.Message) { - for msg := range messages { - log.Printf("received message: %s, payload: %s", msg.UUID, string(msg.Payload)) - - // we need to Acknowledge that we received and processed the message, - // otherwise, it will be resent over and over again. - msg.Ack() - } -}