Skip to content

Commit

Permalink
[#24789] Remainder of changes from #27550. (#27822)
Browse files Browse the repository at this point in the history
* Make the prism runner the default Go SDK runner.

* Break cycle with ptest.

* [DO NOT SUBMIT] Most changes needec to set prism as default Go SDK runner.

* rm commented out code.

* Avoid unnecessary logs on normal path.

* Fix top.

* Adjust Go versions?

* [prism] Update symbol lookup to not be unit test specific.

* [prism] Support for reshuffles.

* [prism] move reshuffle test out of unimplemented.

* [prism] Add CoGBK test to unimplemented set.

* [prism] graduate additional tests.

* delint

* [prog] guide updates

* [prism] Support CoGBKs and wafer thin fusion.

* Make window close strict.

* quick first pass

* chang cleanup

* remove unnecessary churn changes

* rm execute line

* Update sdks/go/pkg/beam/runners/vet/vet.go

Co-authored-by: Ritesh Ghorse <[email protected]>

---------

Co-authored-by: lostluck <[email protected]>
Co-authored-by: Ritesh Ghorse <[email protected]>
  • Loading branch information
3 people authored Aug 7, 2023
1 parent 23d44a4 commit 3b112e8
Show file tree
Hide file tree
Showing 19 changed files with 64 additions and 31 deletions.
12 changes: 10 additions & 2 deletions sdks/go/pkg/beam/core/runtime/harness/datamgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

const (
Expand Down Expand Up @@ -128,7 +130,12 @@ func (m *DataChannelManager) Open(ctx context.Context, port exec.Port) (*DataCha
return nil, err
}
ch.forceRecreate = func(id string, err error) {
log.Warnf(ctx, "forcing DataChannel[%v] reconnection on port %v due to %v", id, port, err)
switch status.Code(err) {
case codes.Canceled:
// Don't log on context canceled path.
default:
log.Warnf(ctx, "forcing DataChannel[%v] reconnection on port %v due to %v", id, port, err)
}
m.mu.Lock()
delete(m.ports, port.URL)
m.mu.Unlock()
Expand Down Expand Up @@ -371,7 +378,8 @@ func (c *DataChannel) read(ctx context.Context) {
c.terminateStreamOnError(err)
c.mu.Unlock()

if err == io.EOF {
st := status.Code(err)
if st == codes.Canceled || err == io.EOF {
return
}
log.Errorf(ctx, "DataChannel.read %v bad: %v", c.id, err)
Expand Down
2 changes: 1 addition & 1 deletion sdks/go/pkg/beam/core/runtime/harness/harness.go
Original file line number Diff line number Diff line change
Expand Up @@ -708,6 +708,6 @@ func fail(ctx context.Context, id instructionID, format string, args ...any) *fn
// dial to the specified endpoint. if timeout <=0, call blocks until
// grpc.Dial succeeds.
func dial(ctx context.Context, endpoint, purpose string, timeout time.Duration) (*grpc.ClientConn, error) {
log.Infof(ctx, "Connecting via grpc @ %s for %s ...", endpoint, purpose)
log.Output(ctx, log.SevDebug, 1, fmt.Sprintf("Connecting via grpc @ %s for %s ...", endpoint, purpose))
return grpcx.Dial(ctx, endpoint, timeout)
}
9 changes: 8 additions & 1 deletion sdks/go/pkg/beam/core/runtime/harness/statemgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
"github.com/golang/protobuf/proto"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

type writeTypeEnum int32
Expand Down Expand Up @@ -525,7 +527,12 @@ func (m *StateChannelManager) Open(ctx context.Context, port exec.Port) (*StateC
return nil, err
}
ch.forceRecreate = func(id string, err error) {
log.Warnf(ctx, "forcing StateChannel[%v] reconnection on port %v due to %v", id, port, err)
switch status.Code(err) {
case codes.Canceled:
// Don't log on context canceled path.
default:
log.Warnf(ctx, "forcing StateChannel[%v] reconnection on port %v due to %v", id, port, err)
}
m.mu.Lock()
delete(m.ports, port.URL)
m.mu.Unlock()
Expand Down
2 changes: 1 addition & 1 deletion sdks/go/pkg/beam/core/runtime/symbols.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,5 +105,5 @@ func ResolveFunction(name string, t reflect.Type) (any, error) {
type failResolver bool

func (p failResolver) Sym2Addr(name string) (uintptr, error) {
return 0, errors.Errorf("%v not found. Use runtime.RegisterFunction in unit tests", name)
return 0, errors.Errorf("%v not found. Register DoFns and functions with the the beam/register package.", name)
}
5 changes: 5 additions & 0 deletions sdks/go/pkg/beam/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,11 @@ func createList(s Scope, values []any, t reflect.Type) (PCollection, error) {

// TODO(herohde) 6/26/2017: make 'create' a SDF once supported. See BEAM-2421.

func init() {
register.DoFn2x1[[]byte, func(T), error]((*createFn)(nil))
register.Emitter1[T]()
}

type createFn struct {
Values [][]byte `json:"values"`
Type EncodedType `json:"type"`
Expand Down
4 changes: 2 additions & 2 deletions sdks/go/pkg/beam/create_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ func TestCreateList(t *testing.T) {
{[]float64{float64(0.1), float64(0.2), float64(0.3)}},
{[]uint{uint(1), uint(2), uint(3)}},
{[]bool{false, true, true, false, true}},
{[]wc{wc{"a", 23}, wc{"b", 42}, wc{"c", 5}}},
{[]*testProto{&testProto{}, &testProto{stringValue("test")}}}, // Test for BEAM-4401
{[]wc{{"a", 23}, {"b", 42}, {"c", 5}}},
{[]*testProto{{}, {stringValue("test")}}}, // Test for BEAM-4401
}

for _, test := range tests {
Expand Down
1 change: 1 addition & 0 deletions sdks/go/pkg/beam/io/databaseio/database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/apache/beam/sdks/v2/go/pkg/beam"
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/direct"
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism"
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
_ "github.com/proullon/ramsql/driver"
Expand Down
5 changes: 5 additions & 0 deletions sdks/go/pkg/beam/io/datastoreio/datastore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ type Foo struct {
type Bar struct {
}

func init() {
beam.RegisterType(reflect.TypeOf((*Foo)(nil)).Elem())
beam.RegisterType(reflect.TypeOf((*Bar)(nil)).Elem())
}

func Test_query(t *testing.T) {
testCases := []struct {
v any
Expand Down
4 changes: 2 additions & 2 deletions sdks/go/pkg/beam/pardo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,9 @@ func testFunction() int64 {

func TestFormatParDoError(t *testing.T) {
got := formatParDoError(testFunction, 2, 1)
want := "beam.testFunction has 2 outputs, but ParDo requires 1 outputs, use ParDo2 instead."
want := "has 2 outputs, but ParDo requires 1 outputs, use ParDo2 instead."
if !strings.Contains(got, want) {
t.Errorf("formatParDoError(testFunction,2,1) = %v, want = %v", got, want)
t.Errorf("formatParDoError(testFunction,2,1) = \n%q want =\n%q", got, want)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -707,7 +707,6 @@ func (ss *stageState) bundleReady(em *ElementManager) (mtime.Time, bool) {
ready := true
for _, side := range ss.sides {
pID, ok := em.pcolParents[side]
// These panics indicate pre-process/stage construction problems.
if !ok {
panic(fmt.Sprintf("stage[%v] no parent ID for side input %v", ss.ID, side))
}
Expand Down
2 changes: 1 addition & 1 deletion sdks/go/pkg/beam/runners/prism/internal/handlerunner.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ import (
type RunnerCharacteristic struct {
SDKFlatten bool // Sets whether we should force an SDK side flatten.
SDKGBK bool // Sets whether the GBK should be handled by the SDK, if possible by the SDK.
SDKReshuffle bool
SDKReshuffle bool // Sets whether we should use the SDK backup implementation to handle a Reshuffle.
}

func Runner(config any) *runner {
Expand Down
8 changes: 6 additions & 2 deletions sdks/go/pkg/beam/runners/prism/internal/stage.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ type link struct {
// should in principle be able to connect two SDK environments directly
// instead of going through the runner at all, which would be a small
// efficiency gain, in runner memory use.
//
// That would also warrant an execution mode where fusion is taken into
// account, but all serialization boundaries remain since the pcollections
// would continue to get serialized.
type stage struct {
ID string
transforms []string
Expand Down Expand Up @@ -145,11 +149,11 @@ progress:
if previousIndex == index && !splitsDone {
sr, err := b.Split(wk, 0.5 /* fraction of remainder */, nil /* allowed splits */)
if err != nil {
slog.Debug("SDK Error from split, aborting splits", "bundle", rb, "error", err.Error())
slog.Warn("SDK Error from split, aborting splits", "bundle", rb, "error", err.Error())
break progress
}
if sr.GetChannelSplits() == nil {
slog.Warn("split failed", "bundle", rb)
slog.Debug("SDK returned no splits", "bundle", rb)
splitsDone = true
continue progress
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ func TestUnimplemented(t *testing.T) {
tests := []struct {
pipeline func(s beam.Scope)
}{
// These tests don't terminate, so can't be run.
// {pipeline: primitives.Drain}, // Can't test drain automatically yet.

{pipeline: primitives.TestStreamBoolSequence},
Expand Down
2 changes: 2 additions & 0 deletions sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ func (b *B) Cleanup(wk *W) {
wk.mu.Unlock()
}

// Progress sends a progress request for the given bundle to the passed in worker, blocking on the response.
func (b *B) Progress(wk *W) (*fnpb.ProcessBundleProgressResponse, error) {
resp := wk.sendInstruction(&fnpb.InstructionRequest{
Request: &fnpb.InstructionRequest_ProcessBundleProgress{
Expand All @@ -159,6 +160,7 @@ func (b *B) Progress(wk *W) (*fnpb.ProcessBundleProgressResponse, error) {
return resp.GetProcessBundleProgress(), nil
}

// Split sends a split request for the given bundle to the passed in worker, blocking on the response.
func (b *B) Split(wk *W, fraction float64, allowedSplits []int64) (*fnpb.ProcessBundleSplitResponse, error) {
resp := wk.sendInstruction(&fnpb.InstructionRequest{
Request: &fnpb.InstructionRequest_ProcessBundleSplit{
Expand Down
7 changes: 4 additions & 3 deletions sdks/go/pkg/beam/runners/prism/internal/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,6 @@ func (wk *W) Control(ctrl fnpb.BeamFnControl_ControlServer) error {
// TODO: Do more than assume these are ProcessBundleResponses.
wk.mu.Lock()
if b, ok := wk.activeInstructions[resp.GetInstructionId()]; ok {
// Error is handled in the resonse handler.
b.Respond(resp)
} else {
slog.Debug("ctrl.Recv: %v", resp)
Expand All @@ -268,7 +267,10 @@ func (wk *W) Control(ctrl fnpb.BeamFnControl_ControlServer) error {
for {
select {
case req := <-wk.InstReqs:
ctrl.Send(req)
err := ctrl.Send(req)
if err != nil {
return err
}
case <-ctrl.Context().Done():
slog.Debug("Control context canceled")
return ctrl.Context().Err()
Expand Down Expand Up @@ -322,7 +324,6 @@ func (wk *W) Data(data fnpb.BeamFnData_DataServer) error {
wk.mu.Unlock()
}
}()

for {
select {
case req, ok := <-wk.DataReqs:
Expand Down
16 changes: 8 additions & 8 deletions sdks/go/pkg/beam/runners/universal/runnerlib/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,16 +103,16 @@ func WaitForCompletion(ctx context.Context, client jobpb.JobServiceClient, jobID
return errors.Wrap(err, "failed to get job stream")
}

mostRecentError := errors.New("<no error received, see runner logs>")
mostRecentError := "<no error received>"
var errReceived, jobFailed bool

for {
msg, err := stream.Recv()
if err != nil {
if err == io.EOF {
if jobFailed {
// Connection finished with a failed status, so produce what we have.
return errors.Errorf("job %v failed:\n%w", jobID, mostRecentError)
// Connection finished, so time to exit, produce what we have.
return errors.Errorf("job %v failed:\n%v", jobID, mostRecentError)
}
return nil
}
Expand All @@ -123,17 +123,17 @@ func WaitForCompletion(ctx context.Context, client jobpb.JobServiceClient, jobID
case msg.GetStateResponse() != nil:
resp := msg.GetStateResponse()

log.Infof(ctx, "Job state: %v", resp.GetState().String())
log.Infof(ctx, "Job[%v] state: %v", jobID, resp.GetState().String())

switch resp.State {
case jobpb.JobState_DONE, jobpb.JobState_CANCELLED:
return nil
case jobpb.JobState_FAILED:
jobFailed = true
if errReceived {
return errors.Errorf("job %v failed:\n%w", jobID, mostRecentError)
return errors.Errorf("job %v failed:\n%v", jobID, mostRecentError)
}
// Otherwise, wait for at least one error log from the runner, or the connection to close.
// Otherwise we should wait for at least one error log from the runner.
}

case msg.GetMessageResponse() != nil:
Expand All @@ -144,10 +144,10 @@ func WaitForCompletion(ctx context.Context, client jobpb.JobServiceClient, jobID

if resp.GetImportance() >= jobpb.JobMessage_JOB_MESSAGE_ERROR {
errReceived = true
mostRecentError = errors.New(resp.GetMessageText())
mostRecentError = resp.GetMessageText()

if jobFailed {
return errors.Errorf("job %v failed:\n%w", jobID, mostRecentError)
return errors.Errorf("job %v failed:\n%w", jobID, errors.New(mostRecentError))
}
}

Expand Down
2 changes: 1 addition & 1 deletion sdks/go/pkg/beam/runners/vet/vet.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func init() {
type disabledResolver bool

func (p disabledResolver) Sym2Addr(name string) (uintptr, error) {
return 0, errors.Errorf("%v not found. Use runtime.RegisterFunction in unit tests", name)
return 0, errors.Errorf("%v not found. Register DoFns and functions with the beam/register package.", name)
}

// Execute evaluates the pipeline on whether it can run without reflection.
Expand Down
3 changes: 1 addition & 2 deletions sdks/go/test/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,6 @@ var portableFilters = []string{
"TestSetStateClear",
}

// TODO(lostluck): set up a specific run for these.
var prismFilters = []string{
// The prism runner does not support the TestStream primitive
"TestTestStream.*",
Expand All @@ -149,7 +148,7 @@ var prismFilters = []string{
// TODO(BEAM-13215): GCP IOs currently do not work in non-Dataflow portable runners.
"TestBigQueryIO.*",
"TestSpannerIO.*",
// The prsim runner does not support pipeline drain for SDF.
// The prism runner does not support pipeline drain for SDF.
"TestDrain",
// FhirIO currently only supports Dataflow runner
"TestFhirIO.*",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1124,6 +1124,9 @@ words = ...
{{< /highlight >}}

{{< highlight go >}}

The Go SDK cannot support anonymous functions outside of the deprecated Go Direct runner.

// words is the input PCollection of strings
var words beam.PCollection = ...

Expand Down Expand Up @@ -1170,8 +1173,8 @@ words = ...
{{< /highlight >}}

{{< highlight go >}}
// words is the input PCollection of strings
var words beam.PCollection = ...

The Go SDK cannot support anonymous functions outside of the deprecated Go Direct runner.

{{< code_sample "sdks/go/examples/snippets/04transforms.go" model_pardo_apply_anon >}}
{{< /highlight >}}
Expand All @@ -1191,7 +1194,7 @@ words = ...

<span class="language-go">

> **Note:** Anonymous function DoFns may not work on distributed runners.
> **Note:** Anonymous function DoFns do not work on distributed runners.
> It's recommended to use named functions and register them with `register.FunctionXxY` in
> an `init()` block.
Expand Down

0 comments on commit 3b112e8

Please sign in to comment.