diff --git a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto index 54d328cce82c..a19229a7857f 100644 --- a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto +++ b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto @@ -1046,12 +1046,8 @@ message StandardCoders { // Nullable types in container types (ArrayType, MapType) per the // encoding described for general Nullable types below. // - // Well known logical types: - // beam:logical_type:micros_instant:v1 - // - Representation type: ROW - // - A timestamp without a timezone where seconds + micros represents the - // amount of time since the epoch. - // + // Logical types understood by all SDKs should be defined in schema.proto. + // Example of well known logical types: // beam:logical_type:schema:v1 // - Representation type: BYTES // - A Beam Schema stored as a serialized proto. diff --git a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/schema.proto b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/schema.proto index b26fc8fef8d6..3a6a79a6e2ea 100644 --- a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/schema.proto +++ b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/schema.proto @@ -31,6 +31,8 @@ option go_package = "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v option java_package = "org.apache.beam.model.pipeline.v1"; option java_outer_classname = "SchemaApi"; +import "org/apache/beam/model/pipeline/v1/beam_runner_api.proto"; + message Schema { // List of fields for this schema. Two fields may not share a name. repeated Field fields = 1; @@ -110,6 +112,27 @@ message LogicalType { FieldValue argument = 5; } +// Universally defined Logical types for Row schemas. +// These logical types are supposed to be understood by all SDKs. +message LogicalTypes { + enum Enum { + // A URN for Python Callable logical type + // - Representation type: STRING + // - Language type: In Python SDK, PythonCallableWithSource. + // In any other SDKs, a wrapper object for a string which + // can be evaluated to a Python Callable object. + PYTHON_CALLABLE = 0 [(org.apache.beam.model.pipeline.v1.beam_urn) = + "beam:logical_type:python_callable:v1"]; + + // A URN for MicrosInstant type + // - Representation type: ROW + // - A timestamp without a timezone where seconds + micros represents the + // amount of time since the epoch. + MICROS_INSTANT = 1 [(org.apache.beam.model.pipeline.v1.beam_urn) = + "beam:logical_type:micros_instant:v1"]; + } +} + message Option { // REQUIRED. Identifier for the option. string name = 1; diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaTranslation.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaTranslation.java index e1112b2472de..9a63c2d87819 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaTranslation.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaTranslation.java @@ -43,6 +43,7 @@ import org.apache.beam.sdk.schemas.Schema.LogicalType; import org.apache.beam.sdk.schemas.Schema.TypeName; import org.apache.beam.sdk.schemas.logicaltypes.MicrosInstant; +import org.apache.beam.sdk.schemas.logicaltypes.PythonCallable; import org.apache.beam.sdk.schemas.logicaltypes.SchemaLogicalType; import org.apache.beam.sdk.schemas.logicaltypes.UnknownLogicalType; import org.apache.beam.sdk.util.SerializableUtils; @@ -74,6 +75,7 @@ public class SchemaTranslation { ImmutableMap.>>builder() .put(MicrosInstant.IDENTIFIER, MicrosInstant.class) .put(SchemaLogicalType.IDENTIFIER, SchemaLogicalType.class) + .put(PythonCallable.IDENTIFIER, PythonCallable.class) .build(); public static SchemaApi.Schema schemaToProto(Schema schema, boolean serializeLogicalType) { diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/MicrosInstant.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/MicrosInstant.java index 6c1fea85d842..a388731a14c5 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/MicrosInstant.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/MicrosInstant.java @@ -18,6 +18,8 @@ package org.apache.beam.sdk.schemas.logicaltypes; import java.time.Instant; +import org.apache.beam.model.pipeline.v1.RunnerApi; +import org.apache.beam.model.pipeline.v1.SchemaApi; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.values.Row; @@ -36,7 +38,11 @@ "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402) }) public class MicrosInstant implements Schema.LogicalType { - public static final String IDENTIFIER = "beam:logical_type:micros_instant:v1"; + public static final String IDENTIFIER = + SchemaApi.LogicalTypes.Enum.MICROS_INSTANT + .getValueDescriptor() + .getOptions() + .getExtension(RunnerApi.beamUrn); // TODO(BEAM-10878): This should be a constant private final Schema schema; diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/PythonCallable.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/PythonCallable.java new file mode 100644 index 000000000000..ea4e297515e9 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/PythonCallable.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.schemas.logicaltypes; + +import org.apache.beam.model.pipeline.v1.RunnerApi; +import org.apache.beam.model.pipeline.v1.SchemaApi; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.Schema.LogicalType; +import org.apache.beam.sdk.util.PythonCallableSource; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.checkerframework.checker.nullness.qual.Nullable; + +/** A logical type for PythonCallableSource objects. */ +@Experimental(Experimental.Kind.SCHEMAS) +public class PythonCallable implements LogicalType { + public static final String IDENTIFIER = + SchemaApi.LogicalTypes.Enum.PYTHON_CALLABLE + .getValueDescriptor() + .getOptions() + .getExtension(RunnerApi.beamUrn); + + @Override + public String getIdentifier() { + return IDENTIFIER; + } + + @Override + public Schema.@Nullable FieldType getArgumentType() { + return null; + } + + @Override + public Schema.FieldType getBaseType() { + return Schema.FieldType.STRING; + } + + @Override + public @NonNull String toBaseType(@NonNull PythonCallableSource input) { + return input.getPythonCallableCode(); + } + + @Override + public @NonNull PythonCallableSource toInputType(@NonNull String base) { + return PythonCallableSource.of(base); + } +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/StaticSchemaInference.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/StaticSchemaInference.java index 103405037bed..a1437c2d0ccd 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/StaticSchemaInference.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/StaticSchemaInference.java @@ -126,6 +126,7 @@ public static Schema.FieldType fieldFromType( return fieldFromType(type, fieldValueTypeSupplier, new HashMap()); } + // TODO(BEAM-14458): support type inference for logical types private static Schema.FieldType fieldFromType( TypeDescriptor type, FieldValueTypeSupplier fieldValueTypeSupplier, diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PythonCallableSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PythonCallableSource.java new file mode 100644 index 000000000000..8875d8982963 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PythonCallableSource.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util; + +import java.io.Serializable; + +/** + * A wrapper object storing a Python code that can be evaluated to Python callables in Python SDK. + */ +public class PythonCallableSource implements Serializable { + private final String pythonCallableCode; + + private PythonCallableSource(String pythonCallableCode) { + this.pythonCallableCode = pythonCallableCode; + } + + public static PythonCallableSource of(String pythonCallableCode) { + // TODO(BEAM-14457): check syntactic correctness of Python code if possible + return new PythonCallableSource(pythonCallableCode); + } + + public String getPythonCallableCode() { + return pythonCallableCode; + } +} diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTranslationTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTranslationTest.java index f4274de02ea5..9f3f7004e8c1 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTranslationTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTranslationTest.java @@ -41,6 +41,7 @@ import org.apache.beam.sdk.schemas.logicaltypes.DateTime; import org.apache.beam.sdk.schemas.logicaltypes.FixedBytes; import org.apache.beam.sdk.schemas.logicaltypes.MicrosInstant; +import org.apache.beam.sdk.schemas.logicaltypes.PythonCallable; import org.apache.beam.sdk.schemas.logicaltypes.SchemaLogicalType; import org.apache.beam.sdk.values.Row; import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString; @@ -132,6 +133,7 @@ public static Iterable data() { Field.of("decimal", FieldType.DECIMAL), Field.of("datetime", FieldType.DATETIME))) .add(Schema.of(Field.of("fixed_bytes", FieldType.logicalType(FixedBytes.of(24))))) .add(Schema.of(Field.of("micros_instant", FieldType.logicalType(new MicrosInstant())))) + .add(Schema.of(Field.of("python_callable", FieldType.logicalType(new PythonCallable())))) .add( Schema.of( Field.of("field_with_option_atomic", FieldType.STRING) diff --git a/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/PythonExternalTransform.java b/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/PythonExternalTransform.java index c412acd220ee..198a32c138ef 100644 --- a/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/PythonExternalTransform.java +++ b/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/PythonExternalTransform.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.extensions.python; import java.util.Arrays; +import java.util.HashMap; import java.util.Map; import java.util.Set; import java.util.SortedMap; @@ -33,10 +34,12 @@ import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.SchemaRegistry; import org.apache.beam.sdk.schemas.SchemaTranslation; +import org.apache.beam.sdk.schemas.logicaltypes.PythonCallable; import org.apache.beam.sdk.schemas.utils.StaticSchemaInference; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.util.CoderUtils; +import org.apache.beam.sdk.util.PythonCallableSource; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; @@ -64,6 +67,7 @@ public class PythonExternalTransform kwargsMap; + private Map, Schema.FieldType> typeHints; private @Nullable Object @NonNull [] argsArray; private @Nullable Row providedKwargsRow; @@ -72,6 +76,11 @@ private PythonExternalTransform(String fullyQualifiedName, String expansionServi this.fullyQualifiedName = fullyQualifiedName; this.expansionService = expansionService; this.kwargsMap = new TreeMap<>(); + this.typeHints = new HashMap<>(); + // TODO(BEAM-14458): remove a default type hint for PythonCallableSource when BEAM-14458 is + // resolved + this.typeHints.put( + PythonCallableSource.class, Schema.FieldType.logicalType(new PythonCallable())); argsArray = new Object[] {}; } @@ -162,6 +171,26 @@ public PythonExternalTransform withKwargs(Row kwargs) { return this; } + /** + * Specifies the field type of arguments. + * + *

Type hints are especially useful for logical types since type inference does not work well + * for logical types. + * + * @param argType A class object for the argument type. + * @param fieldType A schema field type for the argument. + * @return updated wrapper for the cross-language transform. + */ + public PythonExternalTransform withTypeHint( + java.lang.Class argType, Schema.FieldType fieldType) { + if (typeHints.containsKey(argType)) { + throw new IllegalArgumentException( + String.format("typehint for arg type %s already exists", argType)); + } + typeHints.put(argType, fieldType); + return this; + } + @VisibleForTesting Row buildOrGetKwargsRow() { if (providedKwargsRow != null) { @@ -179,16 +208,18 @@ Row buildOrGetKwargsRow() { // Types that are not one of following are considered custom types. // * Java primitives // * Type String + // * Any Type explicitly annotated by withTypeHint() // * Type Row - private static boolean isCustomType(java.lang.Class type) { + private boolean isCustomType(java.lang.Class type) { boolean val = !(ClassUtils.isPrimitiveOrWrapper(type) || type == String.class + || typeHints.containsKey(type) || Row.class.isAssignableFrom(type)); return val; } - // If the custom type has a registered schema, we use that. OTherwise we try to register it using + // If the custom type has a registered schema, we use that. Otherwise, we try to register it using // 'JavaFieldSchema'. private Row convertCustomValue(Object value) { SerializableFunction toRowFunc; @@ -239,6 +270,8 @@ private Schema generateSchemaDirectly( if (field instanceof Row) { // Rows are used as is but other types are converted to proper field types. builder.addRowField(fieldName, ((Row) field).getSchema()); + } else if (typeHints.containsKey(field.getClass())) { + builder.addField(fieldName, typeHints.get(field.getClass())); } else { builder.addField( fieldName, diff --git a/sdks/java/extensions/python/src/test/java/org/apache/beam/sdk/extensions/python/ExternalPythonTransformTest.java b/sdks/java/extensions/python/src/test/java/org/apache/beam/sdk/extensions/python/PythonExternalTransformTest.java similarity index 83% rename from sdks/java/extensions/python/src/test/java/org/apache/beam/sdk/extensions/python/ExternalPythonTransformTest.java rename to sdks/java/extensions/python/src/test/java/org/apache/beam/sdk/extensions/python/PythonExternalTransformTest.java index 60deebfc6e66..5d55a6e8d345 100644 --- a/sdks/java/extensions/python/src/test/java/org/apache/beam/sdk/extensions/python/ExternalPythonTransformTest.java +++ b/sdks/java/extensions/python/src/test/java/org/apache/beam/sdk/extensions/python/PythonExternalTransformTest.java @@ -22,14 +22,17 @@ import static org.junit.Assert.assertTrue; import java.io.Serializable; +import java.time.Instant; import java.util.Map; import org.apache.beam.model.pipeline.v1.ExternalTransforms; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.SchemaTranslation; +import org.apache.beam.sdk.schemas.logicaltypes.MicrosInstant; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.util.PythonCallableSource; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.Row; @@ -41,7 +44,7 @@ import org.junit.runners.JUnit4; @RunWith(JUnit4.class) -public class ExternalPythonTransformTest implements Serializable { +public class PythonExternalTransformTest implements Serializable { @Ignore("BEAM-14148") @Test public void trivialPythonTransform() { @@ -184,6 +187,29 @@ public void generateArgsWithCustomType() { assertEquals(456, (int) receivedRow.getRow("field1").getInt32("intField")); } + @Test + public void generateArgsWithPythonCallableSource() { + PythonExternalTransform transform = + PythonExternalTransform + .>, PCollection>>>from( + "DummyTransform") + .withArgs(PythonCallableSource.of("dummy data")); + Row receivedRow = transform.buildOrGetArgsRow(); + assertTrue(receivedRow.getValue("field0") instanceof PythonCallableSource); + } + + @Test + public void generateArgsWithTypeHint() { + PythonExternalTransform transform = + PythonExternalTransform + .>, PCollection>>>from( + "DummyTransform") + .withArgs(Instant.ofEpochSecond(0)) + .withTypeHint(Instant.class, Schema.FieldType.logicalType(new MicrosInstant())); + Row receivedRow = transform.buildOrGetArgsRow(); + assertTrue(receivedRow.getValue("field0") instanceof Instant); + } + @Test public void generateKwargsEmpty() { PythonExternalTransform transform = @@ -274,6 +300,29 @@ public void generateKwargsWithCustomType() { assertEquals(456, (int) receivedRow.getRow("customField1").getInt32("intField")); } + @Test + public void generateKwargsWithPythonCallableSource() { + PythonExternalTransform transform = + PythonExternalTransform + .>, PCollection>>>from( + "DummyTransform") + .withKwarg("customField0", PythonCallableSource.of("dummy data")); + Row receivedRow = transform.buildOrGetKwargsRow(); + assertTrue(receivedRow.getValue("customField0") instanceof PythonCallableSource); + } + + @Test + public void generateKwargsWithTypeHint() { + PythonExternalTransform transform = + PythonExternalTransform + .>, PCollection>>>from( + "DummyTransform") + .withKwarg("customField0", Instant.ofEpochSecond(0)) + .withTypeHint(Instant.class, Schema.FieldType.logicalType(new MicrosInstant())); + Row receivedRow = transform.buildOrGetKwargsRow(); + assertTrue(receivedRow.getValue("customField0") instanceof Instant); + } + @Test public void generateKwargsFromMap() { Map kwargsMap = diff --git a/sdks/python/apache_beam/portability/common_urns.py b/sdks/python/apache_beam/portability/common_urns.py index daf54ea04da3..5e8a3ce4cce1 100644 --- a/sdks/python/apache_beam/portability/common_urns.py +++ b/sdks/python/apache_beam/portability/common_urns.py @@ -22,6 +22,7 @@ from .api import beam_runner_api_pb2_urns from .api import external_transforms_pb2_urns from .api import metrics_pb2_urns +from .api import schema_pb2_urns from .api import standard_window_fns_pb2_urns BeamConstants = beam_runner_api_pb2_urns.BeamConstants @@ -39,6 +40,7 @@ MonitoringInfo = metrics_pb2_urns.MonitoringInfo MonitoringInfoSpecs = metrics_pb2_urns.MonitoringInfoSpecs MonitoringInfoTypeUrns = metrics_pb2_urns.MonitoringInfoTypeUrns +LogicalTypes = schema_pb2_urns.LogicalTypes FixedWindowsPayload = standard_window_fns_pb2_urns.FixedWindowsPayload GlobalWindowsPayload = standard_window_fns_pb2_urns.GlobalWindowsPayload SessionWindowsPayload = standard_window_fns_pb2_urns.SessionWindowsPayload @@ -76,3 +78,6 @@ displayData = StandardDisplayData.DisplayData java_class_lookup = ExpansionMethods.Enum.JAVA_CLASS_LOOKUP + +micros_instant = LogicalTypes.Enum.MICROS_INSTANT +python_callable = LogicalTypes.Enum.PYTHON_CALLABLE diff --git a/sdks/python/apache_beam/typehints/schemas.py b/sdks/python/apache_beam/typehints/schemas.py index 5a04ba51722b..02eac46ae5d6 100644 --- a/sdks/python/apache_beam/typehints/schemas.py +++ b/sdks/python/apache_beam/typehints/schemas.py @@ -69,6 +69,7 @@ import numpy as np from google.protobuf import text_format +from apache_beam.portability import common_urns from apache_beam.portability.api import schema_pb2 from apache_beam.typehints import row_type from apache_beam.typehints.native_type_compatibility import _get_args @@ -78,6 +79,7 @@ from apache_beam.typehints.native_type_compatibility import extract_optional_type from apache_beam.typehints.native_type_compatibility import match_is_named_tuple from apache_beam.utils import proto_utils +from apache_beam.utils.python_callable import PythonCallableWithSource from apache_beam.utils.timestamp import Timestamp PYTHON_ANY_URN = "beam:logical:pythonsdk_any:v1" @@ -540,7 +542,7 @@ class MicrosInstant(NoArgumentLogicalType[Timestamp, MicrosInstantRepresentation]): @classmethod def urn(cls): - return "beam:logical_type:micros_instant:v1" + return common_urns.micros_instant.urn @classmethod def representation_type(cls): @@ -559,3 +561,27 @@ def to_representation_type(self, value): def to_language_type(self, value): # type: (MicrosInstantRepresentation) -> Timestamp return Timestamp(seconds=int(value.seconds), micros=int(value.micros)) + + +@LogicalType.register_logical_type +class PythonCallable(NoArgumentLogicalType[PythonCallableWithSource, str]): + @classmethod + def urn(cls): + return common_urns.python_callable.urn + + @classmethod + def representation_type(cls): + # type: () -> type + return str + + @classmethod + def language_type(cls): + return PythonCallableWithSource + + def to_representation_type(self, value): + # type: (PythonCallableWithSource) -> str + return value.get_source() + + def to_language_type(self, value): + # type: (str) -> PythonCallableWithSource + return PythonCallableWithSource(value) diff --git a/sdks/python/apache_beam/typehints/schemas_test.py b/sdks/python/apache_beam/typehints/schemas_test.py index 834edf18777e..404d9c5583c3 100644 --- a/sdks/python/apache_beam/typehints/schemas_test.py +++ b/sdks/python/apache_beam/typehints/schemas_test.py @@ -31,6 +31,7 @@ import numpy as np +from apache_beam.portability import common_urns from apache_beam.portability.api import schema_pb2 from apache_beam.typehints.native_type_compatibility import match_is_named_tuple from apache_beam.typehints.schemas import SchemaTypeRegistry @@ -239,6 +240,22 @@ def test_float_maps_to_float64(self): schema_pb2.FieldType(atomic_type=schema_pb2.DOUBLE), typing_to_runner_api(float)) + def test_python_callable_maps_to_logical_type(self): + from apache_beam.utils.python_callable import PythonCallableWithSource + self.assertEqual( + schema_pb2.FieldType( + logical_type=schema_pb2.LogicalType( + urn=common_urns.python_callable.urn, + representation=typing_to_runner_api(str))), + typing_to_runner_api(PythonCallableWithSource)) + self.assertEqual( + typing_from_runner_api( + schema_pb2.FieldType( + logical_type=schema_pb2.LogicalType( + urn=common_urns.python_callable.urn, + representation=typing_to_runner_api(str)))), + PythonCallableWithSource) + def test_trivial_example(self): MyCuteClass = NamedTuple( 'MyCuteClass', diff --git a/sdks/python/apache_beam/utils/python_callable.py b/sdks/python/apache_beam/utils/python_callable.py new file mode 100644 index 000000000000..9238e4de66ba --- /dev/null +++ b/sdks/python/apache_beam/utils/python_callable.py @@ -0,0 +1,41 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Python Callable utilities. + +For internal use only; no backwards-compatibility guarantees. +""" + + +class PythonCallableWithSource(object): + """Represents a Python callable object with source codes before evaluated. + + Proxy object to Store a callable object with its string form (source code). + The string form is used when the object is encoded and transferred to foreign + SDKs (non-Python SDKs). + """ + def __init__(self, source): + # type: (str) -> None + self._source = source + self._callable = eval(source) # pylint: disable=eval-used + + def get_source(self): + # type: () -> str + return self._source + + def __call__(self, *args, **kwargs): + return self._callable(*args, **kwargs)