Skip to content

Commit

Permalink
Update AIS integration
Browse files Browse the repository at this point in the history
Update AIS integration to use Enduro's poststorage child workflows.

- Remove AIS API server.
- Remove `AIPName` and make `AIPUUID` a string in AIS `WorkflowParams`.
- Add local activity to get the AIP current path from AMSS.
- Parse AIP directory name from current path.
  • Loading branch information
jraddaoui committed Nov 13, 2024
1 parent 7dd47ca commit fa37e42
Show file tree
Hide file tree
Showing 14 changed files with 157 additions and 203 deletions.
1 change: 0 additions & 1 deletion Tiltfile
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ k8s_yaml(kustomize("hack/kube/overlays/dev"))
# Preprocessing resources
k8s_resource(
"preprocessing-worker",
port_forwards="0.0.0.0:9035:9035",
labels=["01-Preprocessing"],
trigger_mode=trigger_mode
)
Expand Down
1 change: 0 additions & 1 deletion Tiltfile.enduro
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ k8s_yaml(kustomize("hack/kube/overlays/enduro"))
# Preprocessing resources
k8s_resource(
"preprocessing-worker",
port_forwards="0.0.0.0:9035:9035",
labels=["Preprocessing"],
trigger_mode=trigger_mode
)
Expand Down
23 changes: 18 additions & 5 deletions cmd/worker/aiscmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,24 @@ type Main struct {
bucket *blob.Bucket
}

func NewMain(logger logr.Logger, cfg ais.Config, tc temporalsdk_client.Client) *Main {
func NewMain(logger logr.Logger, cfg ais.Config) *Main {

Check warning on line 28 in cmd/worker/aiscmd/cmd.go

View check run for this annotation

Codecov / codecov/patch

cmd/worker/aiscmd/cmd.go#L28

Added line #L28 was not covered by tests
return &Main{
logger: logger,
cfg: cfg,
temporalClient: tc,
logger: logger,
cfg: cfg,

Check warning on line 31 in cmd/worker/aiscmd/cmd.go

View check run for this annotation

Codecov / codecov/patch

cmd/worker/aiscmd/cmd.go#L30-L31

Added lines #L30 - L31 were not covered by tests
}
}

func (m *Main) Run(ctx context.Context) error {
tc, err := temporalsdk_client.Dial(temporalsdk_client.Options{
HostPort: m.cfg.Temporal.Address,
Namespace: m.cfg.Temporal.Namespace,
Logger: temporal.Logger(m.logger.WithName("ais-temporal")),
})
if err != nil {
return fmt.Errorf("Unable to create AIS Temporal client: %w", err)
}
m.temporalClient = tc

Check warning on line 45 in cmd/worker/aiscmd/cmd.go

View check run for this annotation

Codecov / codecov/patch

cmd/worker/aiscmd/cmd.go#L36-L45

Added lines #L36 - L45 were not covered by tests
w := temporalsdk_worker.New(m.temporalClient, m.cfg.Temporal.TaskQueue, temporalsdk_worker.Options{
EnableSessionWorker: true,
MaxConcurrentSessionExecutionSize: m.cfg.Worker.MaxConcurrentSessions,
Expand All @@ -54,7 +63,7 @@ func (m *Main) Run(ctx context.Context) error {
}

if err := w.Start(); err != nil {
m.logger.Error(err, "Worker failed to start.")
m.logger.Error(err, "AIS worker failed to start.")

Check warning on line 66 in cmd/worker/aiscmd/cmd.go

View check run for this annotation

Codecov / codecov/patch

cmd/worker/aiscmd/cmd.go#L66

Added line #L66 was not covered by tests
return err
}

Expand All @@ -66,6 +75,10 @@ func (m *Main) Close() error {
m.temporalWorker.Stop()
}

if m.temporalClient != nil {
m.temporalClient.Close()
}

Check warning on line 80 in cmd/worker/aiscmd/cmd.go

View check run for this annotation

Codecov / codecov/patch

cmd/worker/aiscmd/cmd.go#L78-L80

Added lines #L78 - L80 were not covered by tests

if m.bucket != nil {
return m.bucket.Close()
}
Expand Down
34 changes: 1 addition & 33 deletions cmd/worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,13 @@ import (
"os/signal"
"runtime"
"syscall"
"time"

"github.com/oklog/run"
"github.com/spf13/pflag"
"go.artefactual.dev/tools/log"
"go.artefactual.dev/tools/temporal"
temporalsdk_client "go.temporal.io/sdk/client"

"github.com/artefactual-sdps/preprocessing-sfa/cmd/worker/aiscmd"
"github.com/artefactual-sdps/preprocessing-sfa/cmd/worker/workercmd"
"github.com/artefactual-sdps/preprocessing-sfa/internal/ais"
"github.com/artefactual-sdps/preprocessing-sfa/internal/config"
"github.com/artefactual-sdps/preprocessing-sfa/internal/version"
)
Expand Down Expand Up @@ -96,22 +92,10 @@ func main() {
)
}

// AIS Temporal client.
atc, err := temporalsdk_client.Dial(temporalsdk_client.Options{
HostPort: cfg.AIS.Temporal.Address,
Namespace: cfg.AIS.Temporal.Namespace,
Logger: temporal.Logger(logger.WithName("ais-temporal")),
})
if err != nil {
logger.Error(err, "Unable to create AIS Temporal client.")
os.Exit(1)
}
defer atc.Close()

// AIS worker.
{
done := make(chan struct{})
m := aiscmd.NewMain(logger, cfg.AIS, atc)
m := aiscmd.NewMain(logger, cfg.AIS)

Check warning on line 98 in cmd/worker/main.go

View check run for this annotation

Codecov / codecov/patch

cmd/worker/main.go#L98

Added line #L98 was not covered by tests
g.Add(
func() error {
if err := m.Run(ctx); err != nil {
Expand All @@ -129,22 +113,6 @@ func main() {
)
}

// AIS API server.
{
srv := ais.NewAPIServer(ctx, atc, cfg.AIS)
g.Add(
func() error {
logger.Info("API server running", "listen", cfg.AIS.Listen)
return srv.ListenAndServe()
},
func(err error) {
ctx, cancel := context.WithTimeout(ctx, time.Second*5)
defer cancel()
_ = srv.Shutdown(ctx)
},
)
}

// Signal handler.
{
var (
Expand Down
2 changes: 1 addition & 1 deletion cmd/worker/workercmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func (m *Main) Run(ctx context.Context) error {
)

if err := w.Start(); err != nil {
m.logger.Error(err, "Worker failed to start.")
m.logger.Error(err, "Preprocessing worker failed to start.")

Check warning on line 110 in cmd/worker/workercmd/cmd.go

View check run for this annotation

Codecov / codecov/patch

cmd/worker/workercmd/cmd.go#L110

Added line #L110 was not covered by tests
return err
}

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ require (
github.com/stretchr/testify v1.9.0
github.com/tonglil/buflogr v1.1.1
go.artefactual.dev/tools v0.14.0
go.temporal.io/api v1.32.0
go.temporal.io/sdk v1.26.1
gocloud.dev v0.39.0
gotest.tools/v3 v3.5.1
Expand Down Expand Up @@ -80,6 +79,7 @@ require (
github.com/stretchr/objx v0.5.2 // indirect
github.com/subosito/gotenv v1.6.0 // indirect
go.opencensus.io v0.24.0 // indirect
go.temporal.io/api v1.32.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.27.0 // indirect
golang.org/x/exp v0.0.0-20231219180239-dc181d75b848 // indirect
Expand Down
1 change: 0 additions & 1 deletion hack/kube/overlays/dev/preprocessing-secret.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ stringData:
checksumAlgorithm = "md5"
[ais]
listen = "0.0.0.0:9035"
workingDir = "/tmp"
[ais.temporal]
Expand Down
1 change: 0 additions & 1 deletion hack/kube/overlays/enduro/preprocessing-secret.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ stringData:
checksumAlgorithm = "md5"
[ais]
listen = "0.0.0.0:9035"
workingDir = "/tmp"
[ais.temporal]
Expand Down
61 changes: 0 additions & 61 deletions internal/ais/api.go

This file was deleted.

1 change: 0 additions & 1 deletion internal/ais/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package ais
import "go.artefactual.dev/tools/bucket"

type Config struct {
Listen string
WorkingDir string
Temporal TemporalConfig
Worker WorkerConfig
Expand Down
5 changes: 2 additions & 3 deletions internal/ais/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,14 @@ import (
"os"
"path/filepath"

"github.com/google/uuid"
"go.artefactual.dev/tools/temporal"
)

const FetchActivityName = "fetch-amss-file"

type (
FetchActivityParams struct {
AIPUUID uuid.UUID
AIPUUID string
RelativePath string
Destination string
}
Expand Down Expand Up @@ -42,7 +41,7 @@ func (a *FetchActivity) Execute(ctx context.Context, params *FetchActivityParams
}
defer file.Close()

err = a.amssclient.DownloadAIPFile(ctx, params.AIPUUID.String(), params.RelativePath, file)
err = a.amssclient.DownloadAIPFile(ctx, params.AIPUUID, params.RelativePath, file)

Check warning on line 44 in internal/ais/fetch.go

View check run for this annotation

Codecov / codecov/patch

internal/ais/fetch.go#L44

Added line #L44 was not covered by tests
if err != nil {
return nil, fmt.Errorf("FetchActivity: download file: %w", err)
}
Expand Down
20 changes: 20 additions & 0 deletions internal/ais/localact.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package ais

import "context"

type GetAIPPathActivityParams struct {
AMSSClient *AMSSClient
AIPUUID string
}

type GetAIPPathActivityResult struct {
Path string
}

func GetAIPPathActivity(ctx context.Context, params *GetAIPPathActivityParams) (*GetAIPPathActivityResult, error) {
path, err := params.AMSSClient.GetAIPPath(ctx, params.AIPUUID)
if err != nil {
return nil, err
}
return &GetAIPPathActivityResult{Path: path}, nil

Check warning on line 19 in internal/ais/localact.go

View check run for this annotation

Codecov / codecov/patch

internal/ais/localact.go#L14-L19

Added lines #L14 - L19 were not covered by tests
}
54 changes: 54 additions & 0 deletions internal/ais/policies.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package ais

import (
"time"

temporalsdk_temporal "go.temporal.io/sdk/temporal"
temporalsdk_workflow "go.temporal.io/sdk/workflow"
)

// We use this constant to represent a long period of time (10 years).
const forever = time.Hour * 24 * 365 * 10

// withActivityOptsForLongLivedRequest returns a workflow context with activity
// options suited for long-running activities without heartbeats
func withActivityOptsForLongLivedRequest(ctx temporalsdk_workflow.Context) temporalsdk_workflow.Context {
return temporalsdk_workflow.WithActivityOptions(ctx, temporalsdk_workflow.ActivityOptions{
StartToCloseTimeout: time.Hour * 2,
RetryPolicy: &temporalsdk_temporal.RetryPolicy{
InitialInterval: time.Second,
BackoffCoefficient: 2,
MaximumInterval: time.Minute * 10,
MaximumAttempts: 5,
NonRetryableErrorTypes: []string{
"TemporalTimeout:StartToClose",
},
},
})

Check warning on line 27 in internal/ais/policies.go

View check run for this annotation

Codecov / codecov/patch

internal/ais/policies.go#L15-L27

Added lines #L15 - L27 were not covered by tests
}

// withFilesystemActivityOpts returns a workflow context with activity
// options suited for activities like disk operations that should not
// require a retry policy attached.
func withFilesystemActivityOpts(ctx temporalsdk_workflow.Context) temporalsdk_workflow.Context {
return temporalsdk_workflow.WithActivityOptions(ctx, temporalsdk_workflow.ActivityOptions{
StartToCloseTimeout: time.Hour,
RetryPolicy: &temporalsdk_temporal.RetryPolicy{
MaximumAttempts: 1,
},
})

Check warning on line 39 in internal/ais/policies.go

View check run for this annotation

Codecov / codecov/patch

internal/ais/policies.go#L33-L39

Added lines #L33 - L39 were not covered by tests
}

// withLocalActivityOpts returns a workflow context with activity options suited
// for local and short-lived activities with a few retries.
func withLocalActivityOpts(ctx temporalsdk_workflow.Context) temporalsdk_workflow.Context {
return temporalsdk_workflow.WithLocalActivityOptions(ctx, temporalsdk_workflow.LocalActivityOptions{
ScheduleToCloseTimeout: 5 * time.Second,
RetryPolicy: &temporalsdk_temporal.RetryPolicy{
InitialInterval: time.Second,
BackoffCoefficient: 2,
MaximumInterval: time.Minute,
MaximumAttempts: 3,
},
})

Check warning on line 53 in internal/ais/policies.go

View check run for this annotation

Codecov / codecov/patch

internal/ais/policies.go#L44-L53

Added lines #L44 - L53 were not covered by tests
}
Loading

0 comments on commit fa37e42

Please sign in to comment.