diff --git a/spanner/test/cloudexecutor/executor/actions/admin.go b/spanner/test/cloudexecutor/executor/actions/admin.go new file mode 100644 index 000000000000..fb49a26cb1c7 --- /dev/null +++ b/spanner/test/cloudexecutor/executor/actions/admin.go @@ -0,0 +1,901 @@ +// Copyright 2023 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package actions + +import ( + "context" + "fmt" + "log" + + "cloud.google.com/go/longrunning/autogen/longrunningpb" + "cloud.google.com/go/spanner" + database "cloud.google.com/go/spanner/admin/database/apiv1" + adminpb "cloud.google.com/go/spanner/admin/database/apiv1/databasepb" + instance "cloud.google.com/go/spanner/admin/instance/apiv1" + "cloud.google.com/go/spanner/admin/instance/apiv1/instancepb" + "cloud.google.com/go/spanner/test/cloudexecutor/executor/internal/outputstream" + executorpb "cloud.google.com/go/spanner/test/cloudexecutor/proto" + "google.golang.org/api/iterator" + "google.golang.org/api/option" + spb "google.golang.org/genproto/googleapis/rpc/status" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/types/known/fieldmaskpb" +) + +// AdminActionHandler holds the necessary components and options required for performing admin tasks. +type AdminActionHandler struct { + Action *executorpb.AdminAction + FlowContext *ExecutionFlowContext + OutcomeSender *outputstream.OutcomeSender + Options []option.ClientOption +} + +// ExecuteAction execute admin actions by action case, using OutcomeSender to send status and results back. +func (h *AdminActionHandler) ExecuteAction(ctx context.Context) error { + log.Printf("executing admin action %v", h.Action) + h.FlowContext.mu.Lock() + defer h.FlowContext.mu.Unlock() + var err error + switch h.Action.GetAction().(type) { + case *executorpb.AdminAction_CreateCloudInstance: + err = executeCreateCloudInstance(ctx, h.Action.GetCreateCloudInstance(), h.FlowContext, h.Options, h.OutcomeSender) + case *executorpb.AdminAction_UpdateCloudInstance: + err = executeUpdateCloudInstance(ctx, h.Action.GetUpdateCloudInstance(), h.FlowContext, h.Options, h.OutcomeSender) + case *executorpb.AdminAction_DeleteCloudInstance: + err = executeDeleteCloudInstance(ctx, h.Action.GetDeleteCloudInstance(), h.FlowContext, h.Options, h.OutcomeSender) + case *executorpb.AdminAction_ListCloudInstances: + err = executeListCloudInstances(ctx, h.Action.GetListCloudInstances(), h.FlowContext, h.Options, h.OutcomeSender) + case *executorpb.AdminAction_ListInstanceConfigs: + err = executeListInstanceConfigs(ctx, h.Action.GetListInstanceConfigs(), h.FlowContext, h.Options, h.OutcomeSender) + case *executorpb.AdminAction_GetCloudInstanceConfig: + err = executeGetCloudInstanceConfig(ctx, h.Action.GetGetCloudInstanceConfig(), h.FlowContext, h.Options, h.OutcomeSender) + case *executorpb.AdminAction_GetCloudInstance: + err = executeGetCloudInstance(ctx, h.Action.GetGetCloudInstance(), h.FlowContext, h.Options, h.OutcomeSender) + case *executorpb.AdminAction_CreateUserInstanceConfig: + err = executeCreateUserInstanceConfig(ctx, h.Action.GetCreateUserInstanceConfig(), h.FlowContext, h.Options, h.OutcomeSender) + case *executorpb.AdminAction_DeleteUserInstanceConfig: + err = executeDeleteUserInstanceConfig(ctx, h.Action.GetDeleteUserInstanceConfig(), h.FlowContext, h.Options, h.OutcomeSender) + case *executorpb.AdminAction_CreateCloudDatabase: + err = executeCreateCloudDatabase(ctx, h.Action.GetCreateCloudDatabase(), h.FlowContext, h.Options, h.OutcomeSender) + case *executorpb.AdminAction_UpdateCloudDatabaseDdl: + err = executeUpdateCloudDatabaseDdl(ctx, h.Action.GetUpdateCloudDatabaseDdl(), h.FlowContext, h.Options, h.OutcomeSender) + case *executorpb.AdminAction_DropCloudDatabase: + err = executeDropCloudDatabase(ctx, h.Action.GetDropCloudDatabase(), h.FlowContext, h.Options, h.OutcomeSender) + case *executorpb.AdminAction_CreateCloudBackup: + err = executeCreateCloudBackup(ctx, h.Action.GetCreateCloudBackup(), h.FlowContext, h.Options, h.OutcomeSender) + case *executorpb.AdminAction_CopyCloudBackup: + err = executeCopyCloudBackup(ctx, h.Action.GetCopyCloudBackup(), h.FlowContext, h.Options, h.OutcomeSender) + case *executorpb.AdminAction_GetCloudBackup: + err = executeGetCloudBackup(ctx, h.Action.GetGetCloudBackup(), h.FlowContext, h.Options, h.OutcomeSender) + case *executorpb.AdminAction_UpdateCloudBackup: + err = executeUpdateCloudBackup(ctx, h.Action.GetUpdateCloudBackup(), h.FlowContext, h.Options, h.OutcomeSender) + case *executorpb.AdminAction_DeleteCloudBackup: + err = executeDeleteCloudBackup(ctx, h.Action.GetDeleteCloudBackup(), h.FlowContext, h.Options, h.OutcomeSender) + case *executorpb.AdminAction_ListCloudBackups: + err = executeListCloudBackups(ctx, h.Action.GetListCloudBackups(), h.FlowContext, h.Options, h.OutcomeSender) + case *executorpb.AdminAction_ListCloudBackupOperations: + err = executeListCloudBackupOperations(ctx, h.Action.GetListCloudBackupOperations(), h.FlowContext, h.Options, h.OutcomeSender) + case *executorpb.AdminAction_ListCloudDatabases: + err = executeListCloudDatabases(ctx, h.Action.GetListCloudDatabases(), h.FlowContext, h.Options, h.OutcomeSender) + case *executorpb.AdminAction_ListCloudDatabaseOperations: + err = executeListCloudDatabaseOperations(ctx, h.Action.GetListCloudDatabaseOperations(), h.FlowContext, h.Options, h.OutcomeSender) + case *executorpb.AdminAction_RestoreCloudDatabase: + err = executeRestoreCloudDatabase(ctx, h.Action.GetRestoreCloudDatabase(), h.FlowContext, h.Options, h.OutcomeSender) + case *executorpb.AdminAction_GetCloudDatabase: + err = executeGetCloudDatabase(ctx, h.Action.GetGetCloudDatabase(), h.FlowContext, h.Options, h.OutcomeSender) + case *executorpb.AdminAction_GetOperation: + err = executeGetOperation(ctx, h.Action.GetGetOperation(), h.FlowContext, h.Options, h.OutcomeSender) + case *executorpb.AdminAction_CancelOperation: + err = executeCancelOperation(ctx, h.Action.GetCancelOperation(), h.FlowContext, h.Options, h.OutcomeSender) + default: + err = spanner.ToSpannerError(status.Error(codes.Unimplemented, fmt.Sprintf("Not implemented yet: %v", h.Action))) + } + if err != nil { + return h.OutcomeSender.FinishWithError(err) + } + return nil +} + +// execute action that creates a cloud instance. +func executeCreateCloudInstance(ctx context.Context, action *executorpb.CreateCloudInstanceAction, h *ExecutionFlowContext, opts []option.ClientOption, o *outputstream.OutcomeSender) error { + log.Printf("creating instance: %v", action) + projectID := action.GetProjectId() + instanceID := action.GetInstanceId() + instanceAdminClient, err := instance.NewInstanceAdminClient(ctx, opts...) + if err != nil { + return err + } + defer instanceAdminClient.Close() + op, err := instanceAdminClient.CreateInstance(ctx, &instancepb.CreateInstanceRequest{ + Parent: fmt.Sprintf("projects/%s", action.GetProjectId()), + InstanceId: instanceID, + Instance: &instancepb.Instance{ + Config: fmt.Sprintf("projects/%s/instanceConfigs/%s", projectID, action.GetInstanceConfigId()), + DisplayName: instanceID, + NodeCount: action.GetNodeCount(), + ProcessingUnits: action.GetProcessingUnits(), + Labels: action.GetLabels(), + }, + }) + if err != nil { + return err + } + // Wait for the instance creation to finish. + _, err = op.Wait(ctx) + if err != nil { + if status.Code(err) == codes.AlreadyExists { + return nil + } + return err + } + return o.FinishSuccessfully() +} + +// execute action that updates a cloud instance. +func executeUpdateCloudInstance(ctx context.Context, action *executorpb.UpdateCloudInstanceAction, h *ExecutionFlowContext, opts []option.ClientOption, o *outputstream.OutcomeSender) error { + log.Printf("updating instance: %v", action) + projectID := action.GetProjectId() + instanceID := action.GetInstanceId() + instanceAdminClient, err := instance.NewInstanceAdminClient(ctx, opts...) + if err != nil { + return err + } + defer instanceAdminClient.Close() + instanceObj := &instancepb.Instance{Name: fmt.Sprintf("projects/%s/instances/%s", projectID, instanceID)} + var fieldsToUpdate []string + if action.DisplayName != nil { + fieldsToUpdate = append(fieldsToUpdate, "display_name") + instanceObj.DisplayName = instanceID + } + if action.NodeCount != nil { + fieldsToUpdate = append(fieldsToUpdate, "node_count") + instanceObj.NodeCount = action.GetNodeCount() + } + if action.ProcessingUnits != nil { + fieldsToUpdate = append(fieldsToUpdate, "processing_units") + instanceObj.ProcessingUnits = action.GetProcessingUnits() + } + if action.Labels != nil { + fieldsToUpdate = append(fieldsToUpdate, "labels") + instanceObj.Labels = action.GetLabels() + } + + op, err := instanceAdminClient.UpdateInstance(ctx, &instancepb.UpdateInstanceRequest{ + Instance: instanceObj, + FieldMask: &fieldmaskpb.FieldMask{ + Paths: fieldsToUpdate, + }, + }) + if err != nil { + return err + } + // Wait for the instance update to finish. + _, err = op.Wait(ctx) + if err != nil { + return err + } + + return o.FinishSuccessfully() +} + +// execute action that deletes a cloud instance. +func executeDeleteCloudInstance(ctx context.Context, action *executorpb.DeleteCloudInstanceAction, h *ExecutionFlowContext, opts []option.ClientOption, o *outputstream.OutcomeSender) error { + log.Printf("deleting instance: %v", action) + projectID := action.GetProjectId() + instanceID := action.GetInstanceId() + instanceAdminClient, err := instance.NewInstanceAdminClient(ctx, opts...) + if err != nil { + return err + } + defer instanceAdminClient.Close() + err = instanceAdminClient.DeleteInstance(ctx, &instancepb.DeleteInstanceRequest{ + Name: fmt.Sprintf("projects/%s/instances/%s", projectID, instanceID), + }) + if err != nil { + return err + } + return o.FinishSuccessfully() +} + +// execute action that lists cloud instances. +func executeListCloudInstances(ctx context.Context, action *executorpb.ListCloudInstancesAction, h *ExecutionFlowContext, opts []option.ClientOption, o *outputstream.OutcomeSender) error { + log.Printf("listing instance: %v", action) + projectID := action.GetProjectId() + instanceAdminClient, err := instance.NewInstanceAdminClient(ctx, opts...) + if err != nil { + return err + } + defer instanceAdminClient.Close() + listInstancesRequest := &instancepb.ListInstancesRequest{ + Parent: fmt.Sprintf("projects/%s", projectID), + } + if action.PageSize != nil { + listInstancesRequest.PageSize = action.GetPageSize() + } + if action.Filter != nil { + listInstancesRequest.Filter = action.GetFilter() + } + if action.PageToken != nil { + listInstancesRequest.PageToken = action.GetPageToken() + } + iter := instanceAdminClient.ListInstances(ctx, listInstancesRequest) + var instances []*instancepb.Instance + for { + instanceObj, err := iter.Next() + if err == iterator.Done { + break + } + if err != nil { + return err + } + instances = append(instances, instanceObj) + } + spannerActionOutcome := &executorpb.SpannerActionOutcome{ + Status: &spb.Status{Code: int32(codes.OK)}, + AdminResult: &executorpb.AdminResult{ + InstanceResponse: &executorpb.CloudInstanceResponse{ + ListedInstances: instances, + }, + }, + } + err = o.SendOutcome(spannerActionOutcome) + if err != nil { + return err + } + return nil +} + +// execute action that lists cloud instance configs. +func executeListInstanceConfigs(ctx context.Context, action *executorpb.ListCloudInstanceConfigsAction, h *ExecutionFlowContext, opts []option.ClientOption, o *outputstream.OutcomeSender) error { + log.Printf("listing instance configs: %v", action) + projectID := action.GetProjectId() + instanceAdminClient, err := instance.NewInstanceAdminClient(ctx, opts...) + if err != nil { + return err + } + defer instanceAdminClient.Close() + listInstanceConfigsRequest := &instancepb.ListInstanceConfigsRequest{ + Parent: fmt.Sprintf("projects/%s", projectID), + } + if action.PageSize != nil { + listInstanceConfigsRequest.PageSize = action.GetPageSize() + } + if action.PageToken != nil { + listInstanceConfigsRequest.PageToken = action.GetPageToken() + } + iter := instanceAdminClient.ListInstanceConfigs(ctx, listInstanceConfigsRequest) + var instanceConfigs []*instancepb.InstanceConfig + for { + instanceConfig, err := iter.Next() + if err == iterator.Done { + break + } + if err != nil { + return err + } + instanceConfigs = append(instanceConfigs, instanceConfig) + } + spannerActionOutcome := &executorpb.SpannerActionOutcome{ + Status: &spb.Status{Code: int32(codes.OK)}, + AdminResult: &executorpb.AdminResult{ + InstanceConfigResponse: &executorpb.CloudInstanceConfigResponse{ + ListedInstanceConfigs: instanceConfigs, + }, + }, + } + err = o.SendOutcome(spannerActionOutcome) + if err != nil { + return err + } + return nil +} + +// execute action that gets a cloud instance config. +func executeGetCloudInstanceConfig(ctx context.Context, action *executorpb.GetCloudInstanceConfigAction, h *ExecutionFlowContext, opts []option.ClientOption, o *outputstream.OutcomeSender) error { + log.Printf("getting instance config: %v", action) + projectID := action.GetProjectId() + instanceConfigID := action.GetInstanceConfigId() + instanceAdminClient, err := instance.NewInstanceAdminClient(ctx, opts...) + if err != nil { + return err + } + defer instanceAdminClient.Close() + instanceConfig, err := instanceAdminClient.GetInstanceConfig(ctx, &instancepb.GetInstanceConfigRequest{ + Name: fmt.Sprintf("projects/%s/instanceConfigs/%s", projectID, instanceConfigID), + }) + if err != nil { + return err + } + spannerActionOutcome := &executorpb.SpannerActionOutcome{ + Status: &spb.Status{Code: int32(codes.OK)}, + AdminResult: &executorpb.AdminResult{ + InstanceConfigResponse: &executorpb.CloudInstanceConfigResponse{ + InstanceConfig: instanceConfig, + }, + }, + } + err = o.SendOutcome(spannerActionOutcome) + if err != nil { + return err + } + return nil +} + +// execute action that retrieves a cloud instance. +func executeGetCloudInstance(ctx context.Context, action *executorpb.GetCloudInstanceAction, h *ExecutionFlowContext, opts []option.ClientOption, o *outputstream.OutcomeSender) error { + log.Printf("retrieving instance: %v", action) + projectID := action.GetProjectId() + instanceID := action.GetInstanceId() + instanceAdminClient, err := instance.NewInstanceAdminClient(ctx, opts...) + if err != nil { + return err + } + defer instanceAdminClient.Close() + instanceObj, err := instanceAdminClient.GetInstance(ctx, &instancepb.GetInstanceRequest{ + Name: fmt.Sprintf("projects/%s/instances/%s", projectID, instanceID), + }) + spannerActionOutcome := &executorpb.SpannerActionOutcome{ + Status: &spb.Status{Code: int32(codes.OK)}, + AdminResult: &executorpb.AdminResult{ + InstanceResponse: &executorpb.CloudInstanceResponse{ + Instance: instanceObj, + }, + }, + } + err = o.SendOutcome(spannerActionOutcome) + if err != nil { + return err + } + return nil +} + +// execute action that creates a user instance config. +func executeCreateUserInstanceConfig(ctx context.Context, action *executorpb.CreateUserInstanceConfigAction, h *ExecutionFlowContext, opts []option.ClientOption, o *outputstream.OutcomeSender) error { + log.Printf("Creating user instance config: %v", action) + projectID := action.GetProjectId() + baseConfigID := action.GetBaseConfigId() + userConfigID := action.GetUserConfigId() + instanceAdminClient, err := instance.NewInstanceAdminClient(ctx, opts...) + if err != nil { + return err + } + op, err := instanceAdminClient.CreateInstanceConfig(ctx, &instancepb.CreateInstanceConfigRequest{ + Parent: fmt.Sprintf("projects/%s", projectID), + InstanceConfigId: userConfigID, + InstanceConfig: &instancepb.InstanceConfig{ + Name: fmt.Sprintf("projects/%s/instanceConfigs/%s", projectID, userConfigID), + DisplayName: userConfigID, + Replicas: action.GetReplicas(), + BaseConfig: fmt.Sprintf("projects/%s/instanceConfigs/%s", projectID, baseConfigID), + }, + }) + if err != nil { + return err + } + _, err = op.Wait(ctx) + if err != nil { + return err + } + return o.FinishSuccessfully() +} + +// execute action that deletes a user instance config. +func executeDeleteUserInstanceConfig(ctx context.Context, action *executorpb.DeleteUserInstanceConfigAction, h *ExecutionFlowContext, opts []option.ClientOption, o *outputstream.OutcomeSender) error { + log.Printf("deleting user instance config: %v", action) + instanceAdminClient, err := instance.NewInstanceAdminClient(ctx, opts...) + if err != nil { + return err + } + err = instanceAdminClient.DeleteInstanceConfig(ctx, &instancepb.DeleteInstanceConfigRequest{ + Name: fmt.Sprintf("projects/%s/instanceConfigs/%s", action.GetProjectId(), action.GetUserConfigId()), + }) + if err != nil { + return err + } + return o.FinishSuccessfully() +} + +// execute action that creates a cloud database or cloud custom encrypted database. +func executeCreateCloudDatabase(ctx context.Context, action *executorpb.CreateCloudDatabaseAction, h *ExecutionFlowContext, opts []option.ClientOption, o *outputstream.OutcomeSender) error { + log.Printf("creating database: %v", action) + projectID := action.GetProjectId() + instanceID := action.GetInstanceId() + databaseID := action.GetDatabaseId() + databaseAdminClient, err := database.NewDatabaseAdminClient(ctx, opts...) + if err != nil { + return err + } + createDatabaseRequest := &adminpb.CreateDatabaseRequest{ + Parent: fmt.Sprintf("projects/%s/instances/%s", projectID, instanceID), + CreateStatement: "CREATE DATABASE `" + databaseID + "`", + ExtraStatements: action.GetSdlStatement(), + } + if action.GetEncryptionConfig() != nil { + createDatabaseRequest.EncryptionConfig = action.GetEncryptionConfig() + } + op, err := databaseAdminClient.CreateDatabase(ctx, createDatabaseRequest) + if err != nil { + return err + } + if _, err := op.Wait(ctx); err != nil { + return err + } + return o.FinishSuccessfully() +} + +// execute action that updates a cloud database. +func executeUpdateCloudDatabaseDdl(ctx context.Context, action *executorpb.UpdateCloudDatabaseDdlAction, h *ExecutionFlowContext, opts []option.ClientOption, o *outputstream.OutcomeSender) error { + log.Printf("updating database ddl %v", action) + dbPath := fmt.Sprintf("projects/%v/instances/%v/databases/%v", action.GetProjectId(), action.GetInstanceId(), action.GetDatabaseId()) + databaseAdminClient, err := database.NewDatabaseAdminClient(ctx, opts...) + if err != nil { + return err + } + defer databaseAdminClient.Close() + op, err := databaseAdminClient.UpdateDatabaseDdl(ctx, &adminpb.UpdateDatabaseDdlRequest{ + Database: dbPath, + Statements: action.GetSdlStatement(), + OperationId: action.GetOperationId(), + }) + if err != nil { + return fmt.Errorf("UpdateDatabaseDdl: %w", err) + } + if err := op.Wait(ctx); err != nil { + return err + } + return o.FinishSuccessfully() +} + +// execute action that drops a cloud database. +func executeDropCloudDatabase(ctx context.Context, action *executorpb.DropCloudDatabaseAction, h *ExecutionFlowContext, opts []option.ClientOption, o *outputstream.OutcomeSender) error { + log.Printf("dropping database: %v", action) + projectID := action.GetProjectId() + instanceID := action.GetInstanceId() + databaseID := action.GetDatabaseId() + databaseAdminClient, err := database.NewDatabaseAdminClient(ctx, opts...) + if err != nil { + return err + } + err = databaseAdminClient.DropDatabase(ctx, &adminpb.DropDatabaseRequest{ + Database: fmt.Sprintf("projects/%s/instances/%s/databases/%s", projectID, instanceID, databaseID), + }) + if err != nil { + return err + } + return o.FinishSuccessfully() +} + +// execute action that creates a cloud database backup. +func executeCreateCloudBackup(ctx context.Context, action *executorpb.CreateCloudBackupAction, h *ExecutionFlowContext, opts []option.ClientOption, o *outputstream.OutcomeSender) error { + log.Printf("creating backup: %v", action) + projectID := action.GetProjectId() + instanceID := action.GetInstanceId() + databaseID := action.GetDatabaseId() + backupID := action.GetBackupId() + databaseAdminClient, err := database.NewDatabaseAdminClient(ctx, opts...) + if err != nil { + return err + } + op, err := databaseAdminClient.CreateBackup(ctx, &adminpb.CreateBackupRequest{ + Parent: fmt.Sprintf("projects/%s/instances/%s", projectID, instanceID), + BackupId: backupID, + Backup: &adminpb.Backup{ + Database: fmt.Sprintf("projects/%s/instances/%s/databases/%s", projectID, instanceID, databaseID), + VersionTime: action.GetVersionTime(), + ExpireTime: action.GetExpireTime(), + }, + }) + if err != nil { + return err + } + backup, err := op.Wait(ctx) + if err != nil { + return err + } + spannerActionOutcome := &executorpb.SpannerActionOutcome{ + Status: &spb.Status{Code: int32(codes.OK)}, + AdminResult: &executorpb.AdminResult{ + BackupResponse: &executorpb.CloudBackupResponse{ + Backup: backup, + }, + }, + } + return o.SendOutcome(spannerActionOutcome) +} + +// execute action that copies a cloud database backup. +func executeCopyCloudBackup(ctx context.Context, action *executorpb.CopyCloudBackupAction, h *ExecutionFlowContext, opts []option.ClientOption, o *outputstream.OutcomeSender) error { + log.Printf("copying backup: %v", action) + projectID := action.GetProjectId() + instanceID := action.GetInstanceId() + backupID := action.GetBackupId() + sourceBackupID := action.GetSourceBackup() + databaseAdminClient, err := database.NewDatabaseAdminClient(ctx, opts...) + if err != nil { + return err + } + op, err := databaseAdminClient.CopyBackup(ctx, &adminpb.CopyBackupRequest{ + Parent: fmt.Sprintf("projects/%s/instances/%s", projectID, instanceID), + BackupId: backupID, + SourceBackup: fmt.Sprintf("projects/%s/instances/%s/backups/%s", projectID, instanceID, sourceBackupID), + ExpireTime: action.GetExpireTime(), + }) + if err != nil { + return err + } + backup, err := op.Wait(ctx) + if err != nil { + return err + } + spannerActionOutcome := &executorpb.SpannerActionOutcome{ + Status: &spb.Status{Code: int32(codes.OK)}, + AdminResult: &executorpb.AdminResult{ + BackupResponse: &executorpb.CloudBackupResponse{ + Backup: backup, + }, + }, + } + return o.SendOutcome(spannerActionOutcome) +} + +// execute action that gets a cloud database backup. +func executeGetCloudBackup(ctx context.Context, action *executorpb.GetCloudBackupAction, h *ExecutionFlowContext, opts []option.ClientOption, o *outputstream.OutcomeSender) error { + log.Printf("getting backup: %v", action) + projectID := action.GetProjectId() + instanceID := action.GetInstanceId() + backupID := action.GetBackupId() + databaseAdminClient, err := database.NewDatabaseAdminClient(ctx, opts...) + if err != nil { + return err + } + backup, err := databaseAdminClient.GetBackup(ctx, &adminpb.GetBackupRequest{ + Name: fmt.Sprintf("projects/%s/instances/%s/backups/%s", projectID, instanceID, backupID), + }) + if err != nil { + return err + } + spannerActionOutcome := &executorpb.SpannerActionOutcome{ + Status: &spb.Status{Code: int32(codes.OK)}, + AdminResult: &executorpb.AdminResult{ + BackupResponse: &executorpb.CloudBackupResponse{ + Backup: backup, + }, + }, + } + return o.SendOutcome(spannerActionOutcome) +} + +// execute action that updates a cloud database backup. +func executeUpdateCloudBackup(ctx context.Context, action *executorpb.UpdateCloudBackupAction, h *ExecutionFlowContext, opts []option.ClientOption, o *outputstream.OutcomeSender) error { + log.Printf("updating backup: %v", action) + projectID := action.GetProjectId() + instanceID := action.GetInstanceId() + backupID := action.GetBackupId() + databaseAdminClient, err := database.NewDatabaseAdminClient(ctx, opts...) + if err != nil { + return err + } + backup, err := databaseAdminClient.UpdateBackup(ctx, &adminpb.UpdateBackupRequest{ + Backup: &adminpb.Backup{ + ExpireTime: action.GetExpireTime(), + Name: fmt.Sprintf("projects/%s/instances/%s/backups/%s", projectID, instanceID, backupID), + }, + UpdateMask: &fieldmaskpb.FieldMask{ + Paths: []string{"expire_time"}, + }, + }) + if err != nil { + return err + } + spannerActionOutcome := &executorpb.SpannerActionOutcome{ + Status: &spb.Status{Code: int32(codes.OK)}, + AdminResult: &executorpb.AdminResult{ + BackupResponse: &executorpb.CloudBackupResponse{ + Backup: backup, + }, + }, + } + return o.SendOutcome(spannerActionOutcome) +} + +// execute action that deletes a cloud database backup. +func executeDeleteCloudBackup(ctx context.Context, action *executorpb.DeleteCloudBackupAction, h *ExecutionFlowContext, opts []option.ClientOption, o *outputstream.OutcomeSender) error { + log.Printf("deleting backup: %v", action) + projectID := action.GetProjectId() + instanceID := action.GetInstanceId() + backupID := action.GetBackupId() + databaseAdminClient, err := database.NewDatabaseAdminClient(ctx, opts...) + if err != nil { + return err + } + err = databaseAdminClient.DeleteBackup(ctx, &adminpb.DeleteBackupRequest{ + Name: fmt.Sprintf("projects/%s/instances/%s/backups/%s", projectID, instanceID, backupID), + }) + if err != nil { + return err + } + return o.FinishSuccessfully() +} + +// execute action that lists cloud database backups. +func executeListCloudBackups(ctx context.Context, action *executorpb.ListCloudBackupsAction, h *ExecutionFlowContext, opts []option.ClientOption, o *outputstream.OutcomeSender) error { + log.Printf("listing backup: %v", action) + projectID := action.GetProjectId() + instanceID := action.GetInstanceId() + databaseAdminClient, err := database.NewDatabaseAdminClient(ctx, opts...) + if err != nil { + return err + } + iter := databaseAdminClient.ListBackups(ctx, &adminpb.ListBackupsRequest{ + Parent: fmt.Sprintf("projects/%s/instances/%s", projectID, instanceID), + Filter: action.GetFilter(), + PageSize: action.GetPageSize(), + PageToken: action.GetPageToken(), + }) + if err != nil { + return err + } + var backupList []*adminpb.Backup + for { + backup, err := iter.Next() + if err == iterator.Done { + break + } + if err != nil { + return err + } + backupList = append(backupList, backup) + } + + spannerActionOutcome := &executorpb.SpannerActionOutcome{ + Status: &spb.Status{Code: int32(codes.OK)}, + AdminResult: &executorpb.AdminResult{ + BackupResponse: &executorpb.CloudBackupResponse{ + ListedBackups: backupList, + NextPageToken: "", + }, + }, + } + return o.SendOutcome(spannerActionOutcome) +} + +// execute action that lists cloud database backup operations. +func executeListCloudBackupOperations(ctx context.Context, action *executorpb.ListCloudBackupOperationsAction, h *ExecutionFlowContext, opts []option.ClientOption, o *outputstream.OutcomeSender) error { + log.Printf("listing backup operation: %v", action) + projectID := action.GetProjectId() + instanceID := action.GetInstanceId() + databaseAdminClient, err := database.NewDatabaseAdminClient(ctx, opts...) + if err != nil { + return err + } + iter := databaseAdminClient.ListBackupOperations(ctx, &adminpb.ListBackupOperationsRequest{ + Parent: fmt.Sprintf("projects/%s/instances/%s", projectID, instanceID), + Filter: action.GetFilter(), + PageSize: action.GetPageSize(), + PageToken: action.GetPageToken(), + }) + if err != nil { + return err + } + var lro []*longrunningpb.Operation + for { + operation, err := iter.Next() + if err == iterator.Done { + break + } + if err != nil { + return err + } + lro = append(lro, operation) + } + + spannerActionOutcome := &executorpb.SpannerActionOutcome{ + Status: &spb.Status{Code: int32(codes.OK)}, + AdminResult: &executorpb.AdminResult{ + BackupResponse: &executorpb.CloudBackupResponse{ + ListedBackupOperations: lro, + }, + }, + } + return o.SendOutcome(spannerActionOutcome) +} + +// execute action that list cloud databases. +func executeListCloudDatabases(ctx context.Context, action *executorpb.ListCloudDatabasesAction, h *ExecutionFlowContext, opts []option.ClientOption, o *outputstream.OutcomeSender) error { + log.Printf("listing database: %v", action) + projectID := action.GetProjectId() + instanceID := action.GetInstanceId() + databaseAdminClient, err := database.NewDatabaseAdminClient(ctx, opts...) + if err != nil { + return err + } + iter := databaseAdminClient.ListDatabases(ctx, &adminpb.ListDatabasesRequest{ + Parent: fmt.Sprintf("projects/%s/instances/%s", projectID, instanceID), + PageSize: action.GetPageSize(), + PageToken: action.GetPageToken(), + }) + if err != nil { + return err + } + var databaseList []*adminpb.Database + for { + databaseObj, err := iter.Next() + if err == iterator.Done { + break + } + if err != nil { + return err + } + databaseList = append(databaseList, databaseObj) + } + + spannerActionOutcome := &executorpb.SpannerActionOutcome{ + Status: &spb.Status{Code: int32(codes.OK)}, + AdminResult: &executorpb.AdminResult{ + DatabaseResponse: &executorpb.CloudDatabaseResponse{ + ListedDatabases: databaseList, + }, + }, + } + return o.SendOutcome(spannerActionOutcome) +} + +// execute action that lists cloud database operations. +func executeListCloudDatabaseOperations(ctx context.Context, action *executorpb.ListCloudDatabaseOperationsAction, h *ExecutionFlowContext, opts []option.ClientOption, o *outputstream.OutcomeSender) error { + log.Printf("listing database operation: %v", action) + projectID := action.GetProjectId() + instanceID := action.GetInstanceId() + databaseAdminClient, err := database.NewDatabaseAdminClient(ctx, opts...) + if err != nil { + return err + } + iter := databaseAdminClient.ListDatabaseOperations(ctx, &adminpb.ListDatabaseOperationsRequest{ + Parent: fmt.Sprintf("projects/%s/instances/%s", projectID, instanceID), + PageSize: action.GetPageSize(), + Filter: action.GetFilter(), + PageToken: action.GetPageToken(), + }) + if err != nil { + return err + } + var lro []*longrunningpb.Operation + for { + operation, err := iter.Next() + if err == iterator.Done { + break + } + if err != nil { + return err + } + lro = append(lro, operation) + } + + spannerActionOutcome := &executorpb.SpannerActionOutcome{ + Status: &spb.Status{Code: int32(codes.OK)}, + AdminResult: &executorpb.AdminResult{ + DatabaseResponse: &executorpb.CloudDatabaseResponse{ + ListedDatabaseOperations: lro, + NextPageToken: "", + }, + }, + } + return o.SendOutcome(spannerActionOutcome) +} + +// execute action that restores a cloud database. +func executeRestoreCloudDatabase(ctx context.Context, action *executorpb.RestoreCloudDatabaseAction, h *ExecutionFlowContext, opts []option.ClientOption, o *outputstream.OutcomeSender) error { + log.Printf("restoring database: %v", action) + projectID := action.GetProjectId() + databaseInstanceID := action.GetDatabaseInstanceId() + databaseAdminClient, err := database.NewDatabaseAdminClient(ctx, opts...) + if err != nil { + return err + } + restoreOp, err := databaseAdminClient.RestoreDatabase(ctx, &adminpb.RestoreDatabaseRequest{ + Parent: fmt.Sprintf("projects/%s/instances/%s", projectID, databaseInstanceID), + DatabaseId: fmt.Sprintf("projects/%s/instances/%s/databases/%s", projectID, databaseInstanceID, action.GetDatabaseId()), + Source: &adminpb.RestoreDatabaseRequest_Backup{ + Backup: fmt.Sprintf("projects/%s/instances/%s/backups/%s", projectID, action.GetBackupInstanceId(), action.GetBackupId()), + }, + EncryptionConfig: nil, + }) + if err != nil { + return err + } + databaseObject, err := restoreOp.Wait(ctx) + if err != nil { + return err + } + + spannerActionOutcome := &executorpb.SpannerActionOutcome{ + Status: &spb.Status{Code: int32(codes.OK)}, + AdminResult: &executorpb.AdminResult{ + DatabaseResponse: &executorpb.CloudDatabaseResponse{ + Database: databaseObject, + }, + }, + } + return o.SendOutcome(spannerActionOutcome) +} + +// execute action that gets a cloud database. +func executeGetCloudDatabase(ctx context.Context, action *executorpb.GetCloudDatabaseAction, h *ExecutionFlowContext, opts []option.ClientOption, o *outputstream.OutcomeSender) error { + log.Printf("getting database: %v", action) + projectID := action.GetProjectId() + instanceID := action.GetInstanceId() + databaseID := action.GetDatabaseId() + databaseAdminClient, err := database.NewDatabaseAdminClient(ctx, opts...) + if err != nil { + return err + } + db, err := databaseAdminClient.GetDatabase(ctx, &adminpb.GetDatabaseRequest{ + Name: fmt.Sprintf("projects/%s/instances/%s/databases/%s", projectID, instanceID, databaseID), + }) + if err != nil { + return err + } + + spannerActionOutcome := &executorpb.SpannerActionOutcome{ + Status: &spb.Status{Code: int32(codes.OK)}, + AdminResult: &executorpb.AdminResult{ + DatabaseResponse: &executorpb.CloudDatabaseResponse{ + Database: db, + }, + }, + } + return o.SendOutcome(spannerActionOutcome) +} + +// execute action that gets an operation. +func executeGetOperation(ctx context.Context, action *executorpb.GetOperationAction, h *ExecutionFlowContext, opts []option.ClientOption, o *outputstream.OutcomeSender) error { + log.Printf("getting operation: %v", action) + databaseAdminClient, err := database.NewDatabaseAdminClient(ctx, opts...) + if err != nil { + return err + } + operationResult, err := databaseAdminClient.GetOperation(ctx, &longrunningpb.GetOperationRequest{ + Name: action.GetOperation(), + }) + if err != nil { + return err + } + + spannerActionOutcome := &executorpb.SpannerActionOutcome{ + Status: &spb.Status{Code: int32(codes.OK)}, + AdminResult: &executorpb.AdminResult{ + OperationResponse: &executorpb.OperationResponse{ + Operation: operationResult, + }, + }, + } + return o.SendOutcome(spannerActionOutcome) +} + +// execute action that cancels an operation. +func executeCancelOperation(ctx context.Context, action *executorpb.CancelOperationAction, h *ExecutionFlowContext, opts []option.ClientOption, o *outputstream.OutcomeSender) error { + log.Printf("cancelling operation: %v", action) + databaseAdminClient, err := database.NewDatabaseAdminClient(ctx, opts...) + if err != nil { + return err + } + err = databaseAdminClient.LROClient.CancelOperation(ctx, &longrunningpb.CancelOperationRequest{ + Name: action.GetOperation(), + }) + if err != nil { + return err + } + return o.FinishSuccessfully() +} diff --git a/spanner/test/cloudexecutor/executor/actions/batch.go b/spanner/test/cloudexecutor/executor/actions/batch.go new file mode 100644 index 000000000000..e85ece013c8a --- /dev/null +++ b/spanner/test/cloudexecutor/executor/actions/batch.go @@ -0,0 +1,159 @@ +// Copyright 2023 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package actions + +import ( + "context" + "log" + "time" + + "cloud.google.com/go/spanner" + "cloud.google.com/go/spanner/test/cloudexecutor/executor/internal/outputstream" + "cloud.google.com/go/spanner/test/cloudexecutor/executor/internal/utility" + executorpb "cloud.google.com/go/spanner/test/cloudexecutor/proto" + "google.golang.org/api/option" + spb "google.golang.org/genproto/googleapis/rpc/status" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +// StartBatchTxnHandler holds the necessary components and options required for start batch transaction action. +type StartBatchTxnHandler struct { + Action *executorpb.StartBatchTransactionAction + FlowContext *ExecutionFlowContext + OutcomeSender *outputstream.OutcomeSender + Options []option.ClientOption +} + +// ExecuteAction that starts a batch transaction +func (h *StartBatchTxnHandler) ExecuteAction(ctx context.Context) error { + log.Printf("starting batch transaction %v", h.Action) + h.FlowContext.mu.Lock() + defer h.FlowContext.mu.Unlock() + if h.FlowContext.isTransactionActiveLocked() { + return h.OutcomeSender.FinishWithError(spanner.ToSpannerError(status.Error(codes.InvalidArgument, "already in a transaction"))) + } + + if h.FlowContext.Database == "" { + return h.OutcomeSender.FinishWithError(spanner.ToSpannerError(status.Error(codes.InvalidArgument, "database path must be set for this action"))) + } + + client, err := spanner.NewClient(ctx, h.FlowContext.Database, h.Options...) + if err != nil { + return h.OutcomeSender.FinishWithError(err) + } + var txn *spanner.BatchReadOnlyTransaction + if h.Action.GetBatchTxnTime() != nil { + timestamp := time.Unix(h.Action.GetBatchTxnTime().Seconds, int64(h.Action.GetBatchTxnTime().Nanos)) + txn, err = client.BatchReadOnlyTransaction(ctx, spanner.ReadTimestamp(timestamp)) + if err != nil { + return h.OutcomeSender.FinishWithError(err) + } + } else if h.Action.GetTid() != nil { + batchTransactionID := spanner.BatchReadOnlyTransactionID{} + err = batchTransactionID.UnmarshalBinary(h.Action.GetTid()) + if err != nil { + return h.OutcomeSender.FinishWithError(err) + } + txn = client.BatchReadOnlyTransactionFromID(batchTransactionID) + } else { + return h.OutcomeSender.FinishWithError(spanner.ToSpannerError(status.Error(codes.InvalidArgument, "Either timestamp or tid must be set"))) + } + + h.FlowContext.batchTxn = txn + h.FlowContext.currentActiveTransaction = Batch + h.FlowContext.initReadState() + batchTxnIDMarshal, err := txn.ID.MarshalBinary() + if err != nil { + return h.OutcomeSender.FinishWithError(err) + } + spannerActionOutcome := &executorpb.SpannerActionOutcome{ + Status: &spb.Status{Code: int32(codes.OK)}, + BatchTxnId: batchTxnIDMarshal, + } + return h.OutcomeSender.SendOutcome(spannerActionOutcome) +} + +// BatchDmlHandler holds the necessary components required for BatchDmlAction. +type BatchDmlHandler struct { + Action *executorpb.BatchDmlAction + FlowContext *ExecutionFlowContext + OutcomeSender *outputstream.OutcomeSender +} + +// ExecuteAction that execute a BatchDml update action request, store the results in the OutcomeSender +func (h *BatchDmlHandler) ExecuteAction(ctx context.Context) error { + log.Printf("executing BatchDml update %v", h.Action) + h.FlowContext.mu.Lock() + defer h.FlowContext.mu.Unlock() + + var queries []spanner.Statement + for i, update := range h.Action.GetUpdates() { + log.Printf("executing BatchDml update [%d] %s\n %s\n", i+1, h.FlowContext.transactionSeed, update) + stmt, err := utility.BuildQuery(update) + if err != nil { + return h.OutcomeSender.FinishWithError(err) + } + queries = append(queries, stmt) + } + + rowCounts, err := executeBatchDml(ctx, queries, h.FlowContext) + if err != nil { + return h.OutcomeSender.FinishWithError(err) + } + h.OutcomeSender.InitForQuery() + for _, rowCount := range rowCounts { + h.OutcomeSender.AppendDmlRowsModified(rowCount) + } + // The batchDml request failed. By design, `rowCounts` contains rows + // modified for DML queries that succeeded only. Add 0 as the row count + // for the last executed DML in the batch (that failed). + if len(rowCounts) != len(queries) { + h.OutcomeSender.AppendDmlRowsModified(0) + } + return h.OutcomeSender.FinishSuccessfully() +} + +// Execute a batch of updates in a read-write transaction +func executeBatchDml(ctx context.Context, stmts []spanner.Statement, flowContext *ExecutionFlowContext) ([]int64, error) { + for i, stmt := range stmts { + log.Printf("executeBatchDml [%d]: %v", i+1, stmt) + } + txn, err := flowContext.getTransactionForWrite() + if err != nil { + return nil, err + } + + return txn.BatchUpdate(ctx, stmts) +} + +// CloseBatchTxnHandler holds the necessary components required for closing batch transaction. +type CloseBatchTxnHandler struct { + Action *executorpb.CloseBatchTransactionAction + FlowContext *ExecutionFlowContext + OutcomeSender *outputstream.OutcomeSender +} + +// ExecuteAction that finishes a batch transaction +func (h *CloseBatchTxnHandler) ExecuteAction(ctx context.Context) error { + log.Printf("closing batch transaction %v", h.Action) + if h.Action.GetCleanup() { + if h.FlowContext.batchTxn == nil { + return h.OutcomeSender.FinishWithError(status.Error(codes.InvalidArgument, "not in a batch transaction")) + } + h.FlowContext.batchTxn.Close() + } + return h.OutcomeSender.FinishSuccessfully() +} diff --git a/spanner/test/cloudexecutor/executor/actions/dql.go b/spanner/test/cloudexecutor/executor/actions/dql.go new file mode 100644 index 000000000000..165a636245ee --- /dev/null +++ b/spanner/test/cloudexecutor/executor/actions/dql.go @@ -0,0 +1,39 @@ +// Copyright 2023 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package actions + +import ( + "cloud.google.com/go/spanner" + "cloud.google.com/go/spanner/apiv1/spannerpb" + "cloud.google.com/go/spanner/test/cloudexecutor/executor/internal/outputstream" + "cloud.google.com/go/spanner/test/cloudexecutor/executor/internal/utility" +) + +func processResults(iter *spanner.RowIterator, limit int64, outcomeSender *outputstream.OutcomeSender, flowContext *ExecutionFlowContext) error { + return nil +} + +// extractTypes extracts types from given table and columns, while ignoring the child rows. +func extractTypes(table string, cols []string, metadata *utility.TableMetadataHelper) ([]*spannerpb.Type, error) { + var typeList []*spannerpb.Type + for _, col := range cols { + ctype, err := metadata.GetColumnType(table, col) + if err != nil { + return nil, err + } + typeList = append(typeList, ctype) + } + return typeList, nil +} diff --git a/spanner/test/cloudexecutor/executor/actions/execution_flow_context.go b/spanner/test/cloudexecutor/executor/actions/execution_flow_context.go new file mode 100644 index 000000000000..99c5e2175203 --- /dev/null +++ b/spanner/test/cloudexecutor/executor/actions/execution_flow_context.go @@ -0,0 +1,124 @@ +// Copyright 2023 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package actions + +import ( + "context" + "log" + "sync" + + "cloud.google.com/go/spanner" + "cloud.google.com/go/spanner/test/cloudexecutor/executor/internal/utility" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +type currentActiveTransaction int + +// Current transaction status +const ( + None currentActiveTransaction = iota + Read + ReadWrite + Batch +) + +// ExecutionFlowContext represents a context in which SpannerActions are executed. Among other +// things, it includes currently active transactions and table metadata. There is exactly one +// instance of this per stubby call, created when the call is initialized and shared with all +// actionHandlers. +type ExecutionFlowContext struct { + mu sync.Mutex // protects all internal state + Database string // current database path + rwTxn *spanner.ReadWriteStmtBasedTransaction // Current read-write transaction + roTxn *spanner.ReadOnlyTransaction // Current read-only transaction + batchTxn *spanner.BatchReadOnlyTransaction // Current batch read-only transaction + dbClient *spanner.Client // Current database client + tableMetadata *utility.TableMetadataHelper // If in a txn (except batch), this has metadata info about table columns + numPendingReads int64 // Number of pending read/query actions. + readAborted bool // Indicate whether there's a read/query action got aborted and the transaction need to be reset. + transactionSeed string // Log the workid and op pair for tracing the thread. + currentActiveTransaction currentActiveTransaction + TxnContext context.Context +} + +// isTransactionActiveLocked returns true if any kind of transaction is currently active. Must hold c.mu +// when calling. +func (c *ExecutionFlowContext) isTransactionActiveLocked() bool { + return c.rwTxn != nil || c.roTxn != nil || c.batchTxn != nil +} + +// IsTransactionActive returns true if any kind of transaction is currently active. +func (c *ExecutionFlowContext) IsTransactionActive() bool { + c.mu.Lock() + defer c.mu.Unlock() + return c.isTransactionActiveLocked() +} + +// Return current database. Must hold c.mu when calling. +func (c *ExecutionFlowContext) getDatabase() (string, error) { + if c.Database == "" { + return "", status.Error(codes.InvalidArgument, "database doesn't exist") + } + return c.Database, nil +} + +// Return current transaction that can be used for performing read/query actions. Must hold c.mu when calling. +func (c *ExecutionFlowContext) getTransactionForRead() (*spanner.ReadOnlyTransaction, error) { + if c.roTxn != nil { + return c.roTxn, nil + } + return nil, status.Error(codes.InvalidArgument, "no currently active transaction for read") +} + +// Return current transaction that can be used for performing mutation/update actions. Must hold c.mu when calling. +func (c *ExecutionFlowContext) getTransactionForWrite() (*spanner.ReadWriteStmtBasedTransaction, error) { + if c.rwTxn != nil { + return c.rwTxn, nil + } + return nil, status.Error(codes.InvalidArgument, "no currently active transaction for read-write") +} + +// Return current transaction that can be used for performing batch actions. Must hold c.mu when calling. +func (c *ExecutionFlowContext) getBatchTransaction() (*spanner.BatchReadOnlyTransaction, error) { + if c.batchTxn != nil { + return c.batchTxn, nil + } + return nil, status.Error(codes.InvalidArgument, "no currently active batch transaction") +} + +// Increase the read count when a read/query is issued. +func (c *ExecutionFlowContext) startRead() { + c.numPendingReads++ +} + +// Decrease the read count when a read/query is finished, if status is aborted and there's no +// pending read/query, reset the transaction for retry. +func (c *ExecutionFlowContext) finishRead(code codes.Code) { + if code == codes.Aborted { + c.readAborted = true + } + c.numPendingReads-- + if c.readAborted && c.numPendingReads <= 0 { + log.Println("Transaction reset due to read/query abort") + c.readAborted = false + } +} + +// Initialize the read count and aborted status when transaction started. +func (c *ExecutionFlowContext) initReadState() { + c.readAborted = false + c.numPendingReads = 0 +} diff --git a/spanner/test/cloudexecutor/executor/actions/partition.go b/spanner/test/cloudexecutor/executor/actions/partition.go new file mode 100644 index 000000000000..74fee6b5be6b --- /dev/null +++ b/spanner/test/cloudexecutor/executor/actions/partition.go @@ -0,0 +1,228 @@ +// Copyright 2023 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package actions + +import ( + "context" + "fmt" + "log" + + "cloud.google.com/go/spanner" + "cloud.google.com/go/spanner/apiv1/spannerpb" + "cloud.google.com/go/spanner/test/cloudexecutor/executor/internal/outputstream" + "cloud.google.com/go/spanner/test/cloudexecutor/executor/internal/utility" + executorpb "cloud.google.com/go/spanner/test/cloudexecutor/proto" + spb "google.golang.org/genproto/googleapis/rpc/status" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +// PartitionReadActionHandler holds the necessary components required for performing partition read action. +type PartitionReadActionHandler struct { + Action *executorpb.GenerateDbPartitionsForReadAction + FlowContext *ExecutionFlowContext + OutcomeSender *outputstream.OutcomeSender +} + +// ExecuteAction executes action that generates database partitions for the given read. +func (h *PartitionReadActionHandler) ExecuteAction(ctx context.Context) error { + metadata := &utility.TableMetadataHelper{} + metadata.InitFromTableMetadata(h.Action.GetTable()) + + h.FlowContext.mu.Lock() + defer h.FlowContext.mu.Unlock() + + h.FlowContext.tableMetadata = metadata + readAction := h.Action.GetRead() + var err error + + var typeList []*spannerpb.Type + if readAction.Index != nil { + typeList, err = extractTypes(readAction.GetTable(), readAction.GetColumn(), h.FlowContext.tableMetadata) + } else { + typeList, err = h.FlowContext.tableMetadata.GetKeyColumnTypes(readAction.GetTable()) + } + if err != nil { + return h.OutcomeSender.FinishWithError(status.Error(codes.InvalidArgument, fmt.Sprintf("Can't extract types from metadata: %s", err))) + } + + keySet, err := utility.KeySetProtoToCloudKeySet(readAction.GetKeys(), typeList) + if err != nil { + return h.OutcomeSender.FinishWithError(spanner.ToSpannerError(status.Error(codes.InvalidArgument, fmt.Sprintf("Can't convert rowSet: %s", err)))) + } + + batchTxn, err := h.FlowContext.getBatchTransaction() + if err != nil { + return h.OutcomeSender.FinishWithError(err) + } + + partitionOptions := spanner.PartitionOptions{PartitionBytes: h.Action.GetDesiredBytesPerPartition(), MaxPartitions: h.Action.GetMaxPartitionCount()} + var partitions []*spanner.Partition + if readAction.Index != nil { + partitions, err = batchTxn.PartitionReadUsingIndexWithOptions(ctx, readAction.GetTable(), readAction.GetIndex(), keySet, readAction.GetColumn(), partitionOptions, spanner.ReadOptions{}) + } else { + partitions, err = batchTxn.PartitionReadWithOptions(ctx, readAction.GetTable(), keySet, readAction.GetColumn(), partitionOptions, spanner.ReadOptions{}) + } + if err != nil { + return h.OutcomeSender.FinishWithError(err) + } + var batchPartitions []*executorpb.BatchPartition + for _, part := range partitions { + partitionInstance, _ := part.MarshalBinary() + batchPartition := &executorpb.BatchPartition{ + Partition: partitionInstance, + PartitionToken: part.GetPartitionToken(), + Table: &readAction.Table, + Index: readAction.Index, + } + batchPartitions = append(batchPartitions, batchPartition) + } + spannerActionOutcome := &executorpb.SpannerActionOutcome{ + Status: &spb.Status{Code: int32(codes.OK)}, + DbPartition: batchPartitions, + } + err = h.OutcomeSender.SendOutcome(spannerActionOutcome) + if err != nil { + log.Printf("GenerateDbPartitionsRead failed for %s", h.Action) + return h.OutcomeSender.FinishWithError(err) + } + return err +} + +// PartitionQueryActionHandler holds the necessary components required for performing partition query action. +type PartitionQueryActionHandler struct { + Action *executorpb.GenerateDbPartitionsForQueryAction + FlowContext *ExecutionFlowContext + OutcomeSender *outputstream.OutcomeSender +} + +// ExecuteAction executes action that generates database partitions for the given query. +func (h *PartitionQueryActionHandler) ExecuteAction(ctx context.Context) error { + h.FlowContext.mu.Lock() + defer h.FlowContext.mu.Unlock() + + batchTxn, err := h.FlowContext.getBatchTransaction() + if err != nil { + return h.OutcomeSender.FinishWithError(err) + } + stmt, err := utility.BuildQuery(h.Action.GetQuery()) + if err != nil { + return h.OutcomeSender.FinishWithError(err) + } + partitionOptions := spanner.PartitionOptions{PartitionBytes: h.Action.GetDesiredBytesPerPartition()} + partitions, err := batchTxn.PartitionQuery(ctx, stmt, partitionOptions) + if err != nil { + return h.OutcomeSender.FinishWithError(err) + } + var batchPartitions []*executorpb.BatchPartition + for _, partition := range partitions { + partitionInstance, err := partition.MarshalBinary() + if err != nil { + return h.OutcomeSender.FinishWithError(err) + } + batchPartition := &executorpb.BatchPartition{ + Partition: partitionInstance, + PartitionToken: partition.GetPartitionToken(), + } + batchPartitions = append(batchPartitions, batchPartition) + } + + spannerActionOutcome := &executorpb.SpannerActionOutcome{ + Status: &spb.Status{Code: int32(codes.OK)}, + DbPartition: batchPartitions, + } + err = h.OutcomeSender.SendOutcome(spannerActionOutcome) + if err != nil { + return h.OutcomeSender.FinishWithError(err) + } + return err +} + +// ExecutePartition holds the necessary components required for executing partition. +type ExecutePartition struct { + Action *executorpb.ExecutePartitionAction + FlowContext *ExecutionFlowContext + OutcomeSender *outputstream.OutcomeSender +} + +// ExecuteAction executes a read or query for the given partitions. +func (h *ExecutePartition) ExecuteAction(ctx context.Context) error { + h.FlowContext.mu.Lock() + defer h.FlowContext.mu.Unlock() + + batchTxn, err := h.FlowContext.getBatchTransaction() + if err != nil { + return h.OutcomeSender.FinishWithError(err) + } + + partitionBinary := h.Action.GetPartition().GetPartition() + if partitionBinary == nil || len(partitionBinary) == 0 { + return h.OutcomeSender.FinishWithError(spanner.ToSpannerError(status.Errorf(codes.InvalidArgument, "Invalid batchPartition %s", h.Action))) + } + if h.Action.GetPartition().Table != nil { + h.OutcomeSender.InitForBatchRead(h.Action.GetPartition().GetTable(), h.Action.GetPartition().Index) + } else { + h.OutcomeSender.InitForQuery() + } + partition := &spanner.Partition{} + if err = partition.UnmarshalBinary(partitionBinary); err != nil { + return h.OutcomeSender.FinishWithError(spanner.ToSpannerError(status.Errorf(codes.InvalidArgument, "ExecutePartitionAction: deserializing Partition failed %v", err))) + } + h.FlowContext.startRead() + iter := batchTxn.Execute(ctx, partition) + defer iter.Stop() + err = processResults(iter, 0, h.OutcomeSender, h.FlowContext) + if err != nil { + h.FlowContext.finishRead(status.Code(err)) + if status.Code(err) == codes.Aborted { + return h.OutcomeSender.FinishWithTransactionRestarted() + } + return h.OutcomeSender.FinishWithError(err) + } + h.FlowContext.finishRead(codes.OK) + return h.OutcomeSender.FinishSuccessfully() +} + +// PartitionedUpdate holds the necessary components required for performing partitioned update. +type PartitionedUpdate struct { + Action *executorpb.PartitionedUpdateAction + FlowContext *ExecutionFlowContext + OutcomeSender *outputstream.OutcomeSender +} + +// ExecuteAction executes a partitioned update which runs different partitions in parallel. +func (h *PartitionedUpdate) ExecuteAction(ctx context.Context) error { + h.FlowContext.mu.Lock() + defer h.FlowContext.mu.Unlock() + + opts := h.Action.GetOptions() + stmt := spanner.Statement{SQL: h.Action.GetUpdate().GetSql()} + count, err := h.FlowContext.dbClient.PartitionedUpdateWithOptions(h.FlowContext.TxnContext, stmt, spanner.QueryOptions{ + Priority: opts.GetRpcPriority(), + RequestTag: opts.GetTag(), + }) + if err != nil { + return h.OutcomeSender.FinishWithError(err) + } + spannerActionOutcome := &executorpb.SpannerActionOutcome{ + Status: &spb.Status{Code: int32(codes.OK)}, + DmlRowsModified: []int64{count}, + } + err = h.OutcomeSender.SendOutcome(spannerActionOutcome) + if err != nil { + return h.OutcomeSender.FinishWithError(err) + } + return err +} diff --git a/spanner/test/cloudexecutor/executor/internal/utility/executor_to_spanner_conversion.go b/spanner/test/cloudexecutor/executor/internal/utility/executor_to_spanner_conversion.go new file mode 100644 index 000000000000..6e7748d63988 --- /dev/null +++ b/spanner/test/cloudexecutor/executor/internal/utility/executor_to_spanner_conversion.go @@ -0,0 +1,31 @@ +// Copyright 2023 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package utility + +import ( + "cloud.google.com/go/spanner" + "cloud.google.com/go/spanner/apiv1/spannerpb" + executorpb "cloud.google.com/go/spanner/test/cloudexecutor/proto" +) + +// BuildQuery constructs a spanner.Statement query and bind the params from the input executor query. +func BuildQuery(queryAction *executorpb.QueryAction) (spanner.Statement, error) { + return spanner.Statement{}, nil +} + +// KeySetProtoToCloudKeySet converts an executor KeySet to a Cloud Spanner KeySet instance. +func KeySetProtoToCloudKeySet(keySetProto *executorpb.KeySet, typeList []*spannerpb.Type) (spanner.KeySet, error) { + return spanner.AllKeys(), nil +}