[HUDI-8841] Fix schema validating exception during flink async clustering #12598

Open
cshuo wants to merge 4 commits into master
Conversation

cshuo (Contributor) commented Jan 8, 2025

Change Logs

In Flink SQL, a primary key constraint can be defined, which makes the type of the primary key field non-null, while Spark has no primary key constraint. As a result, the two engines have discrepancies in the schema of the underlying files, e.g., Parquet.

We can reconcile the schemas during the async clustering read to tolerate this difference.
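
For illustration, here is a minimal sketch of that discrepancy using Avro's SchemaBuilder; the record and field names are made up, and "id" stands in for the record key:

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

public class NullabilityDiscrepancy {
  public static void main(String[] args) {
    // Schema written by Flink when "id" is declared as PRIMARY KEY (hence NOT NULL):
    Schema flinkWritten = SchemaBuilder.record("hudi_record").fields()
        .requiredString("id")    // "id": "string"            -> non-nullable
        .optionalLong("ts")
        .endRecord();

    // Schema written by Spark for the same table (no primary key constraint):
    Schema sparkWritten = SchemaBuilder.record("hudi_record").fields()
        .optionalString("id")    // "id": ["null", "string"]  -> nullable
        .optionalLong("ts")
        .endRecord();

    // The record key field differs only in nullability, which is enough to fail schema validation.
    System.out.println(flinkWritten.toString(true));
    System.out.println(sparkWritten.toString(true));
  }
}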

Impact

Hardens Flink clustering by reconciling schemas to tolerate different nullabilities of the record key fields.

Risk level (write none, low medium or high below)

low

Documentation Update

none

Contributor's checklist

  • Read through contributor's guide
  • Change Logs and Impact were stated clearly
  • Adequate tests were added if applicable
  • CI passed

 */
private Schema reconcileSchemaWithNullability(ClusteringOperation clusteringOperation) {
  String instantTs = StringUtils.isNullOrEmpty(clusteringOperation.getDataFilePath())
      ? FSUtils.getCommitTime(clusteringOperation.getDeltaFilePaths().get(0))
Contributor:

Currently, Flink clustering only works on append-only tables, so all the data files should be in Parquet format. We can fetch the record key fields and the file schema from the Parquet footer, and reconcile the record key fields if a nullability discrepancy exists.

Contributor Author:

As for where to get the file schema: files with the same commit time share the same write schema, and getTableAvroSchema in TableSchemaResolver is more efficient since it has a cache. What do you think?

Contributor:

No, the TableSchemaResolver would trigger file listing for the commit metadata, which puts pressure on the filesystem. Fetching the schema from a file is straightforward:

private Schema fetchSchemaFromFiles(Iterator<String> filePaths) throws IOException {
    Schema schema = null;
    while (filePaths.hasNext() && schema == null) {
      StoragePath filePath = new StoragePath(filePaths.next());
      if (FSUtils.isLogFile(filePath)) {
        // log file: read the writer schema from the log blocks
        schema = readSchemaFromLogFile(filePath);
      } else {
        // base file: read the Avro schema from the file footer via the format utils
        schema = HoodieIOFactory.getIOFactory(metaClient.getStorage())
            .getFileFormatUtils(filePath).readAvroSchema(metaClient.getStorage(), filePath);
      }
    }
    return schema;
  }

Another way is to force the record key fields to be nullable if they are not, assuming a nullable schema can be used to read a data file written with a non-nullable schema.
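
For reference, here is a rough sketch of that second idea using plain Avro APIs; the class and method names are illustrative, not the exact helper added in this PR:

import org.apache.avro.Schema;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;

public class ForceNullableSketch {

  /** Rebuilds the schema so that every field named in columns allows null. */
  public static Schema forceNullable(Schema schema, Set<String> columns) {
    List<Schema.Field> newFields = new ArrayList<>();
    boolean changed = false;
    for (Schema.Field field : schema.getFields()) {
      Schema fieldSchema = field.schema();
      Object defaultVal = field.defaultVal();
      if (columns.contains(field.name()) && !isNullable(fieldSchema)) {
        // Wrap the original type in a ["null", type] union so null is tolerated on read.
        fieldSchema = Schema.createUnion(Arrays.asList(Schema.create(Schema.Type.NULL), fieldSchema));
        // A union starting with "null" only accepts a null default, so drop any existing default.
        defaultVal = null;
        changed = true;
      }
      // Note: aliases and custom props are not copied in this sketch.
      newFields.add(new Schema.Field(field.name(), fieldSchema, field.doc(), defaultVal));
    }
    // Avoid recreating the schema when every requested field is already nullable.
    if (!changed) {
      return schema;
    }
    return Schema.createRecord(
        schema.getName(), schema.getDoc(), schema.getNamespace(), schema.isError(), newFields);
  }

  private static boolean isNullable(Schema s) {
    return s.getType() == Schema.Type.UNION
        && s.getTypes().stream().anyMatch(t -> t.getType() == Schema.Type.NULL);
  }
}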

Contributor Author:

The second way sounds more efficient; I'll verify it.

Contributor Author:

cc @danny0405 Updated using the second way, and added a test to verify that a nullable schema can be used to read a data file written with a non-nullable schema.
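
For what it is worth, the claim can be sanity-checked with Avro's built-in compatibility checker; this is only an illustration with made-up record and field names, not the test added in the PR:

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.SchemaCompatibility;

public class NullableReaderCheck {
  public static void main(String[] args) {
    // Writer schema: the record key was written as non-nullable (Flink primary key).
    Schema writer = SchemaBuilder.record("hudi_record").fields()
        .requiredString("id")
        .endRecord();

    // Reader schema: the same field forced to ["null", "string"].
    Schema reader = SchemaBuilder.record("hudi_record").fields()
        .optionalString("id")
        .endRecord();

    // A union branch in the reader that matches the writer type is resolvable, so this prints COMPATIBLE.
    System.out.println(
        SchemaCompatibility.checkReaderWriterCompatibility(reader, writer).getType());
  }
}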

@github-actions bot added the size:M (PR with lines of changes in (100, 300]) label Jan 8, 2025
 * @param nullable nullability of column type
 * @return a new schema with the nullabilities of the given columns updated
 */
public static Schema createSchemaWithNullabilityUpdate(
Contributor:

Rename to forceNullableColumns; we should also avoid recreating the schema if the fields are already nullable.

 * @return schema that has nullability constraints reconciled
 */
private Schema reconcileSchemaWithRecordKeyNullability(Schema schema) {
  Option<String[]> recordKeyOp = table.getMetaClient().getTableConfig().getRecordKeyFields();
Contributor:

The default record key field is uuid, so we should check the validity of the configured record key fields first.
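
For illustration, one possible shape of that validity check, assuming the Option and record-key APIs shown in the diff above; the class and helper names are hypothetical:

import java.util.ArrayList;
import java.util.List;

import org.apache.avro.Schema;
import org.apache.hudi.common.util.Option;

public class RecordKeyValidity {
  // Only keep configured record key fields that actually exist in the schema,
  // so that a defaulted key such as "uuid" does not trigger reconciliation.
  static List<String> validRecordKeys(Option<String[]> recordKeyOp, Schema schema) {
    List<String> keys = new ArrayList<>();
    if (recordKeyOp.isPresent()) {
      for (String key : recordKeyOp.get()) {
        if (schema.getField(key) != null) {
          keys.add(key);
        }
      }
    }
    return keys; // an empty list means there is nothing to reconcile
  }
}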

@@ -605,14 +605,16 @@ public static String createSchemaErrorString(String errorMessage, Schema writerS
  * @param nullable nullability of column type
  * @return a new schema with the nullabilities of the given columns updated
  */
-public static Schema createSchemaWithNullabilityUpdate(
+public static Schema forceNullableColumns(
     Schema schema, List<String> nullableUpdateCols, boolean nullable) {
Contributor:

nullableUpdateCols -> columns. We can eliminate the nullable flag because it is always true.

@hudi-bot commented Jan 9, 2025

CI report:

Bot commands supported by @hudi-bot:
  • @hudi-bot run azure: re-run the last Azure build
