[HUDI-8841] Fix schema validating exception during flink async clustering #12598
base: master
Conversation
 */
private Schema reconcileSchemaWithNullability(ClusteringOperation clusteringOperation) {
  String instantTs = StringUtils.isNullOrEmpty(clusteringOperation.getDataFilePath())
      ? FSUtils.getCommitTime(clusteringOperation.getDeltaFilePaths().get(0))
Currently, the Flink clustering only works on append-only tables, so all the data files should be in parquet format. We can fetch the record key fields, then the file schema from the parquet footer, and reconcile the record key fields if a nullability discrepancy exists.
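For illustration, here is a minimal sketch of that first idea, reading the Avro schema straight from a parquet file footer with parquet-avro; the class and helper names are hypothetical and not part of this PR:

import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.schema.MessageType;

public class ParquetFooterSchemaExample {

  /**
   * Reads the parquet schema from the file footer and converts it to Avro.
   * Hypothetical helper for illustration only.
   */
  static Schema readAvroSchemaFromParquetFooter(Configuration conf, Path file) throws IOException {
    try (ParquetFileReader reader =
             ParquetFileReader.open(HadoopInputFile.fromPath(file, conf))) {
      MessageType parquetSchema = reader.getFooter().getFileMetaData().getSchema();
      return new AvroSchemaConverter(conf).convert(parquetSchema);
    }
  }
}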
As for where to get the file schema: files with the same commit time actually share the same write schema, and getTableAvroSchema in TableSchemaResolver is more efficient since there is a cache. What do you think?
No, the TableSchemaResolver would trigger a file listing for the commit metadata, which puts pressure on the filesystem. Fetching the schema from the file is straightforward:
private Schema fetchSchemaFromFiles(Iterator<String> filePaths) throws IOException {
  Schema schema = null;
  while (filePaths.hasNext() && schema == null) {
    StoragePath filePath = new StoragePath(filePaths.next());
    if (FSUtils.isLogFile(filePath)) {
      // log file: read the writer schema from the log block header
      schema = readSchemaFromLogFile(filePath);
    } else {
      // base file: read the Avro schema from the file footer
      schema = HoodieIOFactory.getIOFactory(metaClient.getStorage())
          .getFileFormatUtils(filePath).readAvroSchema(metaClient.getStorage(), filePath);
    }
  }
  return schema;
}
Another way is to force the record key fields to be nullable if they are not, provided that a nullable schema can be used to read data files written with a non-nullable schema.
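A minimal, self-contained sketch of this "force nullable" idea at the Avro level; the names ForceNullableExample, forceNullable and isNullable are illustrative and not the PR's actual code:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.SchemaBuilder;

public class ForceNullableExample {

  /** Returns a copy of the record schema where the given columns are unioned with null. */
  static Schema forceNullable(Schema schema, Set<String> columns) {
    List<Field> newFields = new ArrayList<>();
    for (Field field : schema.getFields()) {
      Schema fieldSchema = field.schema();
      if (columns.contains(field.name()) && !isNullable(fieldSchema)) {
        // wrap the non-nullable type into a ["null", type] union
        fieldSchema = Schema.createUnion(Schema.create(Schema.Type.NULL), fieldSchema);
      }
      newFields.add(new Field(field.name(), fieldSchema, field.doc(), null));
    }
    return Schema.createRecord(schema.getName(), schema.getDoc(), schema.getNamespace(),
        schema.isError(), newFields);
  }

  private static boolean isNullable(Schema schema) {
    return schema.getType() == Schema.Type.UNION
        && schema.getTypes().stream().anyMatch(s -> s.getType() == Schema.Type.NULL);
  }

  public static void main(String[] args) {
    Schema schema = SchemaBuilder.record("rec").fields()
        .requiredString("uuid")
        .optionalString("name")
        .endRecord();
    Schema reconciled = forceNullable(schema, new HashSet<>(Arrays.asList("uuid")));
    System.out.println(reconciled.getField("uuid").schema()); // ["null","string"]
  }
}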
The second way sounds more efficient; I'll verify it.
cc @danny0405 updated to use the second way, and added a test to verify that a nullable schema can be used to read data files written with a non-nullable schema
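For reference, a minimal illustration of why this works, using plain Avro serialization rather than the PR's actual parquet-based test; all names here are illustrative:

import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

public class NullableReaderSchemaExample {

  public static void main(String[] args) throws IOException {
    // Writer schema: record key is non-nullable (as written by Flink with a PK constraint).
    Schema writerSchema = SchemaBuilder.record("rec").fields()
        .requiredString("uuid").requiredInt("age").endRecord();
    // Reader schema: the same fields forced to be nullable.
    Schema readerSchema = SchemaBuilder.record("rec").fields()
        .optionalString("uuid").optionalInt("age").endRecord();

    GenericRecord record = new GenericData.Record(writerSchema);
    record.put("uuid", "key-1");
    record.put("age", 42);

    // Serialize with the non-nullable writer schema.
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
    new GenericDatumWriter<GenericRecord>(writerSchema).write(record, encoder);
    encoder.flush();

    // Deserialize with the nullable reader schema: Avro schema resolution picks the
    // non-null branch of the reader union, so the read succeeds.
    BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
    GenericRecord read =
        new GenericDatumReader<GenericRecord>(writerSchema, readerSchema).read(null, decoder);
    System.out.println(read); // {"uuid": "key-1", "age": 42}
  }
}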
 * @param nullable nullability of column type
 * @return a new schema with the nullabilities of the given columns updated
 */
public static Schema createSchemaWithNullabilityUpdate(
Rename to forceNullableColumns; we should also avoid recreating the schema if the fields are already nullable.
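One possible shape for that suggestion, building on the forceNullable/isNullable helpers sketched above (still hypothetical, not the PR's final code):

// Return the original schema untouched when every target column is already nullable,
// so we only rebuild the schema when a record key field actually needs to change.
static Schema forceNullableColumns(Schema schema, List<String> columns) {
  boolean allNullable = columns.stream()
      .allMatch(name -> isNullable(schema.getField(name).schema()));
  return allNullable ? schema : forceNullable(schema, new HashSet<>(columns));
}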
 * @return schema that has nullability constraints reconciled
 */
private Schema reconcileSchemaWithRecordKeyNullability(Schema schema) {
  Option<String[]> recordKeyOp = table.getMetaClient().getTableConfig().getRecordKeyFields();
The default record key field is uuid; we should check the validity first.
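A sketch of that guard, continuing the reconcileSchemaWithRecordKeyNullability excerpt above; it assumes the reconciliation is simply skipped when the configured record key fields are not present in the schema (the PR's exact check may differ):

// Skip the nullability reconciliation unless every configured record key field
// really exists in the schema (the default key "uuid" may not be a real column).
Option<String[]> recordKeyOp = table.getMetaClient().getTableConfig().getRecordKeyFields();
if (!recordKeyOp.isPresent()
    || Arrays.stream(recordKeyOp.get()).anyMatch(f -> schema.getField(f) == null)) {
  return schema;
}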
@@ -605,14 +605,16 @@ public static String createSchemaErrorString(String errorMessage, Schema writerS
  * @param nullable nullability of column type
  * @return a new schema with the nullabilities of the given columns updated
  */
-public static Schema createSchemaWithNullabilityUpdate(
+public static Schema forceNullableColumns(
     Schema schema, List<String> nullableUpdateCols, boolean nullable) {
nullableUpdateCols -> columns. We can eliminate the flag nullable because it is always true.
Change Logs
In Flink SQL, a primary key constraint can be defined, and the type of the pk field then becomes non-null, while Spark has no primary key constraint, so the two engines have discrepancies in the schema of the underlying files, e.g., parquet.
We can perform a schema reconciliation during the async clustering read.
Impact
Consolidates Flink clustering by reconciling schemas to tolerate different nullabilities of the record key fields.
Risk level (write none, low medium or high below)
low
Documentation Update
none
Contributor's checklist