diff --git a/Dockerfile b/Dockerfile index 2ee24bc1d..364c38ba8 100644 --- a/Dockerfile +++ b/Dockerfile @@ -12,10 +12,11 @@ FROM gcr.io/distroless/base AS runtime WORKDIR /pipeline-backend +COPY --from=build /go/src/config ./config +COPY --from=build /go/src/release-please ./release-please COPY --from=build /go/src/internal/db/migration ./internal/db/migration -COPY --from=build /pipeline-backend-migrate ./ -COPY --from=build /go/src/config ./config +COPY --from=build /pipeline-backend-migrate ./ COPY --from=build /pipeline-backend ./ ENTRYPOINT ["./pipeline-backend"] diff --git a/Makefile b/Makefile index 7d252b8cf..64f1a4902 100644 --- a/Makefile +++ b/Makefile @@ -1,10 +1,10 @@ .DEFAULT_GOAL:=help -DEVELOP_SERVICES := pipeline_backend +DEV := pipeline_backend DEP := mgmt_backend connector_backend model_backend triton_conda_env -DB := pg_sql +DB := pg_sql redis TRITON := triton_server -TEMPORAL := temporal redis redoc_openapi +TEMPORAL := temporal #============================================================================ diff --git a/cmd/main/main.go b/cmd/main/main.go index 02c45153a..b22a405d5 100644 --- a/cmd/main/main.go +++ b/cmd/main/main.go @@ -10,6 +10,7 @@ import ( "strings" "syscall" + "github.com/go-redis/redis/v9" "github.com/grpc-ecosystem/grpc-gateway/v2/runtime" "github.com/rs/cors" "golang.org/x/net/http2" @@ -29,10 +30,13 @@ import ( "github.com/instill-ai/pipeline-backend/pkg/handler" "github.com/instill-ai/pipeline-backend/pkg/repository" "github.com/instill-ai/pipeline-backend/pkg/service" + "github.com/instill-ai/pipeline-backend/pkg/usage" + "github.com/instill-ai/x/repo" - cache "github.com/instill-ai/pipeline-backend/internal/cache" database "github.com/instill-ai/pipeline-backend/internal/db" pipelinePB "github.com/instill-ai/protogen-go/vdp/pipeline/v1alpha" + usagePB "github.com/instill-ai/protogen-go/vdp/usage/v1alpha" + usageclient "github.com/instill-ai/usage-client/usage" ) func grpcHandlerFunc(grpcServer *grpc.Server, gwHandler http.Handler, CORSOrigins []string) http.Handler { @@ -67,9 +71,6 @@ func main() { db := database.GetConnection() defer database.Close(db) - cache.Init() - defer cache.Close() - // Create tls based credential. var creds credentials.TransportCredentials var err error @@ -113,16 +114,32 @@ func main() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() + userServiceClient, userServiceClientConn := external.InitUserServiceClient() + defer userServiceClientConn.Close() + + connectorServiceClient, connectorServiceClientConn := external.InitConnectorServiceClient() + defer connectorServiceClientConn.Close() + + modelServiceClient, modelServiceClientConn := external.InitModelServiceClient() + defer modelServiceClientConn.Close() + + redisClient := redis.NewClient(&config.Config.Cache.Redis.RedisOptions) + defer redisClient.Close() + + repository := repository.NewRepository(db) + + service := service.NewService( + repository, + userServiceClient, + connectorServiceClient, + modelServiceClient, + redisClient, + ) + grpcS := grpc.NewServer(grpcServerOpts...) pipelinePB.RegisterPipelineServiceServer( grpcS, - handler.NewHandler( - service.NewService( - repository.NewRepository(db), - external.InitUserServiceClient(), - external.InitConnectorServiceClient(), - external.InitModelServiceClient(), - )), + handler.NewHandler(service), ) gwS := runtime.NewServeMux( @@ -145,6 +162,30 @@ func main() { panic(err) } + // Start usage reporter + if !config.Config.Server.DisableUsage { + version, err := repo.ReadReleaseManifest("release-please/manifest.json") + if err != nil { + logger.Fatal(err.Error()) + } + + usageServiceClient, usageServiceClientConn := external.InitUsageServiceClient() + defer usageServiceClientConn.Close() + + usg := usage.NewUsage(repository, userServiceClient, redisClient) + err = usageclient.StartReporter( + context.Background(), + usageServiceClient, + usagePB.Session_SERVICE_PIPELINE, + config.Config.Server.Edition, + version, + usg.RetrieveUsageData) + if err != nil { + logger.Error(fmt.Sprintf("Unable to start usage reporter: %v\n", err)) + } + } + + // Start gRPC server var dialOpts []grpc.DialOption if config.Config.Server.HTTPS.Cert != "" && config.Config.Server.HTTPS.Key != "" { dialOpts = []grpc.DialOption{grpc.WithTransportCredentials(creds)} diff --git a/config/config.go b/config/config.go index 7f93b3511..1d1d72034 100644 --- a/config/config.go +++ b/config/config.go @@ -6,7 +6,7 @@ import ( "strings" "time" - "github.com/go-redis/redis/v8" + "github.com/go-redis/redis/v9" "github.com/knadh/koanf" "github.com/knadh/koanf/parsers/yaml" "github.com/knadh/koanf/providers/env" @@ -28,6 +28,7 @@ type AppConfig struct { MgmtBackend MgmtBackendConfig `koanf:"mgmtbackend"` ConnectorBackend ConnectorBackendConfig `koanf:"connectorbackend"` ModelBackend ModelBackendConfig `koanf:"modelbackend"` + UsageBackend UsageBackendConfig `koanf:"usagebackend"` } // ServerConfig defines HTTP server configurations @@ -37,10 +38,9 @@ type ServerConfig struct { Cert string `koanf:"cert"` Key string `koanf:"key"` } - CORSOrigins []string `koanf:"corsorigins"` - Paginate struct { - Salt string `koanf:"salt"` - } + CORSOrigins []string `koanf:"corsorigins"` + Edition string `koanf:"edition"` + DisableUsage bool `koanf:"disableusage"` } // DatabaseConfig related to database @@ -101,6 +101,16 @@ type ModelBackendConfig struct { } } +// UsageBackendConfig related to usage-backend +type UsageBackendConfig struct { + Host string `koanf:"host"` + Port int `koanf:"port"` + HTTPS struct { + Cert string `koanf:"cert"` + Key string `koanf:"key"` + } +} + // Init - Assign global config to decoded config struct func Init() error { logger, _ := logger.GetZapLogger() diff --git a/config/config.yaml b/config/config.yaml index 77caa7771..967188bf5 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -7,6 +7,8 @@ server: - http://localhost - https://instill-inc.tech - https://instill.tech + edition: local-ce:dev + disableusage: false database: username: postgres password: password @@ -45,3 +47,9 @@ modelbackend: https: cert: # /ssl/tls.crt key: # /ssl/tls.key +usagebackend: + host: localhost + port: 8084 + https: + cert: # /ssl/tls.crt + key: # /ssl/tls.crt diff --git a/docker-compose.yml b/docker-compose.yml index f152d3999..069f58d77 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -15,8 +15,6 @@ volumes: services: mgmt_backend_migrate: container_name: mgmt-backend-migrate - build: - context: . image: instill/mgmt-backend:dev restart: on-failure environment: @@ -31,8 +29,6 @@ services: mgmt_backend_init: container_name: mgmt-backend-init - build: - context: . image: instill/mgmt-backend:dev restart: on-failure environment: @@ -46,8 +42,6 @@ services: mgmt_backend: container_name: mgmt-backend - build: - context: . image: instill/mgmt-backend:dev restart: unless-stopped environment: @@ -56,6 +50,8 @@ services: CFG_DATABASE_PORT: 5432 CFG_DATABASE_USERNAME: postgres CFG_DATABASE_PASSWORD: password + CFG_USAGEBACKEND_HOST: usage-backend-cloud + CFG_USAGEBACKEND_PORT: 8084 ports: - 8080:8080 depends_on: @@ -97,6 +93,8 @@ services: CFG_CONNECTORBACKEND_PORT: 8082 CFG_MODELBACKEND_HOST: model_backend CFG_MODELBACKEND_PORT: 8083 + CFG_USAGEBACKEND_HOST: usage-backend-cloud + CFG_USAGEBACKEND_PORT: 8084 ports: - 8081:8081 volumes: diff --git a/go.mod b/go.mod index e34c14525..8d9e91a42 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/instill-ai/pipeline-backend go 1.18 require ( - github.com/go-redis/redis/v8 v8.11.4 + github.com/go-redis/redis/v9 v9.0.0-beta.1 github.com/gofrs/uuid v4.0.0+incompatible github.com/gogo/status v1.1.0 github.com/golang-migrate/migrate/v4 v4.15.1 @@ -11,8 +11,9 @@ require ( github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.3 github.com/iancoleman/strcase v0.2.0 - github.com/instill-ai/protogen-go v0.1.5-alpha.0.20220530153944-e1c33d5f7d13 - github.com/instill-ai/x v0.1.0-alpha.0.20220517204940-5a70916ce425 + github.com/instill-ai/protogen-go v0.1.5-alpha.0.20220606124550-53f79f9c7d74 + github.com/instill-ai/usage-client v0.0.0-20220606142220-c17424a565e4 + github.com/instill-ai/x v0.1.0-alpha.0.20220604235252-39fcffc82edb github.com/knadh/koanf v1.4.0 github.com/mennanov/fieldmask-utils v0.5.0 github.com/rs/cors v1.8.2 @@ -22,11 +23,12 @@ require ( golang.org/x/net v0.0.0-20220225172249-27dd8689420f google.golang.org/grpc v1.45.0 google.golang.org/protobuf v1.27.1 - gorm.io/driver/postgres v1.2.3 - gorm.io/gorm v1.22.5 + gorm.io/driver/postgres v1.3.6 + gorm.io/gorm v1.23.5 ) require ( + github.com/catalinc/hashcash v0.0.0-20161205220751-e6bc29ff4de9 // indirect github.com/cespare/xxhash/v2 v2.1.2 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect @@ -39,13 +41,13 @@ require ( github.com/hashicorp/errwrap v1.0.0 // indirect github.com/hashicorp/go-multierror v1.1.0 // indirect github.com/jackc/chunkreader/v2 v2.0.1 // indirect - github.com/jackc/pgconn v1.10.1 + github.com/jackc/pgconn v1.12.1 github.com/jackc/pgio v1.0.0 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect - github.com/jackc/pgproto3/v2 v2.2.0 // indirect + github.com/jackc/pgproto3/v2 v2.3.0 // indirect github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b // indirect - github.com/jackc/pgtype v1.9.0 // indirect - github.com/jackc/pgx/v4 v4.14.0 // indirect + github.com/jackc/pgtype v1.11.0 // indirect + github.com/jackc/pgx/v4 v4.16.1 // indirect github.com/jinzhu/inflection v1.0.0 // indirect github.com/jinzhu/now v1.1.4 // indirect github.com/lib/pq v1.10.2 // indirect diff --git a/go.sum b/go.sum index 4b025239a..106c2ce82 100644 --- a/go.sum +++ b/go.sum @@ -154,6 +154,8 @@ github.com/buger/jsonparser v0.0.0-20180808090653-f4dd9f5a6b44/go.mod h1:bbYlZJ7 github.com/bugsnag/bugsnag-go v0.0.0-20141110184014-b1d153021fcd/go.mod h1:2oa8nejYd4cQ/b0hMIopN0lCRxU0bueqREvZLWFrtK8= github.com/bugsnag/osext v0.0.0-20130617224835-0dd3f918b21b/go.mod h1:obH5gd0BsqsP2LwDJ9aOkm/6J86V6lyAXCoQWGw3K50= github.com/bugsnag/panicwrap v0.0.0-20151223152923-e2c28503fcd0/go.mod h1:D/8v3kj0zr8ZAKg1AQ6crr+5VwKN5eIywRkfhyM/+dE= +github.com/catalinc/hashcash v0.0.0-20161205220751-e6bc29ff4de9 h1:mzt00lI/krYDFH1qNfQdDZze2GjRaTeho7Ch9af/wsY= +github.com/catalinc/hashcash v0.0.0-20161205220751-e6bc29ff4de9/go.mod h1:Qj15jt0Y3YvBTjOfWQ7WdgNtSE9WnbzIDpLcTcpQ1qw= github.com/cenkalti/backoff/v4 v4.0.2/go.mod h1:eEew/i+1Q6OrCDZh3WiXYv3+nJwBASZ8Bog/87DQnVg= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= @@ -384,12 +386,11 @@ github.com/go-openapi/jsonreference v0.19.3/go.mod h1:rjx6GuL8TTa9VaixXglHmQmIL9 github.com/go-openapi/spec v0.19.3/go.mod h1:FpwSN1ksY1eteniUU7X0N/BgJ7a4WvBFVA8Lj9mJglo= github.com/go-openapi/swag v0.19.2/go.mod h1:POnQmlKehdgb5mhVOsnJFsivZCEZ/vjK9gh66Z9tfKk= github.com/go-openapi/swag v0.19.5/go.mod h1:POnQmlKehdgb5mhVOsnJFsivZCEZ/vjK9gh66Z9tfKk= -github.com/go-redis/redis/v8 v8.11.4 h1:kHoYkfZP6+pe04aFTnhDH6GDROa5yJdHJVNxV3F46Tg= -github.com/go-redis/redis/v8 v8.11.4/go.mod h1:2Z2wHZXdQpCDXEGzqMockDpNyYvi2l4Pxt6RJr792+w= +github.com/go-redis/redis/v9 v9.0.0-beta.1 h1:oW3jlPic5HhGUbYMH0lidnP+72BgsT+lCwlVud6o2Mc= +github.com/go-redis/redis/v9 v9.0.0-beta.1/go.mod h1:6gNX1bXdwkpEG0M/hEBNK/Fp8zdyCkjwwKc6vBbfCDI= github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= -github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE= github.com/go-test/deep v1.0.2-0.20181118220953-042da051cf31/go.mod h1:wGDj63lr65AM2AQyKZd/NYHGb0R+1RLqB8NKt3aSFNA= github.com/gobuffalo/attrs v0.0.0-20190224210810-a9411de4debd/go.mod h1:4duuawTqi2wkkpB4ePgWMaai6/Kc6WEz83bhFwpHzj0= github.com/gobuffalo/depgen v0.0.0-20190329151759-d478694a28d3/go.mod h1:3STtPUQYuzV0gBVOY3vy6CfMm/ljR4pABfrTeHNLHUY= @@ -591,10 +592,12 @@ github.com/imdario/mergo v0.3.10/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH github.com/imdario/mergo v0.3.11/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA= github.com/imdario/mergo v0.3.12/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= -github.com/instill-ai/protogen-go v0.1.5-alpha.0.20220530153944-e1c33d5f7d13 h1:RG9YOU8B0vybwzlSzWB5lBkTCwQvERgyzL/8MsS8mvY= -github.com/instill-ai/protogen-go v0.1.5-alpha.0.20220530153944-e1c33d5f7d13/go.mod h1:d9ebEdwMX2Las4OScym45qbQM+xcBQITqvq/8anTVas= -github.com/instill-ai/x v0.1.0-alpha.0.20220517204940-5a70916ce425 h1:XR5kKLN00JNR09G5oB176U39sFGyF27HHoVGA96zFrg= -github.com/instill-ai/x v0.1.0-alpha.0.20220517204940-5a70916ce425/go.mod h1:Chc+UChNgV4ejOlji4Ca2l+83fPdDbOML5ToQxEnJ4Y= +github.com/instill-ai/protogen-go v0.1.5-alpha.0.20220606124550-53f79f9c7d74 h1:daR/0QKiOy/y1tnrg0Gdgt6Hsf5Im7Zxmu8c4SLwVls= +github.com/instill-ai/protogen-go v0.1.5-alpha.0.20220606124550-53f79f9c7d74/go.mod h1:d9ebEdwMX2Las4OScym45qbQM+xcBQITqvq/8anTVas= +github.com/instill-ai/usage-client v0.0.0-20220606142220-c17424a565e4 h1:yJ34x5zUKawwslRAPug8Ad6rziIvhheITQvz7auNyf8= +github.com/instill-ai/usage-client v0.0.0-20220606142220-c17424a565e4/go.mod h1:ibsksgd0nzO7zHom5qVa9lmb9Y4XuSF8JMjBpkdGW6s= +github.com/instill-ai/x v0.1.0-alpha.0.20220604235252-39fcffc82edb h1:70AJVfr463jWkgPQ1w281zsQ1LK/tOW5INTNc+yOBsI= +github.com/instill-ai/x v0.1.0-alpha.0.20220604235252-39fcffc82edb/go.mod h1:uMgeUs+q+Bjr43AsYb3QRsFX2Ebxhr74seM8lNdbdLA= github.com/j-keck/arping v0.0.0-20160618110441-2cf9dc699c56/go.mod h1:ymszkNOg6tORTn+6F6j+Jc8TOr5osrynvN6ivFWZ2GA= github.com/jackc/chunkreader v1.0.0/go.mod h1:RT6O25fNZIuasFJRyZ4R/Y2BbhasbmZXF9QQ7T3kePo= github.com/jackc/chunkreader/v2 v2.0.0/go.mod h1:odVSm741yZoC3dpHEUXIqA9tQRhFrgOHwnPIn9lDKlk= @@ -609,8 +612,8 @@ github.com/jackc/pgconn v1.5.1-0.20200601181101-fa742c524853/go.mod h1:QeD3lBfpT github.com/jackc/pgconn v1.8.0/go.mod h1:1C2Pb36bGIP9QHGBYCjnyhqu7Rv3sGshaQUvmfGIB/o= github.com/jackc/pgconn v1.9.0/go.mod h1:YctiPyvzfU11JFxoXokUOOKQXQmDMoJL9vJzHH8/2JY= github.com/jackc/pgconn v1.9.1-0.20210724152538-d89c8390a530/go.mod h1:4z2w8XhRbP1hYxkpTuBjTS3ne3J48K83+u0zoyvg2pI= -github.com/jackc/pgconn v1.10.1 h1:DzdIHIjG1AxGwoEEqS+mGsURyjt4enSmqzACXvVzOT8= -github.com/jackc/pgconn v1.10.1/go.mod h1:4z2w8XhRbP1hYxkpTuBjTS3ne3J48K83+u0zoyvg2pI= +github.com/jackc/pgconn v1.12.1 h1:rsDFzIpRk7xT4B8FufgpCCeyjdNpKyghZeSefViE5W8= +github.com/jackc/pgconn v1.12.1/go.mod h1:ZkhRC59Llhrq3oSfrikvwQ5NaxYExr6twkdkMLaKono= github.com/jackc/pgerrcode v0.0.0-20201024163028-a0d42d470451/go.mod h1:a/s9Lp5W7n/DD0VrVoyJ00FbP2ytTPDVOivvn2bMlds= github.com/jackc/pgio v1.0.0 h1:g12B9UwVnzGhueNavwioyEEpAmqMe1E/BN9ES+8ovkE= github.com/jackc/pgio v1.0.0/go.mod h1:oP+2QK2wFfUWgr+gxjoBH9KGBb31Eio69xUb0w5bYf8= @@ -629,8 +632,8 @@ github.com/jackc/pgproto3/v2 v2.0.1/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwX github.com/jackc/pgproto3/v2 v2.0.6/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwXYU5xKaEdA= github.com/jackc/pgproto3/v2 v2.0.7/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwXYU5xKaEdA= github.com/jackc/pgproto3/v2 v2.1.1/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwXYU5xKaEdA= -github.com/jackc/pgproto3/v2 v2.2.0 h1:r7JypeP2D3onoQTCxWdTpCtJ4D+qpKr0TxvoyMhZ5ns= -github.com/jackc/pgproto3/v2 v2.2.0/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwXYU5xKaEdA= +github.com/jackc/pgproto3/v2 v2.3.0 h1:brH0pCGBDkBW07HWlN/oSBXrmo3WB0UvZd1pIuDcL8Y= +github.com/jackc/pgproto3/v2 v2.3.0/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwXYU5xKaEdA= github.com/jackc/pgservicefile v0.0.0-20200307190119-3430c5407db8/go.mod h1:vsD4gTJCa9TptPL8sPkXrLZ+hDuNrZCnj29CQpr4X1E= github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b h1:C8S2+VttkHFdOOCXJe+YGfa4vHYwlt4Zx+IVXQ97jYg= github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b/go.mod h1:vsD4gTJCa9TptPL8sPkXrLZ+hDuNrZCnj29CQpr4X1E= @@ -642,8 +645,8 @@ github.com/jackc/pgtype v1.3.1-0.20200510190516-8cd94a14c75a/go.mod h1:vaogEUkAL github.com/jackc/pgtype v1.3.1-0.20200606141011-f6355165a91c/go.mod h1:cvk9Bgu/VzJ9/lxTO5R5sf80p0DiucVtN7ZxvaC4GmQ= github.com/jackc/pgtype v1.6.2/go.mod h1:JCULISAZBFGrHaOXIIFiyfzW5VY0GRitRr8NeJsrdig= github.com/jackc/pgtype v1.8.1-0.20210724151600-32e20a603178/go.mod h1:C516IlIV9NKqfsMCXTdChteoXmwgUceqaLfjg2e3NlM= -github.com/jackc/pgtype v1.9.0 h1:/SH1RxEtltvJgsDqp3TbiTFApD3mey3iygpuEGeuBXk= -github.com/jackc/pgtype v1.9.0/go.mod h1:LUMuVrfsFfdKGLw+AFFVv6KtHOFMwRgDDzBt76IqCA4= +github.com/jackc/pgtype v1.11.0 h1:u4uiGPz/1hryuXzyaBhSk6dnIyyG2683olG2OV+UUgs= +github.com/jackc/pgtype v1.11.0/go.mod h1:LUMuVrfsFfdKGLw+AFFVv6KtHOFMwRgDDzBt76IqCA4= github.com/jackc/pgx/v4 v4.0.0-20190420224344-cc3461e65d96/go.mod h1:mdxmSJJuR08CZQyj1PVQBHy9XOp5p8/SHH6a0psbY9Y= github.com/jackc/pgx/v4 v4.0.0-20190421002000-1b8f0016e912/go.mod h1:no/Y67Jkk/9WuGR0JG/JseM9irFbnEPbuWV2EELPNuM= github.com/jackc/pgx/v4 v4.0.0-pre1.0.20190824185557-6972a5742186/go.mod h1:X+GQnOEnf1dqHGpw7JmHqHc1NxDoalibchSk9/RWuDc= @@ -652,18 +655,17 @@ github.com/jackc/pgx/v4 v4.6.1-0.20200510190926-94ba730bb1e9/go.mod h1:t3/cdRQl6 github.com/jackc/pgx/v4 v4.6.1-0.20200606145419-4e5062306904/go.mod h1:ZDaNWkt9sW1JMiNn0kdYBaLelIhw7Pg4qd+Vk6tw7Hg= github.com/jackc/pgx/v4 v4.10.1/go.mod h1:QlrWebbs3kqEZPHCTGyxecvzG6tvIsYu+A5b1raylkA= github.com/jackc/pgx/v4 v4.12.1-0.20210724153913-640aa07df17c/go.mod h1:1QD0+tgSXP7iUjYm9C1NxKhny7lq6ee99u/z+IHFcgs= -github.com/jackc/pgx/v4 v4.14.0 h1:TgdrmgnM7VY72EuSQzBbBd4JA1RLqJolrw9nQVZABVc= -github.com/jackc/pgx/v4 v4.14.0/go.mod h1:jT3ibf/A0ZVCp89rtCIN0zCJxcE74ypROmHEZYsG/j8= +github.com/jackc/pgx/v4 v4.16.1 h1:JzTglcal01DrghUqt+PmzWsZx/Yh7SC/CTQmSBMTd0Y= +github.com/jackc/pgx/v4 v4.16.1/go.mod h1:SIhx0D5hoADaiXZVyv+3gSm3LCIIINTVO0PficsvWGQ= github.com/jackc/puddle v0.0.0-20190413234325-e4ced69a3a2b/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= github.com/jackc/puddle v0.0.0-20190608224051-11cab39313c9/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= github.com/jackc/puddle v1.1.0/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= github.com/jackc/puddle v1.1.1/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= github.com/jackc/puddle v1.1.3/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= -github.com/jackc/puddle v1.2.0/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= +github.com/jackc/puddle v1.2.1/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E= github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc= github.com/jinzhu/now v1.1.1/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8= -github.com/jinzhu/now v1.1.2/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8= github.com/jinzhu/now v1.1.4 h1:tHnRBy1i5F2Dh8BAFxqFzxKqqvezXrL2OW1TnX+Mlas= github.com/jinzhu/now v1.1.4/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8= github.com/jmespath/go-jmespath v0.0.0-20160202185014-0b12d6b521d8/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k= @@ -799,7 +801,6 @@ github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLA github.com/npillmayer/nestext v0.1.3/go.mod h1:h2lrijH8jpicr25dFY+oAJLyzlya6jhnuG+zWp9L0Uk= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= -github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= github.com/oklog/run v1.0.0/go.mod h1:dlhp/R75TPv97u0XWUtDeV/lRKWPKSdTuV0TZvrmrQA= github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U= github.com/olekukonko/tablewriter v0.0.0-20170122224234-a0225b3f23b5/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo= @@ -811,17 +812,14 @@ github.com/onsi/ginkgo v1.10.3/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+ github.com/onsi/ginkgo v1.11.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.12.0/go.mod h1:oUhWkIvk5aDxtKvDDuw8gItl8pKl42LzjC9KZE0HfGg= github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk= -github.com/onsi/ginkgo v1.16.4 h1:29JGrr5oVBm5ulCWet69zQkzWipVXIol6ygQUe/EzNc= -github.com/onsi/ginkgo v1.16.4/go.mod h1:dX+/inL/fNMqNlz0e9LfyB9TswhZpCVdJM/Z6Vvnwo0= +github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= github.com/onsi/gomega v0.0.0-20151007035656-2152b45fa28a/go.mod h1:C1qb7wdrVGGVU+Z6iS04AVkA3Q65CEZX59MT0QO5uiA= github.com/onsi/gomega v0.0.0-20170829124025-dcabb60a477c/go.mod h1:C1qb7wdrVGGVU+Z6iS04AVkA3Q65CEZX59MT0QO5uiA= github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY= github.com/onsi/gomega v1.9.0/go.mod h1:Ho0h+IUsWyvy1OpqCwxlQ/21gkhVunqlU8fDGcoTdcA= -github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= github.com/onsi/gomega v1.10.3/go.mod h1:V9xEwhxec5O8UDM77eCW8vLymOMltsqPVYWrpDsH8xc= -github.com/onsi/gomega v1.16.0 h1:6gjqkI8iiRHMvdccRJM8rVKjCWk6ZIm6FTm3ddIe4/c= -github.com/onsi/gomega v1.16.0/go.mod h1:HnhC7FXeEQY45zxNK3PPoIUhzk/80Xly9PcubAlGdZY= +github.com/onsi/gomega v1.19.0 h1:4ieX6qQjPP/BfC3mpsAtIGGlxTWPeA3Inl/7DtXw1tw= github.com/opencontainers/go-digest v0.0.0-20170106003457-a6d0ee40d420/go.mod h1:cMLVZDEM3+U2I4VmLI6N8jQYUd2OVphdqWwCJHrFt2s= github.com/opencontainers/go-digest v0.0.0-20180430190053-c9281466c8b2/go.mod h1:cMLVZDEM3+U2I4VmLI6N8jQYUd2OVphdqWwCJHrFt2s= github.com/opencontainers/go-digest v1.0.0-rc1/go.mod h1:cMLVZDEM3+U2I4VmLI6N8jQYUd2OVphdqWwCJHrFt2s= @@ -1160,7 +1158,6 @@ golang.org/x/net v0.0.0-20200324143707-d3edc9973b7e/go.mod h1:qpuaurCH72eLCgpAm/ golang.org/x/net v0.0.0-20200501053045-e0ff5e5a1de5/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20200506145744-7e3656a0809f/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20200513185701-a91f0712d120/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= -golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20200520182314-0ba52f642ac2/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= @@ -1175,7 +1172,6 @@ golang.org/x/net v0.0.0-20210119194325-5f4716e94777/go.mod h1:m0MpNAwzfU5UDzcl9v golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210316092652-d523dce5a7f4/go.mod h1:RBQZq4jEuRlivfhVLdyRGr576XBO4/greRjx4P4O3yc= golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= -golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk= golang.org/x/net v0.0.0-20210503060351-7fd8e65b6420/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20210505024714-0287a6fb4125/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20210614182718-04defd469f4e/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= @@ -1289,7 +1285,6 @@ golang.org/x/sys v0.0.0-20201126233918-771906719818/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20201201145000-ef89a241ccb3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201202213521-69691e467435/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210104204734-6f8348627aad/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210112080510-489259a85091/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210220050731-9a76102bfb43/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -1397,7 +1392,6 @@ golang.org/x/tools v0.0.0-20201110124207-079ba7bd75cd/go.mod h1:emZCQorbCU4vsT4f golang.org/x/tools v0.0.0-20201124115921-2c860bdd6e78/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20201201161351-ac6f37ff4c2a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20201208233053-a543418bbed2/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= -golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20210105154028-b0ab187a4818/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0= @@ -1606,13 +1600,13 @@ gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo= gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gorm.io/driver/postgres v1.0.8/go.mod h1:4eOzrI1MUfm6ObJU/UcmbXyiHSs8jSwH95G5P5dxcAg= -gorm.io/driver/postgres v1.2.3 h1:f4t0TmNMy9gh3TU2PX+EppoA6YsgFnyq8Ojtddb42To= -gorm.io/driver/postgres v1.2.3/go.mod h1:pJV6RgYQPG47aM1f0QeOzFH9HxQc8JcmAgjRCgS0wjs= +gorm.io/driver/postgres v1.3.6 h1:Q0iLoYvWwsJVpYQrSrY5p5P4YzW7fJjFMBG2sa4Bz5U= +gorm.io/driver/postgres v1.3.6/go.mod h1:f02ympjIcgtHEGFMZvdgTxODZ9snAHDb4hXfigBVuNI= gorm.io/gorm v1.20.12/go.mod h1:0HFTzE/SqkGTzK6TlDPPQbAYCluiVvhzoA1+aVyzenw= gorm.io/gorm v1.21.4/go.mod h1:0HFTzE/SqkGTzK6TlDPPQbAYCluiVvhzoA1+aVyzenw= -gorm.io/gorm v1.22.3/go.mod h1:F+OptMscr0P2F2qU97WT1WimdH9GaQPoDW7AYd5i2Y0= -gorm.io/gorm v1.22.5 h1:lYREBgc02Be/5lSCTuysZZDb6ffL2qrat6fg9CFbvXU= -gorm.io/gorm v1.22.5/go.mod h1:l2lP/RyAtc1ynaTjFksBde/O8v9oOGIApu2/xRitmZk= +gorm.io/gorm v1.23.4/go.mod h1:l2lP/RyAtc1ynaTjFksBde/O8v9oOGIApu2/xRitmZk= +gorm.io/gorm v1.23.5 h1:TnlF26wScKSvknUC/Rn8t0NLLM22fypYBlvj1+aH6dM= +gorm.io/gorm v1.23.5/go.mod h1:l2lP/RyAtc1ynaTjFksBde/O8v9oOGIApu2/xRitmZk= gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw= gotest.tools/v3 v3.0.2/go.mod h1:3SzNCllyD9/Y+b5r9JIKQ474KzkZyqLqEfYqMsX94Bk= gotest.tools/v3 v3.0.3/go.mod h1:Z7Lb0S5l+klDB31fvDQX8ss/FlKDxtlFlw3Oa8Ymbl8= diff --git a/internal/cache/cache.go b/internal/cache/cache.go deleted file mode 100644 index 603476932..000000000 --- a/internal/cache/cache.go +++ /dev/null @@ -1,19 +0,0 @@ -package cache - -import ( - "github.com/go-redis/redis/v8" - - "github.com/instill-ai/pipeline-backend/config" -) - -var Redis *redis.Client - -func Init() { - Redis = redis.NewClient(&config.Config.Cache.Redis.RedisOptions) -} - -func Close() { - if Redis != nil { - Redis.Close() - } -} diff --git a/internal/external/external.go b/internal/external/external.go index 8c0ec8f56..4d1c3c572 100644 --- a/internal/external/external.go +++ b/internal/external/external.go @@ -13,10 +13,11 @@ import ( connectorPB "github.com/instill-ai/protogen-go/vdp/connector/v1alpha" mgmtPB "github.com/instill-ai/protogen-go/vdp/mgmt/v1alpha" modelPB "github.com/instill-ai/protogen-go/vdp/model/v1alpha" + usagePB "github.com/instill-ai/protogen-go/vdp/usage/v1alpha" ) // InitUserServiceClient initialises a UserServiceClient instance -func InitUserServiceClient() mgmtPB.UserServiceClient { +func InitUserServiceClient() (mgmtPB.UserServiceClient, *grpc.ClientConn) { logger, _ := logger.GetZapLogger() var clientDialOpts grpc.DialOption @@ -37,11 +38,11 @@ func InitUserServiceClient() mgmtPB.UserServiceClient { logger.Fatal(err.Error()) } - return mgmtPB.NewUserServiceClient(clientConn) + return mgmtPB.NewUserServiceClient(clientConn), clientConn } // InitConnectorServiceClient initialises a ConnectorServiceClient instance -func InitConnectorServiceClient() connectorPB.ConnectorServiceClient { +func InitConnectorServiceClient() (connectorPB.ConnectorServiceClient, *grpc.ClientConn) { logger, _ := logger.GetZapLogger() var clientDialOpts grpc.DialOption @@ -62,11 +63,11 @@ func InitConnectorServiceClient() connectorPB.ConnectorServiceClient { logger.Fatal(err.Error()) } - return connectorPB.NewConnectorServiceClient(clientConn) + return connectorPB.NewConnectorServiceClient(clientConn), clientConn } // InitModelServiceClient initialises a ModelServiceClient instance -func InitModelServiceClient() modelPB.ModelServiceClient { +func InitModelServiceClient() (modelPB.ModelServiceClient, *grpc.ClientConn) { logger, _ := logger.GetZapLogger() var clientDialOpts grpc.DialOption @@ -87,5 +88,31 @@ func InitModelServiceClient() modelPB.ModelServiceClient { logger.Fatal(err.Error()) } - return modelPB.NewModelServiceClient(clientConn) + return modelPB.NewModelServiceClient(clientConn), clientConn +} + +// InitUsageServiceClient initialises a UsageServiceClient instance +func InitUsageServiceClient() (usagePB.UsageServiceClient, *grpc.ClientConn) { + logger, _ := logger.GetZapLogger() + + var clientDialOpts grpc.DialOption + var usageCreds credentials.TransportCredentials + var err error + if config.Config.UsageBackend.HTTPS.Cert != "" && config.Config.UsageBackend.HTTPS.Key != "" { + usageCreds, err = credentials.NewServerTLSFromFile(config.Config.UsageBackend.HTTPS.Cert, config.Config.UsageBackend.HTTPS.Key) + if err != nil { + logger.Fatal(err.Error()) + } + clientDialOpts = grpc.WithTransportCredentials(usageCreds) + } else { + clientDialOpts = grpc.WithTransportCredentials(insecure.NewCredentials()) + } + + clientConn, err := grpc.Dial(fmt.Sprintf("%v:%v", config.Config.UsageBackend.Host, config.Config.UsageBackend.Port), clientDialOpts) + if err != nil { + logger.Fatal(err.Error()) + } + + return usagePB.NewUsageServiceClient(clientConn), clientConn + } diff --git a/pkg/handler/convert.go b/pkg/handler/convert.go index ac96d22e0..304d4d4a1 100644 --- a/pkg/handler/convert.go +++ b/pkg/handler/convert.go @@ -117,7 +117,7 @@ func DBToPBPipeline(dbPipeline *datamodel.Pipeline) *pipelinePB.Pipeline { if strings.HasPrefix(dbPipeline.Owner, "users/") { pbPipeline.Owner = &pipelinePB.Pipeline_User{User: dbPipeline.Owner} - } else if strings.HasPrefix(dbPipeline.Owner, "organizations/") { + } else if strings.HasPrefix(dbPipeline.Owner, "orgs/") { pbPipeline.Owner = &pipelinePB.Pipeline_Org{Org: dbPipeline.Owner} } diff --git a/pkg/handler/handler.go b/pkg/handler/handler.go index c8380c6a2..8b40063bd 100644 --- a/pkg/handler/handler.go +++ b/pkg/handler/handler.go @@ -113,7 +113,7 @@ func (h *handler) ListPipeline(ctx context.Context, req *pipelinePB.ListPipeline return &pipelinePB.ListPipelineResponse{}, err } - dbPipelines, totalSize, nextPageToken, err := h.service.ListPipeline(owner, int(req.GetPageSize()), req.GetPageToken(), isBasicView) + dbPipelines, totalSize, nextPageToken, err := h.service.ListPipeline(owner, req.GetPageSize(), req.GetPageToken(), isBasicView) if err != nil { return &pipelinePB.ListPipelineResponse{}, err } diff --git a/pkg/handler/handlercustom.go b/pkg/handler/handlercustom.go index b94adcc28..92495b3d6 100644 --- a/pkg/handler/handlercustom.go +++ b/pkg/handler/handlercustom.go @@ -8,6 +8,9 @@ import ( "net/http" "strings" + "github.com/go-redis/redis/v9" + + "github.com/instill-ai/pipeline-backend/config" "github.com/instill-ai/pipeline-backend/internal/constant" "github.com/instill-ai/pipeline-backend/internal/db" "github.com/instill-ai/pipeline-backend/internal/external" @@ -47,11 +50,24 @@ func HandleTriggerPipelineBinaryFileUpload(w http.ResponseWriter, req *http.Requ return } + userServiceClient, userServiceClientConn := external.InitUserServiceClient() + defer userServiceClientConn.Close() + + connectorServiceClient, connectorServiceClientConn := external.InitConnectorServiceClient() + defer connectorServiceClientConn.Close() + + modelServiceClient, modelServiceClientConn := external.InitModelServiceClient() + defer modelServiceClientConn.Close() + + redisClient := redis.NewClient(&config.Config.Cache.Redis.RedisOptions) + defer redisClient.Close() + service := service.NewService( repository.NewRepository(db.GetConnection()), - external.InitUserServiceClient(), - external.InitConnectorServiceClient(), - external.InitModelServiceClient(), + userServiceClient, + connectorServiceClient, + modelServiceClient, + redisClient, ) dbPipeline, err := service.GetPipelineByID(id, owner, false) diff --git a/pkg/repository/repository.go b/pkg/repository/repository.go index 612434158..eb174ca1b 100644 --- a/pkg/repository/repository.go +++ b/pkg/repository/repository.go @@ -17,7 +17,7 @@ import ( // Repository interface type Repository interface { CreatePipeline(pipeline *datamodel.Pipeline) error - ListPipeline(owner string, pageSize int, pageToken string, isBasicView bool) ([]datamodel.Pipeline, int64, string, error) + ListPipeline(owner string, pageSize int64, pageToken string, isBasicView bool) ([]datamodel.Pipeline, int64, string, error) GetPipelineByUID(uid uuid.UUID, owner string, isBasicView bool) (*datamodel.Pipeline, error) GetPipelineByID(id string, owner string, isBasicView bool) (*datamodel.Pipeline, error) UpdatePipeline(id string, owner string, pipeline *datamodel.Pipeline) error @@ -49,7 +49,7 @@ func (r *repository) CreatePipeline(pipeline *datamodel.Pipeline) error { return nil } -func (r *repository) ListPipeline(owner string, pageSize int, pageToken string, isBasicView bool) (pipelines []datamodel.Pipeline, totalSize int64, nextPageToken string, err error) { +func (r *repository) ListPipeline(owner string, pageSize int64, pageToken string, isBasicView bool) (pipelines []datamodel.Pipeline, totalSize int64, nextPageToken string, err error) { if result := r.db.Model(&datamodel.Pipeline{}).Where("owner = ?", owner).Count(&totalSize); result.Error != nil { return nil, 0, "", status.Errorf(codes.Internal, result.Error.Error()) diff --git a/pkg/service/convert.go b/pkg/service/convert.go index 107b4324b..a5ad94053 100644 --- a/pkg/service/convert.go +++ b/pkg/service/convert.go @@ -16,6 +16,8 @@ import ( func (s *service) ownerNameToPermalink(owner *string) error { + // TODO: implement cache + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) defer cancel() diff --git a/pkg/service/mock_repository_test.go b/pkg/service/mock_repository_test.go index 1ff9b1d31..37b7ea7dd 100644 --- a/pkg/service/mock_repository_test.go +++ b/pkg/service/mock_repository_test.go @@ -94,7 +94,7 @@ func (mr *MockRepositoryMockRecorder) GetPipelineByUID(arg0, arg1, arg2 interfac } // ListPipeline mocks base method. -func (m *MockRepository) ListPipeline(arg0 string, arg1 int, arg2 string, arg3 bool) ([]datamodel.Pipeline, int64, string, error) { +func (m *MockRepository) ListPipeline(arg0 string, arg1 int64, arg2 string, arg3 bool) ([]datamodel.Pipeline, int64, string, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "ListPipeline", arg0, arg1, arg2, arg3) ret0, _ := ret[0].([]datamodel.Pipeline) diff --git a/pkg/service/mock_usage_grpc_test.go b/pkg/service/mock_usage_grpc_test.go new file mode 100644 index 000000000..473a3b6a1 --- /dev/null +++ b/pkg/service/mock_usage_grpc_test.go @@ -0,0 +1,117 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/instill-ai/protogen-go/vdp/usage/v1alpha (interfaces: UsageServiceClient) + +// Package service_test is a generated GoMock package. +package service_test + +import ( + context "context" + reflect "reflect" + + gomock "github.com/golang/mock/gomock" + usagev1alpha "github.com/instill-ai/protogen-go/vdp/usage/v1alpha" + grpc "google.golang.org/grpc" +) + +// MockUsageServiceClient is a mock of UsageServiceClient interface. +type MockUsageServiceClient struct { + ctrl *gomock.Controller + recorder *MockUsageServiceClientMockRecorder +} + +// MockUsageServiceClientMockRecorder is the mock recorder for MockUsageServiceClient. +type MockUsageServiceClientMockRecorder struct { + mock *MockUsageServiceClient +} + +// NewMockUsageServiceClient creates a new mock instance. +func NewMockUsageServiceClient(ctrl *gomock.Controller) *MockUsageServiceClient { + mock := &MockUsageServiceClient{ctrl: ctrl} + mock.recorder = &MockUsageServiceClientMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockUsageServiceClient) EXPECT() *MockUsageServiceClientMockRecorder { + return m.recorder +} + +// CreateSession mocks base method. +func (m *MockUsageServiceClient) CreateSession(arg0 context.Context, arg1 *usagev1alpha.CreateSessionRequest, arg2 ...grpc.CallOption) (*usagev1alpha.CreateSessionResponse, error) { + m.ctrl.T.Helper() + varargs := []interface{}{arg0, arg1} + for _, a := range arg2 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "CreateSession", varargs...) + ret0, _ := ret[0].(*usagev1alpha.CreateSessionResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// CreateSession indicates an expected call of CreateSession. +func (mr *MockUsageServiceClientMockRecorder) CreateSession(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{arg0, arg1}, arg2...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateSession", reflect.TypeOf((*MockUsageServiceClient)(nil).CreateSession), varargs...) +} + +// Liveness mocks base method. +func (m *MockUsageServiceClient) Liveness(arg0 context.Context, arg1 *usagev1alpha.LivenessRequest, arg2 ...grpc.CallOption) (*usagev1alpha.LivenessResponse, error) { + m.ctrl.T.Helper() + varargs := []interface{}{arg0, arg1} + for _, a := range arg2 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "Liveness", varargs...) + ret0, _ := ret[0].(*usagev1alpha.LivenessResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Liveness indicates an expected call of Liveness. +func (mr *MockUsageServiceClientMockRecorder) Liveness(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{arg0, arg1}, arg2...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Liveness", reflect.TypeOf((*MockUsageServiceClient)(nil).Liveness), varargs...) +} + +// Readiness mocks base method. +func (m *MockUsageServiceClient) Readiness(arg0 context.Context, arg1 *usagev1alpha.ReadinessRequest, arg2 ...grpc.CallOption) (*usagev1alpha.ReadinessResponse, error) { + m.ctrl.T.Helper() + varargs := []interface{}{arg0, arg1} + for _, a := range arg2 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "Readiness", varargs...) + ret0, _ := ret[0].(*usagev1alpha.ReadinessResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Readiness indicates an expected call of Readiness. +func (mr *MockUsageServiceClientMockRecorder) Readiness(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{arg0, arg1}, arg2...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Readiness", reflect.TypeOf((*MockUsageServiceClient)(nil).Readiness), varargs...) +} + +// SendSessionReport mocks base method. +func (m *MockUsageServiceClient) SendSessionReport(arg0 context.Context, arg1 *usagev1alpha.SendSessionReportRequest, arg2 ...grpc.CallOption) (*usagev1alpha.SendSessionReportResponse, error) { + m.ctrl.T.Helper() + varargs := []interface{}{arg0, arg1} + for _, a := range arg2 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "SendSessionReport", varargs...) + ret0, _ := ret[0].(*usagev1alpha.SendSessionReportResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// SendSessionReport indicates an expected call of SendSessionReport. +func (mr *MockUsageServiceClientMockRecorder) SendSessionReport(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{arg0, arg1}, arg2...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendSessionReport", reflect.TypeOf((*MockUsageServiceClient)(nil).SendSessionReport), varargs...) +} diff --git a/pkg/service/service.go b/pkg/service/service.go index 8a99f7607..31e912ef9 100644 --- a/pkg/service/service.go +++ b/pkg/service/service.go @@ -5,12 +5,15 @@ import ( "context" "fmt" "io" + "strings" "time" + "github.com/go-redis/redis/v9" "github.com/gofrs/uuid" "github.com/gogo/status" "google.golang.org/grpc/codes" + "github.com/instill-ai/pipeline-backend/internal/resource" "github.com/instill-ai/pipeline-backend/pkg/datamodel" "github.com/instill-ai/pipeline-backend/pkg/repository" @@ -23,7 +26,7 @@ import ( // Service interface type Service interface { CreatePipeline(pipeline *datamodel.Pipeline) (*datamodel.Pipeline, error) - ListPipeline(owner string, pageSize int, pageToken string, isBasicView bool) ([]datamodel.Pipeline, int64, string, error) + ListPipeline(owner string, pageSize int64, pageToken string, isBasicView bool) ([]datamodel.Pipeline, int64, string, error) GetPipelineByID(id string, owner string, isBasicView bool) (*datamodel.Pipeline, error) GetPipelineByUID(uid uuid.UUID, owner string, isBasicView bool) (*datamodel.Pipeline, error) UpdatePipeline(id string, owner string, updatedPipeline *datamodel.Pipeline) (*datamodel.Pipeline, error) @@ -40,56 +43,58 @@ type service struct { userServiceClient mgmtPB.UserServiceClient connectorServiceClient connectorPB.ConnectorServiceClient modelServiceClient modelPB.ModelServiceClient + redisClient *redis.Client } // NewService initiates a service instance -func NewService(r repository.Repository, u mgmtPB.UserServiceClient, c connectorPB.ConnectorServiceClient, m modelPB.ModelServiceClient) Service { +func NewService(r repository.Repository, mu mgmtPB.UserServiceClient, c connectorPB.ConnectorServiceClient, m modelPB.ModelServiceClient, rc *redis.Client) Service { return &service{ repository: r, - userServiceClient: u, + userServiceClient: mu, connectorServiceClient: c, modelServiceClient: m, + redisClient: rc, } } -func (s *service) CreatePipeline(pipeline *datamodel.Pipeline) (*datamodel.Pipeline, error) { +func (s *service) CreatePipeline(dbPipeline *datamodel.Pipeline) (*datamodel.Pipeline, error) { - mode, err := s.getMode(pipeline.Recipe.Source, pipeline.Recipe.Destination) + mode, err := s.getMode(dbPipeline.Recipe.Source, dbPipeline.Recipe.Destination) if err != nil { return nil, err } - pipeline.Mode = mode + dbPipeline.Mode = mode - ownerRscName := pipeline.Owner - if err := s.ownerNameToPermalink(&pipeline.Owner); err != nil { + ownerRscName := dbPipeline.Owner + if err := s.ownerNameToPermalink(&dbPipeline.Owner); err != nil { return nil, status.Errorf(codes.InvalidArgument, err.Error()) } - if err := s.recipeNameToPermalink(pipeline.Recipe); err != nil { + if err := s.recipeNameToPermalink(dbPipeline.Recipe); err != nil { return nil, status.Errorf(codes.InvalidArgument, err.Error()) } - if pipeline.Mode == datamodel.PipelineMode(pipelinePB.Pipeline_MODE_SYNC) { - pipeline.State = datamodel.PipelineState(pipelinePB.Pipeline_STATE_ACTIVE) + if dbPipeline.Mode == datamodel.PipelineMode(pipelinePB.Pipeline_MODE_SYNC) { + dbPipeline.State = datamodel.PipelineState(pipelinePB.Pipeline_STATE_ACTIVE) } else { // TODO: Dispatch job to Temporal for periodical connection state check - pipeline.State = datamodel.PipelineState(pipelinePB.Pipeline_STATE_INACTIVE) + dbPipeline.State = datamodel.PipelineState(pipelinePB.Pipeline_STATE_INACTIVE) } - if err := s.repository.CreatePipeline(pipeline); err != nil { + if err := s.repository.CreatePipeline(dbPipeline); err != nil { return nil, err } - dbPipeline, err := s.GetPipelineByID(pipeline.ID, ownerRscName, false) + dbCreatedPipeline, err := s.GetPipelineByID(dbPipeline.ID, ownerRscName, false) if err != nil { return nil, err } - return dbPipeline, nil + return dbCreatedPipeline, nil } -func (s *service) ListPipeline(owner string, pageSize int, pageToken string, isBasicView bool) ([]datamodel.Pipeline, int64, string, error) { +func (s *service) ListPipeline(owner string, pageSize int64, pageToken string, isBasicView bool) ([]datamodel.Pipeline, int64, string, error) { if err := s.ownerNameToPermalink(&owner); err != nil { return nil, 0, "", status.Errorf(codes.InvalidArgument, err.Error()) @@ -276,10 +281,14 @@ func (s *service) ValidatePipeline(pipeline *datamodel.Pipeline) error { return nil } -func (s *service) TriggerPipeline(req *pipelinePB.TriggerPipelineRequest, pipeline *datamodel.Pipeline) (*modelPB.TriggerModelInstanceResponse, error) { +func (s *service) TriggerPipeline(req *pipelinePB.TriggerPipelineRequest, dbPipeline *datamodel.Pipeline) (*modelPB.TriggerModelInstanceResponse, error) { + + if err := s.ownerNameToPermalink(&dbPipeline.Owner); err != nil { + return nil, err + } // Check if this is a direct trigger (i.e., HTTP, gRPC source and destination connectors) - if pipeline.Mode == datamodel.PipelineMode(pipelinePB.Pipeline_MODE_SYNC) { + if dbPipeline.Mode == datamodel.PipelineMode(pipelinePB.Pipeline_MODE_SYNC) { ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) defer cancel() @@ -302,7 +311,7 @@ func (s *service) TriggerPipeline(req *pipelinePB.TriggerPipelineRequest, pipeli } resp, err := s.modelServiceClient.TriggerModelInstance(ctx, &modelPB.TriggerModelInstanceRequest{ - Name: pipeline.Recipe.ModelInstances[0], + Name: dbPipeline.Recipe.ModelInstances[0], Inputs: inputs, }) @@ -310,16 +319,31 @@ func (s *service) TriggerPipeline(req *pipelinePB.TriggerPipelineRequest, pipeli return nil, status.Errorf(codes.Internal, "Error model-backend %s: %v", "TriggerModel", err.Error()) } + // Increment trigger image numbers + uid, err := resource.GetPermalinkUID(dbPipeline.Owner) + if err != nil { + return nil, err + } + if strings.HasPrefix(dbPipeline.Owner, "users/") { + s.redisClient.IncrBy(ctx, fmt.Sprintf("user:%s:trigger.image.num", uid), int64(len(inputs))) + } else if strings.HasPrefix(dbPipeline.Owner, "orgs/") { + s.redisClient.IncrBy(ctx, fmt.Sprintf("org:%s:trigger.image.num", uid), int64(len(inputs))) + } + return resp, nil } return nil, nil } -func (s *service) TriggerPipelineBinaryFileUpload(fileBuf bytes.Buffer, fileLengths []uint64, pipeline *datamodel.Pipeline) (*modelPB.TriggerModelInstanceBinaryFileUploadResponse, error) { +func (s *service) TriggerPipelineBinaryFileUpload(fileBuf bytes.Buffer, fileLengths []uint64, dbPipeline *datamodel.Pipeline) (*modelPB.TriggerModelInstanceBinaryFileUploadResponse, error) { + + if err := s.ownerNameToPermalink(&dbPipeline.Owner); err != nil { + return nil, err + } // Check if this is a direct trigger (i.e., HTTP, gRPC source and destination connectors) - if pipeline.Mode == datamodel.PipelineMode(pipelinePB.Pipeline_MODE_SYNC) { + if dbPipeline.Mode == datamodel.PipelineMode(pipelinePB.Pipeline_MODE_SYNC) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() @@ -333,7 +357,7 @@ func (s *service) TriggerPipelineBinaryFileUpload(fileBuf bytes.Buffer, fileLeng } err = stream.Send(&modelPB.TriggerModelInstanceBinaryFileUploadRequest{ - Name: pipeline.Recipe.ModelInstances[0], + Name: dbPipeline.Recipe.ModelInstances[0], FileLengths: fileLengths, }) if err != nil { @@ -362,6 +386,17 @@ func (s *service) TriggerPipelineBinaryFileUpload(fileBuf bytes.Buffer, fileLeng return nil, status.Errorf(codes.Internal, "cannot receive response: %s", err.Error()) } + // Increment trigger image numbers + uid, err := resource.GetPermalinkUID(dbPipeline.Owner) + if err != nil { + return nil, err + } + if strings.HasPrefix(dbPipeline.Owner, "users/") { + s.redisClient.IncrBy(ctx, fmt.Sprintf("user:%s:trigger.image.num", uid), int64(len(fileLengths))) + } else if strings.HasPrefix(dbPipeline.Owner, "orgs/") { + s.redisClient.IncrBy(ctx, fmt.Sprintf("org:%s:trigger.image.num", uid), int64(len(fileLengths))) + } + return res, nil } diff --git a/pkg/service/service_test.go b/pkg/service/service_test.go index d10985330..14805b5f9 100644 --- a/pkg/service/service_test.go +++ b/pkg/service/service_test.go @@ -4,6 +4,7 @@ package service_test //go:generate mockgen -destination mock_model_grpc_test.go -package $GOPACKAGE github.com/instill-ai/protogen-go/vdp/model/v1alpha ModelServiceClient //go:generate mockgen -destination mock_connector_grpc_test.go -package $GOPACKAGE github.com/instill-ai/protogen-go/vdp/connector/v1alpha ConnectorServiceClient //go:generate mockgen -destination mock_user_grpc_test.go -package $GOPACKAGE github.com/instill-ai/protogen-go/vdp/mgmt/v1alpha UserServiceClient +//go:generate mockgen -destination mock_usage_grpc_test.go -package $GOPACKAGE github.com/instill-ai/protogen-go/vdp/usage/v1alpha UsageServiceClient import ( "database/sql" @@ -62,7 +63,7 @@ func TestCreatePipeline(t *testing.T) { mockModelServiceClient := NewMockModelServiceClient(ctrl) - s := service.NewService(mockRepository, mockUserServiceClient, mockConnectorServiceClient, mockModelServiceClient) + s := service.NewService(mockRepository, mockUserServiceClient, mockConnectorServiceClient, mockModelServiceClient, nil) _, err := s.CreatePipeline(&normalPipeline) diff --git a/pkg/usage/usage.go b/pkg/usage/usage.go new file mode 100644 index 000000000..2032de1f9 --- /dev/null +++ b/pkg/usage/usage.go @@ -0,0 +1,127 @@ +package usage + +import ( + "context" + "fmt" + + "github.com/go-redis/redis/v9" + + "github.com/instill-ai/pipeline-backend/internal/logger" + "github.com/instill-ai/pipeline-backend/pkg/datamodel" + "github.com/instill-ai/pipeline-backend/pkg/repository" + + mgmtPB "github.com/instill-ai/protogen-go/vdp/mgmt/v1alpha" + pipelinePB "github.com/instill-ai/protogen-go/vdp/pipeline/v1alpha" + usagePB "github.com/instill-ai/protogen-go/vdp/usage/v1alpha" +) + +// Usage interface +type Usage interface { + RetrieveUsageData() interface{} +} + +type usage struct { + repository repository.Repository + userServiceClient mgmtPB.UserServiceClient + redisClient *redis.Client +} + +// NewUsage initiates a usage instance +func NewUsage(r repository.Repository, u mgmtPB.UserServiceClient, rc *redis.Client) Usage { + return &usage{ + repository: r, + userServiceClient: u, + redisClient: rc, + } +} + +func (u *usage) RetrieveUsageData() interface{} { + + logger, _ := logger.GetZapLogger() + ctx := context.Background() + + logger.Info("Retrieve usage data...") + + pbPipelineUsageData := []*usagePB.PipelineUsageData_UserUsageData{} + + // Roll over all users and update the metrics with the cached uuid + userPageSizeMax := int64(100) + userPageToken := "" + for { + userResp, err := u.userServiceClient.ListUser(ctx, &mgmtPB.ListUserRequest{ + PageSize: &userPageSizeMax, + PageToken: &userPageToken, + }) + if err != nil { + logger.Error(fmt.Sprintf("[mgmt-backend: ListUser] %s", err)) + } + + // Roll all pipeline resources on a user + for _, user := range userResp.Users { + pipePageSizeMax := int64(100) + pipePageToken := "" + pipeActiveStateNum := int64(0) + pipeInactiveStateNum := int64(0) + pipeSyncModeNum := int64(0) + pipeAsyncModeNum := int64(0) + for { + dbPipelines, _, pipeNextPageToken, err := u.repository.ListPipeline(fmt.Sprintf("users/%s", user.GetUid()), pipePageSizeMax, pipePageToken, true) + if err != nil { + logger.Error(fmt.Sprintf("%s", err)) + } + + for _, pipeline := range dbPipelines { + if pipeline.State == datamodel.PipelineState(pipelinePB.Pipeline_STATE_ACTIVE) { + pipeActiveStateNum++ + } + if pipeline.State == datamodel.PipelineState(pipelinePB.Pipeline_STATE_INACTIVE) { + pipeInactiveStateNum++ + } + if pipeline.Mode == datamodel.PipelineMode(pipelinePB.Pipeline_MODE_SYNC) { + pipeSyncModeNum++ + } + if pipeline.Mode == datamodel.PipelineMode(pipelinePB.Pipeline_MODE_ASYNC) { + pipeAsyncModeNum++ + } + } + + if pipeNextPageToken == "" { + break + } else { + pipePageToken = pipeNextPageToken + } + } + + triggerImageNum, err := u.redisClient.Get(ctx, fmt.Sprintf("user:%s:trigger.image.num", user.GetUid())).Int64() + if err == redis.Nil { + triggerImageNum = 0 + } else if err != nil { + logger.Error(fmt.Sprintf("%s", err)) + } + + pbPipelineUsageData = append(pbPipelineUsageData, &usagePB.PipelineUsageData_UserUsageData{ + UserUid: user.GetUid(), + PipelineActiveStateNum: pipeActiveStateNum, + PipelineInactiveStateNum: pipeInactiveStateNum, + PipelineSyncModeNum: pipeSyncModeNum, + PipelineAsyncModeNum: pipeAsyncModeNum, + TriggerImageNum: triggerImageNum, + }) + + } + + if userResp.NextPageToken == "" { + break + } else { + userPageToken = userResp.NextPageToken + } + } + + logger.Info("Send retrieved usage data...") + + return &usagePB.SessionReport_PipelineUsageData{ + PipelineUsageData: &usagePB.PipelineUsageData{ + Usages: pbPipelineUsageData, + }, + } +}