diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/io/Providers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/io/Providers.java index 39a23685eb23..c4a4902b9d6c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/io/Providers.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/io/Providers.java @@ -30,6 +30,7 @@ @Internal @Experimental(Kind.SCHEMAS) public final class Providers { + public interface Identifyable { /** * Returns an id that uniquely represents this among others implementing its derived interface. @@ -44,16 +45,12 @@ public static Map loadProviders(Class kla for (T provider : ServiceLoader.load(klass)) { // Avro provider is treated as a special case since two Avro providers may want to be loaded - // from "core" (deprecated) and from "extensions/avro" (actual) - but only one must succeed. - // TODO: this check should be removed once once AvroPayloadSerializerProvider from "core" is + // TODO: this check can be dropped once all Avro providers from "core" are // removed if (provider.identifier().equals("avro")) { // Avro provider from "extensions/avro" must have a priority. - if (provider - .getClass() - .getName() - .equals( - "org.apache.beam.sdk.extensions.avro.schemas.io.payloads.AvroPayloadSerializerProvider")) { - // Use AvroPayloadSerializerProvider from extensions/avro by any case. + if (provider.getClass().getName().startsWith("org.apache.beam.sdk.extensions.avro")) { + // Load Avro provider from "extensions/avro" in any case. providers.put(provider.identifier(), provider); } else { // Load Avro provider from "core" if it was not loaded from Avro extension before. 
diff --git a/sdks/java/extensions/schemaio-expansion-service/src/main/java/org/apache/beam/sdk/extensions/schemaio/expansion/ExternalSchemaIOTransformRegistrar.java b/sdks/java/extensions/schemaio-expansion-service/src/main/java/org/apache/beam/sdk/extensions/schemaio/expansion/ExternalSchemaIOTransformRegistrar.java
index f61a34532cbb..d8f797e8168b 100644
--- a/sdks/java/extensions/schemaio-expansion-service/src/main/java/org/apache/beam/sdk/extensions/schemaio/expansion/ExternalSchemaIOTransformRegistrar.java
+++ b/sdks/java/extensions/schemaio-expansion-service/src/main/java/org/apache/beam/sdk/extensions/schemaio/expansion/ExternalSchemaIOTransformRegistrar.java
@@ -17,10 +17,13 @@
  */
 package org.apache.beam.sdk.extensions.schemaio.expansion;
 
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
 import com.google.auto.service.AutoService;
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.ServiceLoader;
 import javax.annotation.Nullable;
@@ -50,22 +53,60 @@ public class ExternalSchemaIOTransformRegistrar implements ExternalTransformRegi
 
   @Override
   public Map<String, ExternalTransformBuilder<?, ?, ?>> knownBuilderInstances() {
-    ImmutableMap.Builder builder = ImmutableMap.builder();
+    Map<String, ExternalTransformBuilder<?, ?, ?>> providers = new HashMap<>();
     try {
-      for (SchemaIOProvider schemaIOProvider : ServiceLoader.load(SchemaIOProvider.class)) {
-        builder.put(
-            "beam:transform:org.apache.beam:schemaio_" + schemaIOProvider.identifier() + "_read:v1",
-            new ReaderBuilder(schemaIOProvider));
-        builder.put(
-            "beam:transform:org.apache.beam:schemaio_"
-                + schemaIOProvider.identifier()
-                + "_write:v1",
-            new WriterBuilder(schemaIOProvider));
+      for (SchemaIOProvider provider : ServiceLoader.load(SchemaIOProvider.class)) {
+        // Avro provider is treated as a special case since two Avro providers may want to be loaded
+        // from "core" (deprecated) and from "extensions/avro" (actual) - but only one must succeed.
+        // TODO: remove this check once all Avro providers have been removed from "core".
+        if (provider.identifier().equals("avro")) {
+          // Avro provider from "extensions/avro" must have a priority.
+          if (provider.getClass().getName().startsWith("org.apache.beam.sdk.extensions.avro")) {
+            // Load Avro provider from "extensions/avro" in any case.
+            registerProvider(providers, provider);
+          } else {
+            // Load Avro provider from "core" if it was not loaded from Avro extension before.
+            registerProviderOptionally(providers, provider);
+          }
+        } else {
+          final String identifier = readUrn(provider);
+          checkState(
+              !providers.containsKey(identifier),
+              "Duplicate providers exist with identifier `%s` for class %s.",
+              identifier,
+              SchemaIOProvider.class);
+          registerProvider(providers, provider);
+        }
       }
     } catch (Exception e) {
-      throw new RuntimeException(e.getMessage());
+      // Keep the original exception as the cause so its type and stack trace are not lost.
+      throw new RuntimeException(e.getMessage(), e);
     }
-    return builder.build();
+    return ImmutableMap.copyOf(providers);
+  }
+
+  /** Returns the URN under which the read transform of {@code provider} is registered. */
+  private static String readUrn(SchemaIOProvider provider) {
+    return "beam:transform:org.apache.beam:schemaio_" + provider.identifier() + "_read:v1";
+  }
+
+  /** Returns the URN under which the write transform of {@code provider} is registered. */
+  private static String writeUrn(SchemaIOProvider provider) {
+    return "beam:transform:org.apache.beam:schemaio_" + provider.identifier() + "_write:v1";
+  }
+
+  /** Registers both builders of {@code provider}, overwriting any existing entries. */
+  private void registerProvider(
+      Map<String, ExternalTransformBuilder<?, ?, ?>> providers, SchemaIOProvider provider) {
+    providers.put(readUrn(provider), new ReaderBuilder(provider));
+    providers.put(writeUrn(provider), new WriterBuilder(provider));
+  }
+
+  /** Registers the builders of {@code provider} only for URNs that have no builder yet. */
+  private void registerProviderOptionally(
+      Map<String, ExternalTransformBuilder<?, ?, ?>> providers, SchemaIOProvider provider) {
+    providers.putIfAbsent(readUrn(provider), new ReaderBuilder(provider));
+    providers.putIfAbsent(writeUrn(provider), new WriterBuilder(provider));
   }
 
   public static class Configuration {