Skip to content

Commit

Permalink
[YAML] - PubSubLite proto (#30129)
Browse files Browse the repository at this point in the history
* [YAML] - PubSubLite proto

* [YAML] - PubSubLite proto
  • Loading branch information
ffernandez92 authored Feb 1, 2024
1 parent 1d9f604 commit bdbbef9
Show file tree
Hide file tree
Showing 8 changed files with 392 additions and 23 deletions.
1 change: 1 addition & 0 deletions sdks/java/extensions/protobuf/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ dependencies {
implementation library.java.commons_compress
implementation library.java.slf4j_api
implementation project(path: ":sdks:java:core", configuration: "shadow")
implementation("com.google.cloud:google-cloud-storage:2.32.1")
implementation library.java.protobuf_java
implementation("com.squareup.wire:wire-schema-jvm:4.9.3")
implementation("io.apicurio:apicurio-registry-protobuf-schema-utilities:3.0.0.M2")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
import static java.util.stream.Collectors.toList;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;

import com.google.cloud.storage.Blob;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import com.google.protobuf.DescriptorProtos;
import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;
Expand All @@ -41,15 +44,15 @@
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter;
import org.apache.commons.compress.utils.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Utility class for working with Protocol Buffer (Proto) data in the context of Apache Beam. This
* class provides methods to retrieve Beam Schemas from Proto messages, convert Proto bytes to Beam
* Rows, and vice versa. It also includes utilities for handling Protocol Buffer schemas and related
* file operations.
* Utility class for working with Protocol Buffer (Proto) data. This class provides methods to
* retrieve Beam Schemas from Proto messages, convert Proto bytes to Beam Rows, and vice versa. It
* also includes utilities for handling Protocol Buffer schemas and related file operations.
*
* <p>Users can utilize the methods in this class to facilitate the integration of Proto data
* processing within Apache Beam pipelines, allowing for the seamless transformation of Proto
Expand Down Expand Up @@ -105,7 +108,11 @@ private static Descriptors.Descriptor getDescriptorFromProtoSchema(
try {
Descriptors.FileDescriptor fileDescriptor =
FileDescriptorUtils.protoFileToFileDescriptor(result);
return fileDescriptor.findMessageTypeByName(messageName);

List<String> messageElements = Splitter.on('.').splitToList(messageName);
String messageTypeByName = messageElements.get(messageElements.size() - 1);

return fileDescriptor.findMessageTypeByName(messageTypeByName);
} catch (Descriptors.DescriptorValidationException e) {
throw new RuntimeException(e);
}
Expand Down Expand Up @@ -148,10 +155,12 @@ public static SerializableFunction<byte[], Row> getProtoBytesToRowFunction(
@Override
public Row apply(byte[] input) {
try {
List<String> messageElements = Splitter.on('.').splitToList(messageName);
String messageTypeByName = messageElements.get(messageElements.size() - 1);
final Descriptors.Descriptor descriptor =
protoDomain
.getFileDescriptor(dynamicProtoDomain.getFileName())
.findMessageTypeByName(messageName);
.findMessageTypeByName(messageTypeByName);
DynamicMessage dynamicMessage = DynamicMessage.parseFrom(descriptor, input);
SerializableFunction<DynamicMessage, Row> res =
protoDynamicMessageSchema.getToRowFunction();
Expand Down Expand Up @@ -243,6 +252,41 @@ private static byte[] getFileAsBytes(String fileDescriptorPath) {
* @throws RuntimeException if an error occurs while finding or opening the file.
*/
private static ReadableByteChannel getFileByteChannel(String filePath) {
if (isGcsPath(filePath)) {
return openGcsFile(filePath);
} else {
return openLocalFile(filePath);
}
}

private static boolean isGcsPath(String filePath) {
return filePath.startsWith("gs://");
}

/**
* Opens a ReadableByteChannel for reading from a Google Cloud Storage (GCS) file.
*
* @param filePath The GCS file path (e.g., "gs://your-bucket-name/your-object-name").
* @return A ReadableByteChannel for reading from the specified GCS file.
*/
private static ReadableByteChannel openGcsFile(String filePath) {
Storage storage = StorageOptions.getDefaultInstance().getService();
String bucketName = getBucketName(filePath);
String objectName = getObjectName(filePath);
Blob blob = storage.get(bucketName, objectName);
return blob.reader();
}

/**
* Opens a ReadableByteChannel for reading from a local file using the Apache Beam FileSystems
* API.
*
* @param filePath The local file path.
* @return A ReadableByteChannel for reading from the specified local file.
* @throws IllegalArgumentException If no files match the specified pattern or if more than one
* file matches.
*/
private static ReadableByteChannel openLocalFile(String filePath) {
try {
MatchResult result = FileSystems.match(filePath);
checkArgument(
Expand All @@ -259,6 +303,29 @@ private static ReadableByteChannel getFileByteChannel(String filePath) {
}
}

/**
* Extracts the bucket name from a Google Cloud Storage (GCS) file path.
*
* @param gcsPath The GCS file path (e.g., "gs://your-bucket-name/your-object-name").
* @return The bucket name extracted from the GCS path.
*/
private static String getBucketName(String gcsPath) {
int startIndex = "gs://".length();
int endIndex = gcsPath.indexOf('/', startIndex);
return gcsPath.substring(startIndex, endIndex);
}

/**
* Extracts the object name from a Google Cloud Storage (GCS) file path.
*
* @param gcsPath The GCS file path (e.g., "gs://your-bucket-name/your-object-name").
* @return The object name extracted from the GCS path.
*/
private static String getObjectName(String gcsPath) {
int startIndex = gcsPath.indexOf('/', "gs://".length()) + 1;
return gcsPath.substring(startIndex);
}

/**
* Represents metadata associated with a Protocol Buffer schema, including the File Name and
* ProtoDomain.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,26 @@ public class ProtoByteUtilsTest {
+ " Address address = 4;\n"
+ "}";

private static final String PROTO_STRING_PACKAGE_SCHEMA =
"syntax = \"proto3\";\n"
+ "package com.test.proto;"
+ "\n"
+ "message MyMessage {\n"
+ " int32 id = 1;\n"
+ " string name = 2;\n"
+ " bool active = 3;\n"
+ "\n"
+ " // Nested field\n"
+ " message Address {\n"
+ " string street = 1;\n"
+ " string city = 2;\n"
+ " string state = 3;\n"
+ " string zip_code = 4;\n"
+ " }\n"
+ "\n"
+ " Address address = 4;\n"
+ "}";

private static final String DESCRIPTOR_PATH =
Objects.requireNonNull(
ProtoByteUtilsTest.class.getResource(
Expand Down Expand Up @@ -84,6 +104,14 @@ public void testProtoSchemaStringToBeamSchema() {
Assert.assertEquals(schema.getFieldNames(), SCHEMA.getFieldNames());
}

@Test
public void testProtoSchemaWitPackageStringToBeamSchema() {
Schema schema =
ProtoByteUtils.getBeamSchemaFromProtoSchema(
PROTO_STRING_PACKAGE_SCHEMA, "com.test.proto.MyMessage");
Assert.assertEquals(schema.getFieldNames(), SCHEMA.getFieldNames());
}

@Test
public void testProtoBytesToRowFunctionGenerateSerializableFunction() {
SerializableFunction<byte[], Row> protoBytesToRowFunction =
Expand Down Expand Up @@ -111,6 +139,22 @@ public void testProtoBytesToRowFunctionReturnsRowFailure() {
protoBytesToRowFunction.apply(inputBytes);
}

@Test
public void testProtoBytesToRowFunctionReturnsRowSuccess() {
// Create a proto bytes to row function
SerializableFunction<byte[], Row> protoBytesToRowFunction =
ProtoByteUtils.getProtoBytesToRowFunction(DESCRIPTOR_PATH, MESSAGE_NAME);

byte[] byteArray = {
8, -46, 9, 18, 3, 68, 111, 101, 34, 35, 10, 7, 115, 101, 97, 116, 116, 108, 101, 18, 11, 102,
97, 107, 101, 32, 115, 116, 114, 101, 101, 116, 26, 2, 119, 97, 34, 7, 84, 79, 45, 49, 50, 51,
52
};

Row row = protoBytesToRowFunction.apply(byteArray);
Assert.assertEquals("Doe", row.getValue("name"));
}

@Test
public void testRowToProtoFunction() {
Row row =
Expand Down Expand Up @@ -144,4 +188,32 @@ public void testRowToProtoSchemaFunction() {
Assert.assertNotNull(
ProtoByteUtils.getRowToProtoBytesFromSchema(PROTO_STRING_SCHEMA, "MyMessage").apply(row));
}

@Test
public void testRowToProtoSchemaWithPackageFunction() {
Row row =
Row.withSchema(SCHEMA)
.withFieldValue("id", 1234)
.withFieldValue("name", "Doe")
.withFieldValue("active", false)
.withFieldValue("address.city", "seattle")
.withFieldValue("address.street", "fake street")
.withFieldValue("address.zip_code", "TO-1234")
.withFieldValue("address.state", "wa")
.build();

byte[] byteArray = {
8, -46, 9, 18, 3, 68, 111, 101, 34, 35, 10, 7, 115, 101, 97, 116, 116, 108, 101, 18, 11, 102,
97, 107, 101, 32, 115, 116, 114, 101, 101, 116, 26, 2, 119, 97, 34, 7, 84, 79, 45, 49, 50, 51,
52
};

byte[] resultBytes =
ProtoByteUtils.getRowToProtoBytesFromSchema(
PROTO_STRING_PACKAGE_SCHEMA, "com.test.proto.MyMessage")
.apply(row);

Assert.assertNotNull(resultBytes);
Assert.assertArrayEquals(byteArray, resultBytes);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.function.Consumer;
import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.extensions.protobuf.ProtoByteUtils;
import org.apache.beam.sdk.io.gcp.pubsublite.internal.Uuid;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
Expand Down Expand Up @@ -77,7 +78,7 @@ public class PubsubLiteReadSchemaTransformProvider
private static final Logger LOG =
LoggerFactory.getLogger(PubsubLiteReadSchemaTransformProvider.class);

public static final String VALID_FORMATS_STR = "RAW,AVRO,JSON";
public static final String VALID_FORMATS_STR = "RAW,AVRO,JSON,PROTO";
public static final Set<String> VALID_DATA_FORMATS =
Sets.newHashSet(VALID_FORMATS_STR.split(","));

Expand Down Expand Up @@ -207,26 +208,39 @@ public void finish(FinishBundleContext c) {
Schema beamSchema;

if (format != null && format.equals("RAW")) {
if (inputSchema != null) {
throw new IllegalArgumentException(
"To read from PubSubLite in RAW format, you can't provide a schema.");
}

beamSchema = Schema.builder().addField("payload", Schema.FieldType.BYTES).build();
valueMapper = getRawBytesToRowFunction(beamSchema);

} else if (format != null && format.equals("PROTO")) {
String fileDescriptorPath = configuration.getFileDescriptorPath();
String messageName = configuration.getMessageName();

if (fileDescriptorPath != null && messageName != null) {
beamSchema = ProtoByteUtils.getBeamSchemaFromProto(fileDescriptorPath, messageName);
valueMapper = ProtoByteUtils.getProtoBytesToRowFunction(fileDescriptorPath, messageName);
} else if (inputSchema != null && messageName != null) {
beamSchema = ProtoByteUtils.getBeamSchemaFromProtoSchema(inputSchema, messageName);
valueMapper = ProtoByteUtils.getProtoBytesToRowFromSchemaFunction(inputSchema, messageName);
} else {
throw new IllegalArgumentException(
"To read from PubSubLite in PROTO format, either descriptorPath or schema must be provided.");
}

} else {
if (inputSchema == null) {
if (inputSchema != null) {
beamSchema =
Objects.equals(configuration.getFormat(), "JSON")
? JsonUtils.beamSchemaFromJsonSchema(inputSchema)
: AvroUtils.toBeamSchema(new org.apache.avro.Schema.Parser().parse(inputSchema));
valueMapper =
Objects.equals(configuration.getFormat(), "JSON")
? JsonUtils.getJsonBytesToRowFunction(beamSchema)
: AvroUtils.getAvroBytesToRowFunction(beamSchema);
} else {
throw new IllegalArgumentException(
"To read from PubSubLite in JSON or AVRO format, you must provide a schema.");
"To read from Pubsub Lite in JSON or AVRO format, you must provide a schema.");
}
beamSchema =
Objects.equals(configuration.getFormat(), "JSON")
? JsonUtils.beamSchemaFromJsonSchema(inputSchema)
: AvroUtils.toBeamSchema(new org.apache.avro.Schema.Parser().parse(inputSchema));
valueMapper =
Objects.equals(configuration.getFormat(), "JSON")
? JsonUtils.getJsonBytesToRowFunction(beamSchema)
: AvroUtils.getAvroBytesToRowFunction(beamSchema);
}
return new SchemaTransform() {
@Override
Expand Down Expand Up @@ -404,13 +418,33 @@ public Uuid apply(SequencedMessage input) {
@AutoValue
@DefaultSchema(AutoValueSchema.class)
public abstract static class PubsubLiteReadSchemaTransformConfiguration {

public void validate() {
final String dataFormat = this.getFormat();
assert dataFormat == null || VALID_DATA_FORMATS.contains(dataFormat)
: "Valid data formats are " + VALID_DATA_FORMATS;

final String inputSchema = this.getSchema();
final String messageName = this.getMessageName();

if (dataFormat != null && dataFormat.equals("RAW")) {
assert inputSchema == null
: "To read from Pubsub Lite in RAW format, you can't provide a schema.";
}

if (dataFormat != null && dataFormat.equals("PROTO")) {
assert messageName != null
: "To read from Pubsub Lite in PROTO format, messageName must be provided.";
}
}

@SchemaFieldDescription(
"The encoding format for the data stored in Pubsub Lite. Valid options are: "
+ VALID_FORMATS_STR)
public abstract String getFormat();

@SchemaFieldDescription(
"The schema in which the data is encoded in the Kafka topic. "
"The schema in which the data is encoded in the Pubsub Lite topic. "
+ "For AVRO data, this is a schema defined with AVRO schema syntax "
+ "(https://avro.apache.org/docs/1.10.2/spec.html#schemas). "
+ "For JSON data, this is a schema defined with JSON-schema syntax (https://json-schema.org/).")
Expand Down Expand Up @@ -459,6 +493,18 @@ public abstract static class PubsubLiteReadSchemaTransformConfiguration {
+ "case, deduplication of the stream will be strictly best effort.")
public abstract @Nullable String getAttributeId();

@SchemaFieldDescription(
"The path to the Protocol Buffer File Descriptor Set file. This file is used for schema"
+ " definition and message serialization.")
@Nullable
public abstract String getFileDescriptorPath();

@SchemaFieldDescription(
"The name of the Protocol Buffer message to be used for schema"
+ " extraction and data conversion.")
@Nullable
public abstract String getMessageName();

public static Builder builder() {
return new AutoValue_PubsubLiteReadSchemaTransformProvider_PubsubLiteReadSchemaTransformConfiguration
.Builder();
Expand Down Expand Up @@ -486,6 +532,12 @@ public abstract static class Builder {
@SuppressWarnings("unused")
public abstract Builder setAttributeId(String attributeId);

@SuppressWarnings("unused")
public abstract Builder setFileDescriptorPath(String fileDescriptorPath);

@SuppressWarnings("unused")
public abstract Builder setMessageName(String messageName);

public abstract PubsubLiteReadSchemaTransformConfiguration build();
}
}
Expand Down
Loading

0 comments on commit bdbbef9

Please sign in to comment.