Skip to content
This repository has been archived by the owner on Dec 14, 2022. It is now read-only.

Commit

Permalink
add Schema cache
Browse files Browse the repository at this point in the history
  • Loading branch information
jianyun8023 committed Mar 18, 2021
1 parent 97604fd commit 9169ef5
Showing 1 changed file with 14 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ class DynamicPulsarSerializationSchema

private String valueFormatType;

private volatile Schema<RowData> schema;

DynamicPulsarSerializationSchema(
@Nullable SerializationSchema<RowData> keySerialization,
SerializationSchema<RowData> valueSerialization,
Expand Down Expand Up @@ -193,6 +195,17 @@ public void setNumParallelInstances(int numParallelInstances) {

@Override
public Schema<RowData> getSchema() {
if (schema == null) {
synchronized (this) {
if (schema == null) {
schema = buildSchema();
}
}
}
return schema;
}

private FlinkSchema<RowData> buildSchema() {
if (StringUtils.isBlank(valueFormatType)) {
return new FlinkSchema<>(Schema.BYTES.getSchemaInfo(), valueSerialization, null);
}
Expand All @@ -206,7 +219,7 @@ private void hackPbSerializationSchema(Map<String, String> options) {
// reflect read PbRowSerializationSchema#messageClassName
if (valueSerialization instanceof PbRowDataSerializationSchema) {
try {
String messageClassName =
final String messageClassName =
(String) FieldUtils.readDeclaredField(valueSerialization, "messageClassName", true);
options.put(PbFormatOptions.MESSAGE_CLASS_NAME.key(), messageClassName);
} catch (IllegalAccessException e) {
Expand Down

0 comments on commit 9169ef5

Please sign in to comment.