Skip to content

Commit

Permalink
Prioritise Avro providers from "extensions/core" (#25611)
Browse files Browse the repository at this point in the history
  • Loading branch information
aromanenko-dev authored Feb 24, 2023
1 parent 160be6b commit dcfaddd
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
@Internal
@Experimental(Kind.SCHEMAS)
public final class Providers {

public interface Identifyable {
/**
* Returns an id that uniquely represents this among others implementing its derived interface.
Expand All @@ -44,16 +45,12 @@ public static <T extends Identifyable> Map<String, T> loadProviders(Class<T> kla
for (T provider : ServiceLoader.load(klass)) {
// Avro provider is treated as a special case since two Avro providers may want to be loaded -
// from "core" (deprecated) and from "extensions/avro" (actual) - but only one must succeed.
// TODO: remove this check once all Avro providers from "core" (such as
// AvroPayloadSerializerProvider) have been removed.
if (provider.identifier().equals("avro")) {
// Avro provider from "extensions/avro" must have a priority.
if (provider
.getClass()
.getName()
.equals(
"org.apache.beam.sdk.extensions.avro.schemas.io.payloads.AvroPayloadSerializerProvider")) {
// Use AvroPayloadSerializerProvider from extensions/avro by any case.
if (provider.getClass().getName().startsWith("org.apache.beam.sdk.extensions.avro")) {
// Load Avro provider from "extensions/avro" by any case.
providers.put(provider.identifier(), provider);
} else {
// Load Avro provider from "core" if it was not loaded from Avro extension before.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@
*/
package org.apache.beam.sdk.extensions.schemaio.expansion;

import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;

import com.google.auto.service.AutoService;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.ServiceLoader;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -50,22 +53,57 @@ public class ExternalSchemaIOTransformRegistrar implements ExternalTransformRegi

@Override
public Map<String, ExternalTransformBuilder<?, ?, ?>> knownBuilderInstances() {
ImmutableMap.Builder builder = ImmutableMap.<String, ExternalTransformRegistrar>builder();
Map<String, ExternalTransformBuilder<?, ?, ?>> providers = new HashMap<>();
try {
for (SchemaIOProvider schemaIOProvider : ServiceLoader.load(SchemaIOProvider.class)) {
builder.put(
"beam:transform:org.apache.beam:schemaio_" + schemaIOProvider.identifier() + "_read:v1",
new ReaderBuilder(schemaIOProvider));
builder.put(
"beam:transform:org.apache.beam:schemaio_"
+ schemaIOProvider.identifier()
+ "_write:v1",
new WriterBuilder(schemaIOProvider));
for (SchemaIOProvider provider : ServiceLoader.load(SchemaIOProvider.class)) {
// Avro provider is treated as a special case since two Avro providers may want to be loaded
// from "core" (deprecated) and from "extensions/avro" (actual) - but only one must succeed.
// TODO: we won't need this check once all Avro providers from "core" will be
// removed
if (provider.identifier().equals("avro")) {
// Avro provider from "extensions/avro" must have a priority.
if (provider.getClass().getName().startsWith("org.apache.beam.sdk.extensions.avro")) {
// Load Avro provider from "extensions/avro" by any case.
registerProvider(providers, provider);
} else {
// Load Avro provider from "core" if it was not loaded from Avro extension before.
registerProviderOptionally(providers, provider);
}
} else {
final String identifier =
"beam:transform:org.apache.beam:schemaio_" + provider.identifier() + "_read:v1";
checkState(
!providers.containsKey(identifier),
"Duplicate providers exist with identifier `%s` for class %s.",
identifier,
SchemaIOProvider.class);
registerProvider(providers, provider);
}
}
} catch (Exception e) {
throw new RuntimeException(e.getMessage());
}
return builder.build();
return ImmutableMap.copyOf(providers);
}

/**
 * Registers a reader builder and a writer builder for {@code provider}, replacing whatever was
 * previously stored under the same transform URNs.
 *
 * @param providers map from transform URN to builder that receives the two new entries
 * @param provider provider whose identifier is embedded in both URNs
 */
private void registerProvider(
    Map<String, ExternalTransformBuilder<?, ?, ?>> providers, SchemaIOProvider provider) {
  final String readUrn =
      "beam:transform:org.apache.beam:schemaio_" + provider.identifier() + "_read:v1";
  providers.put(readUrn, new ReaderBuilder(provider));

  final String writeUrn =
      "beam:transform:org.apache.beam:schemaio_" + provider.identifier() + "_write:v1";
  providers.put(writeUrn, new WriterBuilder(provider));
}

/**
 * Registers a reader builder and a writer builder for {@code provider}, but only under transform
 * URNs that have no entry yet; existing entries are left untouched.
 *
 * @param providers map from transform URN to builder that may receive the two new entries
 * @param provider provider whose identifier is embedded in both URNs
 */
private void registerProviderOptionally(
    Map<String, ExternalTransformBuilder<?, ?, ?>> providers, SchemaIOProvider provider) {
  final String readUrn =
      "beam:transform:org.apache.beam:schemaio_" + provider.identifier() + "_read:v1";
  // The builder is created up front; putIfAbsent simply drops it when the URN is already taken.
  providers.putIfAbsent(readUrn, new ReaderBuilder(provider));

  final String writeUrn =
      "beam:transform:org.apache.beam:schemaio_" + provider.identifier() + "_write:v1";
  providers.putIfAbsent(writeUrn, new WriterBuilder(provider));
}

public static class Configuration {
Expand Down

0 comments on commit dcfaddd

Please sign in to comment.