Transient schema registry clients
dmariassy committed Jul 26, 2024
1 parent e156374 commit 4044a8d
Showing 11 changed files with 168 additions and 48 deletions.
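Background: Confluent's CachedSchemaRegistryClient is not Serializable, so a Flink (de)serialization schema cannot ship it to task managers as an ordinary field, which appears to be the motivation here. The commit instead passes around a Serializable SchemaRegistryClientProvider and has each task build its own client transiently in open(). A minimal sketch of the pattern, using illustrative names (ResourceProvider, Holder) that are not from the commit:

    import java.io.Serializable;

    // A serializable recipe for building a non-serializable resource.
    interface ResourceProvider extends Serializable {
        AutoCloseable createResource();
    }

    class Holder implements Serializable {
        private final ResourceProvider provider; // travels with the job graph
        private transient AutoCloseable resource; // rebuilt on every worker

        Holder(ResourceProvider provider) {
            this.provider = provider;
        }

        // Analogous to a Flink schema's open(): runs once per task instance.
        void open() {
            resource = provider.createResource();
        }
    }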
==== changed file (name not captured) ====
@@ -36,9 +36,6 @@
 import org.apache.flink.table.factories.SerializationFormatFactory;
 import org.apache.flink.table.types.DataType;
 
-import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
-import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
-
 import java.util.Set;
 
 import static org.apache.flink.protobuf.registry.confluent.ProtobufConfluentFormatOptions.URL;
@@ -59,8 +56,9 @@ public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(
         FactoryUtil.validateFactoryOptions(this, formatOptions);
 
         String schemaRegistryURL = formatOptions.get(URL);
-        SchemaRegistryClient schemaRegistryClient =
-                ProtobufConfluentFormatFactoryUtils.createCachedSchemaRegistryClient(formatOptions);
+        SchemaRegistryClientProvider schemaRegistryClientProvider =
+                ProtobufConfluentFormatFactoryUtils.createCachedSchemaRegistryClientProvider(
+                        formatOptions);
 
         return new ProjectableDecodingFormat<DeserializationSchema<RowData>>() {
             @Override
@@ -72,7 +70,7 @@ public DeserializationSchema<RowData> createRuntimeDecoder(
                             context,
                             producedDataType,
                             projections,
-                            schemaRegistryClient,
+                            schemaRegistryClientProvider,
                             schemaRegistryURL,
                             formatOptions);
                 }
@@ -92,15 +90,20 @@ public EncodingFormat<SerializationSchema<RowData>> createEncodingFormat(
                 formatOptions, IDENTIFIER);
 
         String schemaRegistryURL = formatOptions.get(URL);
-        CachedSchemaRegistryClient schemaRegistryClient =
-                ProtobufConfluentFormatFactoryUtils.createCachedSchemaRegistryClient(formatOptions);
+        SchemaRegistryClientProviders.CachedSchemaRegistryClientProvider
+                schemaRegistryClientProvider =
+                        ProtobufConfluentFormatFactoryUtils
+                                .createCachedSchemaRegistryClientProvider(formatOptions);
 
         return new EncodingFormat<SerializationSchema<RowData>>() {
             @Override
             public SerializationSchema<RowData> createRuntimeEncoder(
                     DynamicTableSink.Context context, DataType consumedDataType) {
                 return ProtobufConfluentFormatFactoryUtils.createDynamicSerializationSchema(
-                        consumedDataType, schemaRegistryClient, schemaRegistryURL, formatOptions);
+                        consumedDataType,
+                        schemaRegistryClientProvider,
+                        schemaRegistryURL,
+                        formatOptions);
             }
 
             @Override
==== changed file: ProtobufConfluentFormatFactoryUtils.java ====
@@ -31,8 +31,6 @@
 import org.apache.flink.table.types.logical.RowType;
 
 import io.confluent.kafka.schemaregistry.SchemaProvider;
-import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
-import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
 import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider;
 
 import javax.annotation.Nullable;
@@ -70,15 +68,15 @@ public static ProtoRegistryDynamicDeserializationSchema createDynamicDeserializationSchema(
             DynamicTableSource.Context context,
             DataType producedDataType,
             int[][] projections,
-            SchemaRegistryClient schemaRegistryClient,
+            SchemaRegistryClientProvider schemaRegistryClientProvider,
             String schemaRegistryURL,
             ReadableConfig formatOptions) {
         producedDataType = Projection.of(projections).project(producedDataType);
         final RowType rowType = (RowType) producedDataType.getLogicalType();
         final TypeInformation<RowData> rowDataTypeInfo =
                 context.createTypeInformation(producedDataType);
         return new ProtoRegistryDynamicDeserializationSchema(
-                schemaRegistryClient,
+                schemaRegistryClientProvider,
                 schemaRegistryURL,
                 rowType,
                 rowDataTypeInfo,
@@ -88,7 +86,7 @@ public static ProtoRegistryDynamicDeserializationSchema createDynamicDeserializationSchema(
 
     public static ProtoRegistryDynamicSerializationSchema createDynamicSerializationSchema(
             DataType consumedDataType,
-            SchemaRegistryClient schemaRegistryClient,
+            SchemaRegistryClientProvider schemaRegistryClientProvider,
             String schemaRegistryURL,
             ReadableConfig formatOptions) {
         final RowType rowType = (RowType) consumedDataType.getLogicalType();
@@ -97,7 +95,7 @@ public static ProtoRegistryDynamicSerializationSchema createDynamicSerializationSchema(
                 formatOptions.get(MESSAGE_NAME),
                 rowType,
                 formatOptions.get(SUBJECT),
-                schemaRegistryClient,
+                schemaRegistryClientProvider,
                 schemaRegistryURL);
     }
 
@@ -167,15 +165,14 @@ public static void validateDynamicEncodingOptions(
         }
     }
 
-    public static CachedSchemaRegistryClient createCachedSchemaRegistryClient(
-            ReadableConfig formatOptions) {
+    public static SchemaRegistryClientProviders.CachedSchemaRegistryClientProvider
+            createCachedSchemaRegistryClientProvider(ReadableConfig formatOptions) {
         String schemaRegistryURL = formatOptions.get(URL);
         List<SchemaProvider> providers = new ArrayList<>();
         providers.add(new ProtobufSchemaProvider());
-        return new CachedSchemaRegistryClient(
+        return new SchemaRegistryClientProviders.CachedSchemaRegistryClientProvider(
                 schemaRegistryURL,
                 formatOptions.get(REGISTRY_CLIENT_CACHE_CAPACITY),
-                providers,
                 buildOptionalPropertiesMap(formatOptions));
     }
 
==== new file: SchemaRegistryClientProvider.java ====
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.protobuf.registry.confluent;
+
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+
+import java.io.Serializable;
+
+public interface SchemaRegistryClientProvider extends Serializable {
+    SchemaRegistryClient createSchemaRegistryClient();
+}
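Since SchemaRegistryClientProvider is a single-method interface that extends Serializable, an implementation could in principle even be a serializable lambda, as long as everything it captures is itself serializable. A hypothetical fragment, not part of the commit:

    // Hypothetical: the Serializable bound on the target interface makes the
    // lambda serializable; the captured String URL is serializable as well.
    SchemaRegistryClientProvider provider =
            () -> new CachedSchemaRegistryClient("http://localhost:8081", 1000);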
==== new file: SchemaRegistryClientProviders.java ====
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.protobuf.registry.confluent;
+
+import org.apache.flink.annotation.VisibleForTesting;
+
+import io.confluent.kafka.schemaregistry.SchemaProvider;
+import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/** Providers for {@link SchemaRegistryClient}. */
+public class SchemaRegistryClientProviders {
+    public static class CachedSchemaRegistryClientProvider implements SchemaRegistryClientProvider {
+
+        private final String url;
+        private final int identityMapCapacity;
+        private final Map<String, String> properties;
+
+        public CachedSchemaRegistryClientProvider(
+                String url, int identityMapCapacity, Map<String, String> properties) {
+            this.url = url;
+            this.identityMapCapacity = identityMapCapacity;
+            this.properties = properties;
+        }
+
+        @Override
+        public SchemaRegistryClient createSchemaRegistryClient() {
+            List<SchemaProvider> providers = new ArrayList<>();
+            providers.add(new ProtobufSchemaProvider());
+            return new CachedSchemaRegistryClient(url, identityMapCapacity, providers, properties);
+        }
+    }
+
+    @VisibleForTesting
+    public static class MockSchemaRegistryClientProvider implements SchemaRegistryClientProvider {
+        private final MockSchemaRegistryClient client;
+
+        public MockSchemaRegistryClientProvider(MockSchemaRegistryClient client) {
+            this.client = client;
+        }
+
+        @Override
+        public SchemaRegistryClient createSchemaRegistryClient() {
+            return client;
+        }
+    }
+}
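A quick sketch of how the production provider might be exercised; the class name ProviderDemo, the URL, and the capacity are made up for illustration:

    import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;

    import java.util.Collections;

    public class ProviderDemo {
        public static void main(String[] args) {
            // The provider itself is serializable; the client it builds is not.
            SchemaRegistryClientProviders.CachedSchemaRegistryClientProvider provider =
                    new SchemaRegistryClientProviders.CachedSchemaRegistryClientProvider(
                            "http://localhost:8081", 1000, Collections.emptyMap());
            SchemaRegistryClient client = provider.createSchemaRegistryClient();
            System.out.println(client.getClass().getSimpleName()); // CachedSchemaRegistryClient
        }
    }

Note that MockSchemaRegistryClientProvider holds its MockSchemaRegistryClient directly rather than rebuilding one per call; it is marked @VisibleForTesting, and presumably test pipelines never actually serialize it (MockSchemaRegistryClient itself is unlikely to be Serializable).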
==== changed file (name not captured) ====
@@ -23,6 +23,7 @@
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.protobuf.registry.confluent.ProtobufConfluentFormatFactoryUtils;
+import org.apache.flink.protobuf.registry.confluent.SchemaRegistryClientProviders;
 import org.apache.flink.protobuf.registry.confluent.dynamic.deserializer.ProtoRegistryDynamicDeserializationSchema;
 import org.apache.flink.protobuf.registry.confluent.dynamic.serializer.ProtoRegistryDynamicSerializationSchema;
 import org.apache.flink.table.connector.ChangelogMode;
@@ -39,9 +40,6 @@
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.types.RowKind;
 
-import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
-import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
-
 import java.util.Set;
 
 import static org.apache.flink.protobuf.registry.confluent.ProtobufConfluentFormatOptions.URL;
@@ -57,8 +55,10 @@ public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(
         FactoryUtil.validateFactoryOptions(this, formatOptions);
 
         String schemaRegistryURL = formatOptions.get(URL);
-        SchemaRegistryClient schemaRegistryClient =
-                ProtobufConfluentFormatFactoryUtils.createCachedSchemaRegistryClient(formatOptions);
+        SchemaRegistryClientProviders.CachedSchemaRegistryClientProvider
+                schemaRegistryClientProvider =
+                        ProtobufConfluentFormatFactoryUtils
+                                .createCachedSchemaRegistryClientProvider(formatOptions);
 
         return new ProjectableDecodingFormat<DeserializationSchema<RowData>>() {
             @Override
@@ -71,7 +71,7 @@ public DeserializationSchema<RowData> createRuntimeDecoder(
                             context,
                             producedDataType,
                             projections,
-                            schemaRegistryClient,
+                            schemaRegistryClientProvider,
                             schemaRegistryURL,
                             formatOptions);
                 return new ProtobufConfluentDebeziumDeserializationSchema(wrappedDeser);
@@ -97,8 +97,10 @@ public EncodingFormat<SerializationSchema<RowData>> createEncodingFormat(
                 formatOptions, IDENTIFIER);
 
         String schemaRegistryURL = formatOptions.get(URL);
-        CachedSchemaRegistryClient schemaRegistryClient =
-                ProtobufConfluentFormatFactoryUtils.createCachedSchemaRegistryClient(formatOptions);
+        SchemaRegistryClientProviders.CachedSchemaRegistryClientProvider
+                schemaRegistryClientProvider =
+                        ProtobufConfluentFormatFactoryUtils
+                                .createCachedSchemaRegistryClientProvider(formatOptions);
 
         return new EncodingFormat<SerializationSchema<RowData>>() {
             @Override
@@ -107,7 +109,7 @@ public SerializationSchema<RowData> createRuntimeEncoder(
                 ProtoRegistryDynamicSerializationSchema wrappedSer =
                         ProtobufConfluentFormatFactoryUtils.createDynamicSerializationSchema(
                                 consumedDataType,
-                                schemaRegistryClient,
+                                schemaRegistryClientProvider,
                                 schemaRegistryURL,
                                 formatOptions);
                 return new ProtobufConfluentDebeziumSerializationSchema(wrappedSer);
==== changed file: ProtoRegistryDynamicDeserializationSchema.java ====
@@ -22,6 +22,7 @@
 import org.apache.flink.formats.protobuf.PbFormatConfig;
 import org.apache.flink.formats.protobuf.deserialize.ProtoToRowConverter;
 import org.apache.flink.protobuf.registry.confluent.ProtobufConfluentDeserializationSchema;
+import org.apache.flink.protobuf.registry.confluent.SchemaRegistryClientProvider;
 import org.apache.flink.protobuf.registry.confluent.dynamic.ProtoCompiler;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.RowType;
@@ -55,12 +56,13 @@ public class ProtoRegistryDynamicDeserializationSchema
     private RowType rowType;
     private final TypeInformation<RowData> resultTypeInfo;
 
-    private final SchemaRegistryClient schemaRegistryClient;
     private final Map<Integer, KafkaProtobufDeserializer> kafkaProtobufDeserializers;
     private final boolean ignoreParseErrors;
     private final boolean readDefaultValues;
     private final String schemaRegistryUrl;
+    private final SchemaRegistryClientProvider schemaRegistryClientProvider;
 
+    private transient SchemaRegistryClient schemaRegistryClient;
     // Since these services operate on dynamically compiled and loaded classes, we need to
     // assume that the new worker don't have the classes loaded yet.
     private transient Map<Integer, ProtoToRowConverter> protoToRowConverters;
@@ -70,7 +72,7 @@ public class ProtoRegistryDynamicDeserializationSchema
     private static final String FAKE_TOPIC = "fake_topic";
 
     public ProtoRegistryDynamicDeserializationSchema(
-            SchemaRegistryClient schemaRegistryClient,
+            SchemaRegistryClientProvider schemaRegistryClientProvider,
             String schemaRegistryUrl,
             RowType rowType,
             TypeInformation<RowData> resultTypeInfo,
@@ -80,7 +82,7 @@ public ProtoRegistryDynamicDeserializationSchema(
         this.resultTypeInfo = resultTypeInfo;
         this.ignoreParseErrors = ignoreParseErrors;
         this.readDefaultValues = readDefaultValues;
-        this.schemaRegistryClient = schemaRegistryClient;
+        this.schemaRegistryClientProvider = schemaRegistryClientProvider;
         this.schemaRegistryUrl = schemaRegistryUrl;
         this.kafkaProtobufDeserializers = new HashMap<>();
     }
@@ -119,6 +121,7 @@ public TypeInformation<RowData> getProducedType() {
 
     @Override
    public void open(InitializationContext context) throws Exception {
+        schemaRegistryClient = schemaRegistryClientProvider.createSchemaRegistryClient();
         protoToRowConverters = new HashMap<>();
         protoCompiler = new ProtoCompiler();
         generatedMessageClasses = new HashMap<>();
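Why the transient field plus open(): Flink Java-serializes the schema into the job graph on the client side, deserializes it on each task manager, and then calls open() once per parallel task before the first record is processed. Transient fields come back null from that round trip, so the client has to be re-created in open(), which also gives each task its own instance. A self-contained sketch of the round trip (illustrative, not from the commit):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.io.Serializable;

    public class TransientRoundTrip {
        // Simulates shipping a schema to a task manager: serialize, then
        // deserialize. Any transient field is null afterwards, which is why
        // open() must rebuild the schema registry client.
        @SuppressWarnings("unchecked")
        static <T extends Serializable> T roundTrip(T obj) throws Exception {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bos.toByteArray()))) {
                return (T) in.readObject();
            }
        }
    }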
==== changed file: ProtoRegistryDynamicSerializationSchema.java ====
@@ -22,6 +22,7 @@
 import org.apache.flink.formats.protobuf.serialize.MessageSerializer;
 import org.apache.flink.formats.protobuf.serialize.RowToProtoConverter;
 import org.apache.flink.protobuf.registry.confluent.ProtobufConfluentSerializationSchema;
+import org.apache.flink.protobuf.registry.confluent.SchemaRegistryClientProvider;
 import org.apache.flink.protobuf.registry.confluent.dynamic.ProtoCompiler;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.RowType;
@@ -48,23 +49,24 @@ public class ProtoRegistryDynamicSerializationSchema
     private final String generatedClassName;
     private RowType rowType;
     private final String subjectName;
-    private final SchemaRegistryClient schemaRegistryClient;
     private final String schemaRegistryUrl;
+    private final SchemaRegistryClientProvider schemaRegistryClientProvider;
 
+    private transient SchemaRegistryClient schemaRegistryClient;
     private transient RowToProtoConverter rowToProtoConverter;
 
     public ProtoRegistryDynamicSerializationSchema(
             String generatedPackageName,
             String generatedClassName,
             RowType rowType,
             String subjectName,
-            SchemaRegistryClient schemaRegistryClient,
+            SchemaRegistryClientProvider schemaRegistryClientProvider,
             String schemaRegistryUrl) {
         this.generatedPackageName = generatedPackageName;
         this.generatedClassName = generatedClassName;
         this.rowType = rowType;
         this.subjectName = subjectName;
-        this.schemaRegistryClient = schemaRegistryClient;
+        this.schemaRegistryClientProvider = schemaRegistryClientProvider;
         this.schemaRegistryUrl = schemaRegistryUrl;
     }
@@ -79,6 +81,7 @@ public byte[] serialize(RowData element) {
 
     @Override
     public void open(InitializationContext context) throws Exception {
+        schemaRegistryClient = schemaRegistryClientProvider.createSchemaRegistryClient();
         Class generatedClass = generateProtoClassForRowType();
         KafkaProtobufSerializer kafkaProtobufSerializer = createKafkaSerializer();
         MessageSerializer messageSerializer =
==== changed file (test, name not captured) ====
@@ -18,6 +18,7 @@
 
 package org.apache.flink.protobuf.registry.confluent.debezium;
 
+import org.apache.flink.protobuf.registry.confluent.SchemaRegistryClientProviders;
 import org.apache.flink.protobuf.registry.confluent.TestUtils;
 import org.apache.flink.protobuf.registry.confluent.dynamic.deserializer.ProtoRegistryDynamicDeserializationSchema;
 import org.apache.flink.table.data.RowData;
@@ -72,7 +73,8 @@ public void setup() throws Exception {
                         new RowType.RowField(BOOL_FIELD, new BooleanType()));
         ProtoRegistryDynamicDeserializationSchema wrappedDeser =
                 new ProtoRegistryDynamicDeserializationSchema(
-                        mockSchemaRegistryClient,
+                        new SchemaRegistryClientProviders.MockSchemaRegistryClientProvider(
+                                mockSchemaRegistryClient),
                         DUMMY_SCHEMA_REGISTRY_URL,
                         rowType,
                         null,
(The diffs for the remaining changed files were not captured in this view.)