diff --git a/sdks/go/pkg/beam/core/runtime/graphx/translate.go b/sdks/go/pkg/beam/core/runtime/graphx/translate.go index e7a3da7ff10..ae38e96ebf9 100644 --- a/sdks/go/pkg/beam/core/runtime/graphx/translate.go +++ b/sdks/go/pkg/beam/core/runtime/graphx/translate.go @@ -17,7 +17,6 @@ package graphx import ( "context" - "encoding/json" "fmt" "sort" "strings" @@ -35,6 +34,7 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors" pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1" "github.com/apache/beam/sdks/v2/go/pkg/beam/options/resource" + "google.golang.org/protobuf/encoding/protojson" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/durationpb" ) @@ -125,7 +125,7 @@ func CreateEnvironment(ctx context.Context, urn string, extractEnvironmentConfig case URNEnvProcess: config := extractEnvironmentConfig(ctx) payload := &pipepb.ProcessPayload{} - if err := json.Unmarshal([]byte(config), payload); err != nil { + if err := protojson.Unmarshal([]byte(config), payload); err != nil { return nil, fmt.Errorf("unable to json unmarshal --environment_config: %w", err) } serializedPayload = protox.MustEncode(payload)