Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-14430] Adding a logical type support for Python callables to Row schema #17608

Merged
merged 7 commits into from
May 13, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1094,13 +1094,6 @@ message StandardCoders {
}
}

// Container for logical-type URNs; each enum value carries its URN in the
// beam_urn option.
message LogicalTypes {
  enum Enum {
    // URN of the Python Callable logical type.
    PYTHON_CALLABLE = 0 [
      (beam_urn) = "beam:logical_type:python_callable:v1"
    ];
  }
}

// A windowing strategy describes the window function, triggering, allowed
// lateness, and accumulation mode for a PCollection.
//
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ option go_package = "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v
option java_package = "org.apache.beam.model.pipeline.v1";
option java_outer_classname = "SchemaApi";

import "org/apache/beam/model/pipeline/v1/beam_runner_api.proto";
ihji marked this conversation as resolved.
Show resolved Hide resolved

message Schema {
// List of fields for this schema. Two fields may not share a name.
repeated Field fields = 1;
Expand Down Expand Up @@ -110,6 +112,20 @@ message LogicalType {
FieldValue argument = 5;
}

// Logical types for Row schemas that are defined universally: every SDK is
// expected to understand them.
message LogicalTypes {
  enum Enum {
    // URN identifying the Python Callable logical type.
    //   Representation type: STRING.
    //   Language type: PythonCallableWithSource in the Python SDK; in any
    //   other SDK, a wrapper object around a string that can be evaluated
    //   to a Python Callable object.
    PYTHON_CALLABLE = 0 [
      (org.apache.beam.model.pipeline.v1.beam_urn) =
          "beam:logical_type:python_callable:v1"
    ];
  }
}

message Option {
// REQUIRED. Identifier for the option.
string name = 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.beam.sdk.schemas.logicaltypes;

import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.model.pipeline.v1.RunnerApi.LogicalTypes;
import org.apache.beam.model.pipeline.v1.SchemaApi;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.LogicalType;
Expand All @@ -30,7 +30,7 @@
@Experimental(Experimental.Kind.SCHEMAS)
public class PythonCallable implements LogicalType<PythonCallableSource, String> {
public static final String IDENTIFIER =
LogicalTypes.Enum.PYTHON_CALLABLE
SchemaApi.LogicalTypes.Enum.PYTHON_CALLABLE
.getValueDescriptor()
.getOptions()
.getExtension(RunnerApi.beamUrn);
Expand Down
3 changes: 2 additions & 1 deletion sdks/python/apache_beam/portability/common_urns.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@
from .api import beam_runner_api_pb2_urns
from .api import external_transforms_pb2_urns
from .api import metrics_pb2_urns
from .api import schema_pb2_urns
from .api import standard_window_fns_pb2_urns

BeamConstants = beam_runner_api_pb2_urns.BeamConstants
LogicalTypes = beam_runner_api_pb2_urns.LogicalTypes
StandardArtifacts = beam_runner_api_pb2_urns.StandardArtifacts
StandardCoders = beam_runner_api_pb2_urns.StandardCoders
StandardDisplayData = beam_runner_api_pb2_urns.StandardDisplayData
Expand All @@ -40,6 +40,7 @@
MonitoringInfo = metrics_pb2_urns.MonitoringInfo
MonitoringInfoSpecs = metrics_pb2_urns.MonitoringInfoSpecs
MonitoringInfoTypeUrns = metrics_pb2_urns.MonitoringInfoTypeUrns
LogicalTypes = schema_pb2_urns.LogicalTypes
FixedWindowsPayload = standard_window_fns_pb2_urns.FixedWindowsPayload
GlobalWindowsPayload = standard_window_fns_pb2_urns.GlobalWindowsPayload
SessionWindowsPayload = standard_window_fns_pb2_urns.SessionWindowsPayload
Expand Down