Skip to content

Commit

Permalink
[GoSDK + Prism] Support Process env execution. (#33651)
Browse files Browse the repository at this point in the history
  • Loading branch information
lostluck authored Jan 18, 2025
1 parent 7d17f2a commit 25be5af
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 11 deletions.
2 changes: 2 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,14 @@
* External, Process based Worker Pool support added to the Go SDK container. ([#33572](https://github.com/apache/beam/pull/33572))
* This is used to enable sidecar containers to run SDK workers for some runners.
* See https://beam.apache.org/documentation/runtime/sdk-harness-config/ for details.
* Support the Process Environment for execution in the Go SDK. ([#33651](https://github.com/apache/beam/pull/33651))
* Prism
* Prism now uses the same single port for both pipeline submission and execution on workers. Requests are differentiated by worker-id. ([#33438](https://github.com/apache/beam/pull/33438))
* This avoids port starvation and provides clarity on port use when running Prism in non-local environments.
* Support for @RequiresTimeSortedInputs added. ([#33513](https://github.com/apache/beam/issues/33513))
* Initial support for AllowedLateness added. ([#33542](https://github.com/apache/beam/pull/33542))
* The Go SDK's in-process Prism runner (AKA the Go SDK default runner) now supports non-loopback mode environment types. ([#33572](https://github.com/apache/beam/pull/33572))
* Support the Process Environment for execution in Prism. ([#33651](https://github.com/apache/beam/pull/33651))

## Breaking Changes

Expand Down
9 changes: 7 additions & 2 deletions sdks/go/pkg/beam/core/runtime/graphx/translate.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package graphx

import (
"context"
"encoding/json"
"fmt"
"sort"
"strings"
Expand Down Expand Up @@ -122,8 +123,12 @@ func CreateEnvironment(ctx context.Context, urn string, extractEnvironmentConfig
var serializedPayload []byte
switch urn {
case URNEnvProcess:
// TODO Support process based SDK Harness.
return nil, errors.Errorf("unsupported environment %v", urn)
config := extractEnvironmentConfig(ctx)
payload := &pipepb.ProcessPayload{}
if err := json.Unmarshal([]byte(config), payload); err != nil {
return nil, fmt.Errorf("unable to json unmarshal --environment_config: %w", err)
}
serializedPayload = protox.MustEncode(payload)
case URNEnvExternal:
config := extractEnvironmentConfig(ctx)
payload := &pipepb.ExternalPayload{Endpoint: &pipepb.ApiServiceDescriptor{Url: config}}
Expand Down
31 changes: 22 additions & 9 deletions sdks/go/pkg/beam/core/runtime/graphx/translate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,22 +296,22 @@ func (fn *splitPickFn) ProcessElement(_ *testRT, a int, small, big func(int)) {
}

func TestCreateEnvironment(t *testing.T) {
t.Run("process", func(t *testing.T) {
const wantEnv = "process"
t.Run("processBadConfig", func(t *testing.T) {
urn := graphx.URNEnvProcess
got, err := graphx.CreateEnvironment(context.Background(), urn, func(_ context.Context) string { return wantEnv })
got, err := graphx.CreateEnvironment(context.Background(), urn, func(_ context.Context) string { return "not a real json" })
if err == nil {
t.Errorf("CreateEnvironment(%v) = %v error, want error since it's unsupported", urn, err)
t.Errorf("CreateEnvironment(%v) = %v error, want error since parsing should fail", urn, err)
}
want := (*pipepb.Environment)(nil)
if !proto.Equal(got, want) {
t.Errorf("CreateEnvironment(%v) = %v, want %v since it's unsupported", urn, got, want)
t.Errorf("CreateEnvironment(%v) = %v, want %v since creation should have failed", urn, got, want)
}
})
tests := []struct {
name string
urn string
payload func(name string) []byte
name string
configOverride string
urn string
payload func(name string) []byte
}{
{
name: "external",
Expand All @@ -331,12 +331,25 @@ func TestCreateEnvironment(t *testing.T) {
ContainerImage: name,
})
},
}, {
name: "process",
configOverride: "{ \"command\": \"process\" }",
urn: graphx.URNEnvProcess,
payload: func(name string) []byte {
return protox.MustEncode(&pipepb.ProcessPayload{
Command: name,
})
},
},
}
for _, test := range tests {
test := test
t.Run(test.name, func(t *testing.T) {
got, err := graphx.CreateEnvironment(context.Background(), test.urn, func(_ context.Context) string { return test.name })
config := test.name
if test.configOverride != "" {
config = test.configOverride
}
got, err := graphx.CreateEnvironment(context.Background(), test.urn, func(_ context.Context) string { return config })
if err != nil {
t.Errorf("CreateEnvironment(%v) = %v error, want nil", test.urn, err)
}
Expand Down
31 changes: 31 additions & 0 deletions sdks/go/pkg/beam/runners/prism/internal/environments.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"io"
"log/slog"
"os"
"os/exec"
"time"

fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
Expand Down Expand Up @@ -66,6 +68,16 @@ func runEnvironment(ctx context.Context, j *jobservices.Job, env string, wk *wor
logger.Error("unmarshing docker environment payload", "error", err)
}
return dockerEnvironment(ctx, logger, dp, wk, j.ArtifactEndpoint())
case urns.EnvProcess:
pp := &pipepb.ProcessPayload{}
if err := (proto.UnmarshalOptions{}).Unmarshal(e.GetPayload(), pp); err != nil {
logger.Error("unmarshing docker environment payload", "error", err)
}
go func() {
processEnvironment(ctx, pp, wk)
logger.Debug("environment stopped", slog.String("job", j.String()))
}()
return nil
default:
return fmt.Errorf("environment %v with urn %v unimplemented", env, e.GetUrn())
}
Expand Down Expand Up @@ -231,3 +243,22 @@ func dockerEnvironment(ctx context.Context, logger *slog.Logger, dp *pipepb.Dock

return nil
}

// processEnvironment runs the SDK worker described by a process environment
// payload as a child process, and blocks until that process exits or ctx is
// cancelled.
//
// The payload's command is invoked with --id and --provision_endpoint flags
// built from wk.ID and wk.Endpoint(). The child inherits this process's
// environment with the payload's env entries appended, and its stdout and
// stderr are forwarded to this process's stdout and stderr.
//
// The function has no return value, so failures to start the command and
// unexpected exits are reported through the default slog logger instead of
// being dropped.
func processEnvironment(ctx context.Context, pp *pipepb.ProcessPayload, wk *worker.W) {
	cmd := exec.CommandContext(ctx, pp.GetCommand(), "--id="+wk.ID, "--provision_endpoint="+wk.Endpoint())

	// WaitDelay bounds how long Wait keeps blocking after ctx is cancelled if
	// the child does not exit or leaves its I/O pipes open.
	cmd.WaitDelay = 100 * time.Millisecond
	cmd.Stdout = os.Stdout
	cmd.Stderr = os.Stderr

	// Start from the parent environment and append each payload entry once.
	// Appending to cmd.Env directly avoids re-copying the whole environment
	// (as cmd.Environ() does) on every iteration.
	cmd.Env = os.Environ()
	for k, v := range pp.GetEnv() {
		cmd.Env = append(cmd.Env, k+"="+v)
	}

	if err := cmd.Start(); err != nil {
		slog.Error("unable to start process environment",
			slog.String("command", pp.GetCommand()),
			slog.String("worker", wk.ID),
			slog.Any("error", err))
		return
	}

	// Job processing happens here, but orchestrated by other goroutines.
	// This call blocks until the context is cancelled, or the command exits.
	if err := cmd.Wait(); err != nil {
		if ctx.Err() != nil {
			// The context was cancelled, so the child being killed is the
			// expected shutdown path rather than a failure.
			slog.Debug("process environment stopped after context cancellation",
				slog.String("worker", wk.ID),
				slog.Any("error", err))
			return
		}
		slog.Error("process environment exited with error",
			slog.String("command", pp.GetCommand()),
			slog.String("worker", wk.ID),
			slog.Any("error", err))
	}
}

0 comments on commit 25be5af

Please sign in to comment.