Skip to content

Commit

Permalink
support scheduled lambdas (#106)
Browse files Browse the repository at this point in the history
* support scheduled lambdas

* upgrade go client

* fix merge conflict

* go fmt

* comments
  • Loading branch information
kristielim authored Apr 5, 2024
1 parent 921d6e0 commit d2ed882
Show file tree
Hide file tree
Showing 6 changed files with 359 additions and 5 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ require (
github.com/hashicorp/terraform-plugin-mux v0.15.0
github.com/hashicorp/terraform-plugin-sdk/v2 v2.33.0
github.com/hashicorp/terraform-plugin-testing v1.7.0
github.com/rockset/rockset-go-client v0.24.1
github.com/rockset/rockset-go-client v0.24.2
github.com/rs/zerolog v1.32.0
github.com/stretchr/testify v1.9.0
)
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/docker/cli v20.10.17+incompatible h1:eO2KS7ZFeov5UJeaDmIs1NFEDRf32PaqRpvoEkKBy5M=
github.com/docker/cli v20.10.17+incompatible/go.mod h1:JLrzqnKDaYBop7H2jaqPtU4hHvMKP+vjCwu2uszcLI8=
github.com/docker/docker v24.0.7+incompatible h1:Wo6l37AuwP3JaMnZa226lzVXGA3F9Ig1seQen0cKYlM=
github.com/docker/docker v24.0.7+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk=
github.com/docker/docker v24.0.9+incompatible h1:HPGzNmwfLZWdxHqK9/II92pyi1EpYKsAqcl4G0Of9v0=
github.com/docker/docker v24.0.9+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk=
github.com/docker/go-connections v0.4.0 h1:El9xVISelRB7BuFusrZozjnkIM5YnzCViNKohAFqRJQ=
github.com/docker/go-connections v0.4.0/go.mod h1:Gbd7IOopHjR8Iph03tsViu4nIes5XhDvyHbTtUxmeec=
github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4=
Expand Down Expand Up @@ -206,8 +206,8 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rockset/rockset-go-client v0.24.1 h1:s8LpCAso5Bqh/3UEJgOYLry615jiQjc0ajInQBL5XoQ=
github.com/rockset/rockset-go-client v0.24.1/go.mod h1:Q5mjXK/azJa+CM/+cFEnZOfxvYw2I7S3fOXLKpJFSGA=
github.com/rockset/rockset-go-client v0.24.2 h1:/7542zPZeKwK+Nafh22kbuE2fyIGKJMLIdZdambD8z8=
github.com/rockset/rockset-go-client v0.24.2/go.mod h1:SuMK352xIAawSB1Ev5/AWx5SLBeDl6E4bVniPVGq1rc=
github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8=
github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4=
github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
Expand Down
1 change: 1 addition & 0 deletions rockset/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ func Provider() *schema.Provider {
"rockset_view": resourceView(),
"rockset_virtual_instance": resourceVirtualInstance(),
"rockset_workspace": resourceWorkspace(),
"rockset_scheduled_lambda": resourceScheduledLambda(),
},
DataSourcesMap: map[string]*schema.Resource{
"rockset_account": dataSourceRocksetAccount(),
Expand Down
246 changes: 246 additions & 0 deletions rockset/resource_scheduled_lambda.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,246 @@
package rockset

import (
"context"

"github.com/hashicorp/terraform-plugin-sdk/v2/diag"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"

"github.com/hashicorp/terraform-plugin-sdk/v2/helper/validation"
"github.com/rockset/rockset-go-client"
"github.com/rockset/rockset-go-client/openapi"
"github.com/rockset/rockset-go-client/option"
)

func resourceScheduledLambda() *schema.Resource {
return &schema.Resource{
Description: "Manages a Rockset scheduled lambda, a query lambda that is automatically executed on a schedule.",

CreateContext: resourceScheduledLambdaCreate,
ReadContext: resourceScheduledLambdaRead,
UpdateContext: resourceScheduledLambdaUpdate,
DeleteContext: resourceScheduledLambdaDelete,

Importer: &schema.ResourceImporter{
StateContext: schema.ImportStatePassthroughContext,
},

Schema: map[string]*schema.Schema{
"rrn": {
Description: "RRN of this Scheduled Lambda.",
Type: schema.TypeString,
Computed: true,
},
"workspace": {
Description: "Workspace name.",
Type: schema.TypeString,
ForceNew: true,
Required: true,
ValidateFunc: rocksetNameValidator,
},
"apikey": {
Description: "The apikey to use when triggering execution of the associated query lambda.",
Type: schema.TypeString,
Required: true,
Sensitive: true,
DiffSuppressFunc: func(k, old, new string, d *schema.ResourceData) bool {
// apikey is not returned for security reasons
return new == ""
},
},
"cron_string": {
Description: "The UNIX-formatted cron string for this scheduled query lambda.",
Type: schema.TypeString,
ForceNew: true,
Required: true,
// TODO: add a validator
},
"query_lambda_name": {
Description: "The name of the QL to use for scheduled execution.",
Type: schema.TypeString,
ForceNew: true,
Required: true,
},
"tag": {
Description: "The QL tag to use for scheduled execution.",
Type: schema.TypeString,
ForceNew: true,
Optional: true,
},
"version": {
Description: "The version of the QL to use for scheduled execution.",
Type: schema.TypeString,
ForceNew: true,
Optional: true,
},
"total_times_to_execute": {
Description: "The number of times to execute this scheduled query lambda. Once this scheduled query lambda has been executed this many times, it will no longer be executed.",
Type: schema.TypeInt,
Optional: true,
ValidateFunc: validation.IntAtLeast(0),
},
"webhook_auth_header": {
Description: "The value to use as the authorization header when hitting the webhook.",
Type: schema.TypeString,
Optional: true,
Sensitive: true,
DiffSuppressFunc: func(k, old, new string, d *schema.ResourceData) bool {
// auth header is not returned for security reasons
return new == ""
},
},
"webhook_payload": {
Description: "The payload that should be sent to the webhook. JSON format.",
Type: schema.TypeString,
Optional: true,
},
"webhook_url": {
Description: "The URL of the webhook that should be triggered after this scheduled query lambda completes.",
Type: schema.TypeString,
Optional: true,
},
},
}
}

func resourceScheduledLambdaCreate(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
rc := meta.(*rockset.RockClient)
var diags diag.Diagnostics

workspace := d.Get("workspace").(string)
apikey := d.Get("apikey").(string)
cronString := d.Get("cron_string").(string)
qlName := d.Get("query_lambda_name").(string)

options := getScheduledLambdaOptions(d)

scheduledLambda, err := rc.CreateScheduledLambda(ctx, workspace, apikey, cronString, qlName, options...)
if err != nil {
return DiagFromErr(err)
}

scheduledLambdaRrn := *scheduledLambda.Rrn
d.SetId(toID(workspace, scheduledLambdaRrn))

err = rc.Wait.UntilScheduledLambdaAvailable(ctx, workspace, scheduledLambdaRrn)
if err != nil {
return DiagFromErr(err)
}

if err = parseScheduledLambdaFields(scheduledLambda, d); err != nil {
return DiagFromErr(err)
}

return diags
}

func resourceScheduledLambdaUpdate(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
rc := meta.(*rockset.RockClient)
var diags diag.Diagnostics

workspace, scheduledLambdaRRN := workspaceAndNameFromID(d.Id())

options := getScheduledLambdaOptions(d)
_, err := rc.UpdateScheduledLambda(ctx, workspace, scheduledLambdaRRN, options...)
if err != nil {
return DiagFromErr(err)
}

return diags
}

func resourceScheduledLambdaRead(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
rc := meta.(*rockset.RockClient)
var diags diag.Diagnostics

workspace, scheduledLambdaRRN := workspaceAndNameFromID(d.Id())

scheduledLambda, err := rc.GetScheduledLambda(ctx, workspace, scheduledLambdaRRN)
if err != nil {
return DiagFromErr(err)
}

if err = parseScheduledLambdaFields(scheduledLambda, d); err != nil {
return DiagFromErr(err)
}

return diags
}

func resourceScheduledLambdaDelete(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
rc := meta.(*rockset.RockClient)
var diags diag.Diagnostics

workspace, scheduledLambdaRRN := workspaceAndNameFromID(d.Id())

err := rc.DeleteScheduledLambda(ctx, workspace, scheduledLambdaRRN)
if err != nil {
return DiagFromErr(err)
}

err = rc.Wait.UntilScheduledLambdaGone(ctx, workspace, scheduledLambdaRRN)
if err != nil {
return DiagFromErr(err)
}

return diags
}

func getScheduledLambdaOptions(d *schema.ResourceData) []option.ScheduledLambdaOption {
var options []option.ScheduledLambdaOption
addOptionIfChanged(d, "tag", &options, func(a any) option.ScheduledLambdaOption {
return option.WithScheduledLambdaTag(a.(string))
})
addOptionIfChanged(d, "version", &options, func(a any) option.ScheduledLambdaOption {
return option.WithScheduledLambdaVersion(a.(string))
})
addOptionIfChanged(d, "apikey", &options, func(a any) option.ScheduledLambdaOption {
return option.WithScheduledLambdaApikey(a.(string))
})
addOptionIfChanged(d, "total_times_to_execute", &options, func(a any) option.ScheduledLambdaOption {
return option.WithScheduledLambdaTotalTimesToExecute(int64(a.(int)))
})
addOptionIfChanged(d, "webhook_auth_header", &options, func(a any) option.ScheduledLambdaOption {
return option.WithScheduledLambdaWebhookAuthHeader(a.(string))
})
addOptionIfChanged(d, "webhook_payload", &options, func(a any) option.ScheduledLambdaOption {
return option.WithScheduledLambdaWebhookPayload(a.(string))
})
addOptionIfChanged(d, "webhook_url", &options, func(a any) option.ScheduledLambdaOption {
return option.WithScheduledLambdaWebhookURL(a.(string))
})

return options
}

func parseScheduledLambdaFields(scheduledLambda openapi.ScheduledLambda, d *schema.ResourceData) error {
if err := setValue(d, "rrn", scheduledLambda.GetRrnOk); err != nil {
return err
}
if err := setValue(d, "workspace", scheduledLambda.GetWorkspaceOk); err != nil {
return err
}
if err := setValue(d, "cron_string", scheduledLambda.GetCronStringOk); err != nil {
return err
}
if err := setValue(d, "query_lambda_name", scheduledLambda.GetQlNameOk); err != nil {
return err
}
if err := setValue(d, "tag", scheduledLambda.GetTagOk); err != nil {
return err
}
if err := setValue(d, "version", scheduledLambda.GetVersionOk); err != nil {
return err
}
if err := setValue(d, "total_times_to_execute", scheduledLambda.GetTotalTimesToExecuteOk); err != nil {
return err
}
if err := setValue(d, "webhook_payload", scheduledLambda.GetWebhookPayloadOk); err != nil {
return err
}
if err := setValue(d, "webhook_url", scheduledLambda.GetWebhookUrlOk); err != nil {
return err
}

return nil
}
91 changes: 91 additions & 0 deletions rockset/resource_scheduled_lambda_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package rockset

import (
"fmt"
"strconv"
"testing"

"github.com/hashicorp/terraform-plugin-sdk/v2/terraform"
"github.com/rockset/rockset-go-client"

"github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource"
)

func TestAccScheduledLambda_Basic(t *testing.T) {
scheduledLambda := "rockset_scheduled_lambda.test_scheduled_lambda"

type cfg struct {
CronString string
TotalTimesToExecute int64
QueryLambdaName string
}
qlName := randomName("test_query_lambda_name")
s1 := cfg{"0 0 0 ? * * *", 1, qlName}
s2 := cfg{"0 0 0 ? * * *", 2, qlName}
s3 := cfg{"0 0 * ? * * *", 3, qlName}

resource.Test(t, resource.TestCase{
PreCheck: func() { testAccPreCheck(t) },
ProviderFactories: testAccProviderFactories,
CheckDestroy: testAccCheckRocksetScheduledLambdaDestroy,
Steps: []resource.TestStep{
{
Config: getHCLTemplate("scheduled_lambda_basic.tf", s1),
Check: resource.ComposeTestCheckFunc(
resource.TestCheckResourceAttrSet(scheduledLambda, "rrn"),
resource.TestCheckResourceAttrSet(scheduledLambda, "apikey"),
resource.TestCheckResourceAttr(scheduledLambda, "workspace", "acc"),
resource.TestCheckResourceAttr(scheduledLambda, "cron_string", s1.CronString),
resource.TestCheckResourceAttr(scheduledLambda, "query_lambda_name", qlName),
resource.TestCheckResourceAttr(scheduledLambda, "tag", "latest"),
resource.TestCheckResourceAttr(scheduledLambda, "total_times_to_execute", strconv.FormatInt(s1.TotalTimesToExecute, 10)),
),
},
{
Config: getHCLTemplate("scheduled_lambda_basic.tf", s2),
Check: resource.ComposeTestCheckFunc(
resource.TestCheckResourceAttrSet(scheduledLambda, "rrn"),
resource.TestCheckResourceAttrSet(scheduledLambda, "apikey"),
resource.TestCheckResourceAttr(scheduledLambda, "workspace", "acc"),
resource.TestCheckResourceAttr(scheduledLambda, "cron_string", s2.CronString),
resource.TestCheckResourceAttr(scheduledLambda, "query_lambda_name", qlName),
resource.TestCheckResourceAttr(scheduledLambda, "tag", "latest"),
resource.TestCheckResourceAttr(scheduledLambda, "total_times_to_execute", strconv.FormatInt(s2.TotalTimesToExecute, 10)),
),
},
{
Config: getHCLTemplate("scheduled_lambda_basic.tf", s3),
Check: resource.ComposeTestCheckFunc(
resource.TestCheckResourceAttrSet(scheduledLambda, "rrn"),
resource.TestCheckResourceAttrSet(scheduledLambda, "apikey"),
resource.TestCheckResourceAttr(scheduledLambda, "workspace", "acc"),
resource.TestCheckResourceAttr(scheduledLambda, "cron_string", s3.CronString),
resource.TestCheckResourceAttr(scheduledLambda, "query_lambda_name", qlName),
resource.TestCheckResourceAttr(scheduledLambda, "tag", "latest"),
resource.TestCheckResourceAttr(scheduledLambda, "total_times_to_execute", strconv.FormatInt(s3.TotalTimesToExecute, 10)),
),
},
},
})
}

func testAccCheckRocksetScheduledLambdaDestroy(s *terraform.State) error {
rc := testAccProvider.Meta().(*rockset.RockClient)

for _, rs := range s.RootModule().Resources {
if rs.Type != "rockset_scheduled_lambda" {
continue
}

workspace, scheduledLambdaRRN := workspaceAndNameFromID(rs.Primary.ID)
_, err := rc.GetScheduledLambda(testCtx, workspace, scheduledLambdaRRN)

// An error would mean we didn't find the key, we expect an error
if err == nil {
// We did not get an error, so we failed to delete the key.
return fmt.Errorf("scheduled lambda %s still exists", scheduledLambdaRRN)
}
}

return nil
}
16 changes: 16 additions & 0 deletions testdata/scheduled_lambda_basic.tf
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
resource rockset_query_lambda "test_query_lambda" {
workspace = "acc"
name = "{{ .QueryLambdaName}}"
sql {
query = "select 1"
}
}

resource "rockset_scheduled_lambda" "test_scheduled_lambda" {
workspace = "acc"
apikey = "var.ROCKSET_APIKEY"
cron_string = "{{ .CronString }}"
query_lambda_name = rockset_query_lambda.test_query_lambda.name
tag = "latest"
total_times_to_execute = {{ .TotalTimesToExecute }}
}

0 comments on commit d2ed882

Please sign in to comment.