Skip to content

Commit

Permalink
[Fix][Connector-V2] Fix paimon dynamic bucket table in primary key is …
Browse files Browse the repository at this point in the history
…not first (apache#7728)
  • Loading branch information
hawk9821 authored Sep 27, 2024
1 parent 41b8530 commit dc7f695
Show file tree
Hide file tree
Showing 9 changed files with 567 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonConfig;
import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonSinkConfig;
import org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException;
import org.apache.seatunnel.connectors.seatunnel.paimon.utils.SchemaUtil;

import org.apache.paimon.catalog.Identifier;
Expand Down Expand Up @@ -161,6 +163,8 @@ public void createTable(TablePath tablePath, CatalogTable table, boolean ignoreI
throw new TableAlreadyExistException(this.catalogName, tablePath);
} catch (org.apache.paimon.catalog.Catalog.DatabaseNotExistException e) {
throw new DatabaseNotExistException(this.catalogName, tablePath.getDatabaseName());
} catch (Exception e) {
resolveException(e);
}
}

Expand Down Expand Up @@ -273,4 +277,24 @@ private CatalogTable toCatalogTable(
/**
 * Converts a SeaTunnel {@link TablePath} into a Paimon {@link Identifier}
 * (database name + table name pair).
 */
private Identifier toIdentifier(TablePath tablePath) {
    String databaseName = tablePath.getDatabaseName();
    String tableName = tablePath.getTableName();
    return Identifier.create(databaseName, tableName);
}

/**
 * Translates low-level Paimon exceptions raised during table creation into
 * typed {@link PaimonConnectorException}s so callers get actionable error codes.
 *
 * <p>Recognized causes:
 * <ul>
 *   <li>{@link UnsupportedOperationException} about an unsupported primary-key
 *       datatype → {@code UNSUPPORTED_PRIMARY_DATATYPE}</li>
 *   <li>{@link RuntimeException} about defining 'bucket-key' in unaware/dynamic
 *       bucket mode → {@code WRITE_PROPS_BUCKET_KEY_ERROR}</li>
 * </ul>
 * Anything else is wrapped in a generic {@link CatalogException}. This method
 * always throws; it never returns normally.
 *
 * @param e the exception caught while creating the table
 */
private void resolveException(Exception e) {
    Throwable cause = e.getCause();
    // NOTE: UnsupportedOperationException IS-A RuntimeException, so this branch
    // must be checked first for the more specific error code to win.
    if (cause instanceof UnsupportedOperationException) {
        String message = cause.getMessage();
        // getMessage() may be null (Throwable javadoc); guard before contains()
        // so we don't mask the original failure with an NPE.
        if (message != null
                && message.contains("The type ")
                && message.contains(" in primary key field ")
                && message.contains(" is unsupported")) {
            throw new PaimonConnectorException(
                    PaimonConnectorErrorCode.UNSUPPORTED_PRIMARY_DATATYPE, message);
        }
    } else if (cause instanceof RuntimeException) {
        String message = cause.getMessage();
        if (message != null
                && message.contains(
                        "Cannot define 'bucket-key' in unaware or dynamic bucket mode.")) {
            throw new PaimonConnectorException(
                    PaimonConnectorErrorCode.WRITE_PROPS_BUCKET_KEY_ERROR, message);
        }
    }
    // Unrecognized failure: preserve the full cause chain for diagnosis.
    throw new CatalogException("An unexpected error occurred", e);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@
import org.apache.seatunnel.api.sink.SchemaSaveMode;

import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Getter
@Slf4j
public class PaimonSinkConfig extends PaimonConfig {
public static final Option<SchemaSaveMode> SCHEMA_SAVE_MODE =
Options.key("schema_save_mode")
Expand Down Expand Up @@ -77,5 +79,12 @@ public PaimonSinkConfig(ReadonlyConfig readonlyConfig) {
this.primaryKeys = stringToList(readonlyConfig.get(PRIMARY_KEYS), ",");
this.partitionKeys = stringToList(readonlyConfig.get(PARTITION_KEYS), ",");
this.writeProps = readonlyConfig.get(WRITE_PROPS);
checkConfig();
}

/**
 * Validates sink options after construction. Currently only warns (does not
 * fail) when the table has no primary keys but requests dynamic bucketing
 * ({@code bucket = -1}), which append-only tables do not support.
 */
private void checkConfig() {
    boolean appendOnly = this.primaryKeys.isEmpty();
    boolean dynamicBucket = "-1".equals(this.writeProps.get("bucket"));
    if (appendOnly && dynamicBucket) {
        log.warn("Append only table currently do not support dynamic bucket");
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ public enum PaimonConnectorErrorCode implements SeaTunnelErrorCode {
GET_TABLE_FAILED("PAIMON-04", "Get table from database failed"),
AUTHENTICATE_KERBEROS_FAILED("PAIMON-05", "Authenticate kerberos failed"),
LOAD_CATALOG("PAIMON-06", "Load catalog failed"),
GET_FILED_FAILED("PAIMON-07", "Get field failed");
GET_FILED_FAILED("PAIMON-07", "Get field failed"),
UNSUPPORTED_PRIMARY_DATATYPE("PAIMON-08", "Paimon primary key datatype is unsupported"),
WRITE_PROPS_BUCKET_KEY_ERROR("PAIMON-09", "Cannot define 'bucket-key' in dynamic bucket mode");

private final String code;
private final String description;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,10 @@ public PaimonSinkWriter(
BucketMode bucketMode = ((FileStoreTable) table).bucketMode();
this.dynamicBucket =
BucketMode.DYNAMIC == bucketMode || BucketMode.GLOBAL_DYNAMIC == bucketMode;
int bucket = ((FileStoreTable) table).coreOptions().bucket();
if (bucket == -1 && BucketMode.UNAWARE == bucketMode) {
log.warn("Append only table currently do not support dynamic bucket");
}
if (dynamicBucket) {
this.bucketAssigner =
new PaimonBucketAssigner(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,8 @@

package org.apache.seatunnel.connectors.seatunnel.paimon.sink.bucket;

import org.apache.commons.collections.CollectionUtils;
import org.apache.paimon.codegen.CodeGenUtils;
import org.apache.paimon.codegen.Projection;
import org.apache.paimon.crosspartition.IndexBootstrap;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.index.SimpleHashBucketAssigner;
import org.apache.paimon.reader.RecordReader;
Expand All @@ -29,15 +27,20 @@
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.RowPartitionKeyExtractor;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class PaimonBucketAssigner {

private final RowPartitionKeyExtractor extractor;

private final Projection bucketKeyProjection;

private final SimpleHashBucketAssigner simpleHashBucketAssigner;

private final TableSchema schema;
Expand All @@ -46,10 +49,6 @@ public PaimonBucketAssigner(Table table, int numAssigners, int assignId) {
FileStoreTable fileStoreTable = (FileStoreTable) table;
this.schema = fileStoreTable.schema();
this.extractor = new RowPartitionKeyExtractor(fileStoreTable.schema());
this.bucketKeyProjection =
CodeGenUtils.newProjection(
fileStoreTable.schema().logicalRowType(),
fileStoreTable.schema().projection(fileStoreTable.schema().bucketKeys()));
long dynamicBucketTargetRowNum =
((FileStoreTable) table).coreOptions().dynamicBucketTargetRowNum();
this.simpleHashBucketAssigner =
Expand All @@ -59,26 +58,35 @@ public PaimonBucketAssigner(Table table, int numAssigners, int assignId) {

/**
 * Rebuilds the in-memory bucket index for this assigner by replaying the
 * table's existing primary-key index via {@link IndexBootstrap}.
 *
 * <p>The bootstrap rows expose primary-key fields first (positions 0..pk-1),
 * not in schema order, so each value is re-placed into a {@link GenericRow}
 * at the field's schema position before being fed to {@link #assign} — this
 * keeps bucketing correct when a primary-key column is not the first column
 * of the table (the bug this change fixes).
 *
 * @param fileStoreTable the Paimon table whose index is bootstrapped
 * @param numAssigners   total number of parallel assigners
 * @param assignId       id of this assigner (selects its index partition)
 */
private void loadBucketIndex(FileStoreTable fileStoreTable, int numAssigners, int assignId) {
    IndexBootstrap indexBootstrap = new IndexBootstrap(fileStoreTable);
    List<String> fieldNames = schema.fieldNames();
    // Map field name -> position in the full schema row.
    Map<String, Integer> fieldIndexMap =
            IntStream.range(0, fieldNames.size())
                    .boxed()
                    .collect(Collectors.toMap(fieldNames::get, Function.identity()));
    List<DataField> primaryKeys = schema.primaryKeysFields();
    try (RecordReader<InternalRow> recordReader =
            indexBootstrap.bootstrap(numAssigners, assignId)) {
        RecordReaderIterator<InternalRow> readerIterator =
                new RecordReaderIterator<>(recordReader);
        while (readerIterator.hasNext()) {
            InternalRow row = readerIterator.next();
            // Re-project primary-key values (read at bootstrap positions i)
            // into their schema positions; non-key fields stay null.
            GenericRow binaryRow = new GenericRow(fieldNames.size());
            for (int i = 0; i < primaryKeys.size(); i++) {
                String name = primaryKeys.get(i).name();
                DataType type = primaryKeys.get(i).type();
                binaryRow.setField(
                        fieldIndexMap.get(name),
                        InternalRow.createFieldGetter(type, i).getFieldOrNull(row));
            }
            assign(binaryRow);
        }
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}

/**
 * Assigns a bucket for the given row.
 *
 * <p>The hash is derived solely from the trimmed primary key (the former
 * bucket-key projection branch was removed along with the
 * {@code bucketKeyProjection} field in this change), then handed to the
 * {@link SimpleHashBucketAssigner} scoped to the row's partition.
 *
 * @param rowData row in full schema layout (see loadBucketIndex)
 * @return non-negative bucket id
 */
public int assign(InternalRow rowData) {
    int hash = extractor.trimmedPrimaryKey(rowData).hashCode();
    // Math.abs: assigner may return a negative id; bucket ids must be >= 0.
    return Math.abs(
            this.simpleHashBucketAssigner.assign(this.extractor.partition(rowData), hash));
}
Expand Down
Loading

0 comments on commit dc7f695

Please sign in to comment.