Skip to content

Commit

Permalink
[Feature][Paimon] Support timestamp(n) for the paimon sink
Browse files Browse the repository at this point in the history
  • Loading branch information
dailai committed Mar 20, 2024
1 parent 8f8a713 commit a7626ba
Show file tree
Hide file tree
Showing 19 changed files with 469 additions and 76 deletions.
1 change: 1 addition & 0 deletions docs/en/concept/schema-feature.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ columns = [
| type | Yes | - | The data type of the column |
| nullable | No | true | If the column can be nullable |
| columnLength | No | 0 | The length of the column which will be useful when you need to define the length |
| columnScale | No | - | The scale of the column; set it when the column type requires a scale to be defined |
| defaultValue | No | null | The default value of the column |
| comment | No | null | The comment of the column |

Expand Down
1 change: 1 addition & 0 deletions docs/zh/concept/schema-feature.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ columns = [
| type | Yes | - | 列的数据类型 |
| nullable | No | true | 列是否可空 |
| columnLength | No | 0 | 列的长度,当您需要定义长度时将很有用 |
| columnScale | No | - | 列的精度,当您需要定义精度时将很有用 |
| defaultValue | No | null | 列的默认值 |
| comment | No | null | 列的注释 |

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,11 @@ public List<Column> parse(ReadonlyConfig schemaConfig) {
Integer columnLength =
columnConfig.get(
TableSchemaOptions.ColumnOptions.COLUMN_LENGTH);

Integer columnScale =
columnConfig.get(
TableSchemaOptions.ColumnOptions.COLUMN_SCALE);

Boolean nullable =
columnConfig.get(TableSchemaOptions.ColumnOptions.NULLABLE);
Object defaultValue =
Expand All @@ -143,7 +148,8 @@ public List<Column> parse(ReadonlyConfig schemaConfig) {
return PhysicalColumn.of(
name,
seaTunnelDataType,
columnLength,
Long.valueOf(columnLength),
columnScale,
nullable,
defaultValue,
comment);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,12 @@ public static class ColumnOptions {
.noDefaultValue()
.withDescription("SeaTunnel Schema Column Type");

/**
 * Schema column option read from the {@code columnScale} key: an integer scale for the
 * column. It is declared without a default value.
 */
public static final Option<Integer> COLUMN_SCALE =
Options.key("columnScale")
.intType()
.noDefaultValue()
.withDescription("SeaTunnel Schema Column scale");

public static final Option<Integer> COLUMN_LENGTH =
Options.key("columnLength")
.intType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,14 @@

import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
import org.apache.seatunnel.api.table.catalog.exception.DatabaseAlreadyExistException;
import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
import org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException;
import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonSinkConfig;
import org.apache.seatunnel.connectors.seatunnel.paimon.utils.SchemaUtil;

Expand Down Expand Up @@ -182,18 +181,8 @@ private CatalogTable toCatalogTable(
TableSchema.Builder builder = TableSchema.builder();
dataFields.forEach(
dataField -> {
String name = dataField.name();
SeaTunnelDataType<?> seaTunnelType =
SchemaUtil.toSeaTunnelType(dataField.type());
PhysicalColumn physicalColumn =
PhysicalColumn.of(
name,
seaTunnelType,
(Long) null,
true,
null,
dataField.description());
builder.column(physicalColumn);
Column column = SchemaUtil.toSeaTunnelType(dataField.type());
builder.column(column);
});

List<String> partitionKeys = schema.partitionKeys();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.seatunnel.connectors.seatunnel.paimon.data;

import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
import org.apache.seatunnel.api.table.converter.TypeConverter;
import org.apache.seatunnel.connectors.seatunnel.paimon.sink.PaimonSink;
import org.apache.seatunnel.connectors.seatunnel.paimon.utils.RowTypeConverter;
Expand All @@ -40,11 +39,11 @@ public String identifier() {

@Override
public Column convert(DataType dataType) {
return PhysicalColumn.builder().dataType(RowTypeConverter.convert(dataType)).build();
return RowTypeConverter.convert(dataType);
}

@Override
public DataType reconvert(Column column) {
return RowTypeConverter.reconvert(column.getDataType());
return RowTypeConverter.reconvert(column);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import org.apache.seatunnel.connectors.seatunnel.paimon.utils.RowConverter;

import org.apache.paimon.data.InternalRow;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.table.sink.BatchTableWrite;
Expand Down Expand Up @@ -74,6 +76,8 @@ public class PaimonSinkWriter

private final JobContext jobContext;

private TableSchema tableSchema;

public PaimonSinkWriter(
Context context,
Table table,
Expand All @@ -88,6 +92,7 @@ public PaimonSinkWriter(
this.seaTunnelRowType = seaTunnelRowType;
this.context = context;
this.jobContext = jobContext;
this.tableSchema = ((FileStoreTable) table).schema();
}

public PaimonSinkWriter(
Expand Down Expand Up @@ -124,7 +129,7 @@ public PaimonSinkWriter(

@Override
public void write(SeaTunnelRow element) throws IOException {
InternalRow rowData = RowConverter.convert(element, seaTunnelRowType);
InternalRow rowData = RowConverter.reconvert(element, seaTunnelRowType, tableSchema);
try {
tableWrite.write(rowData);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,19 @@
import org.apache.paimon.data.serializer.InternalArraySerializer;
import org.apache.paimon.data.serializer.InternalMapSerializer;
import org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.TimestampType;

import java.math.BigDecimal;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** The converter for converting between {@link InternalRow} and {@link SeaTunnelRow}. */
Expand Down Expand Up @@ -129,7 +133,7 @@ public static Object convert(InternalArray array, SeaTunnelDataType<?> dataType)
* @param dataType SeaTunnel array data type
* @return Paimon array object {@link BinaryArray}
*/
public static BinaryArray convert(Object array, SeaTunnelDataType<?> dataType) {
public static BinaryArray reconvert(Object array, SeaTunnelDataType<?> dataType) {
int length = ((Object[]) array).length;
BinaryArray binaryArray = new BinaryArray();
BinaryArrayWriter binaryArrayWriter;
Expand Down Expand Up @@ -327,10 +331,12 @@ public static SeaTunnelRow convert(InternalRow rowData, SeaTunnelRowType seaTunn
*
* @param seaTunnelRow SeaTunnel row object
* @param seaTunnelRowType SeaTunnel row type
* @param tableSchema Paimon table schema
* @return Paimon row object
*/
public static InternalRow convert(
SeaTunnelRow seaTunnelRow, SeaTunnelRowType seaTunnelRowType) {
public static InternalRow reconvert(
SeaTunnelRow seaTunnelRow, SeaTunnelRowType seaTunnelRowType, TableSchema tableSchema) {
List<DataField> fields = tableSchema.fields();
BinaryRow binaryRow = new BinaryRow(seaTunnelRowType.getTotalFields());
BinaryWriter binaryWriter = new BinaryRowWriter(binaryRow);
// Convert SeaTunnel RowKind to Paimon RowKind
Expand Down Expand Up @@ -390,8 +396,12 @@ public static InternalRow convert(
i, Timestamp.fromLocalDateTime(date.atTime(time)), 3);
break;
case TIMESTAMP:
String fieldName = seaTunnelRowType.getFieldName(i);
DataField dataField = SchemaUtil.getDataField(fields, fieldName);
int precision = ((TimestampType) dataField.type()).getPrecision();
LocalDateTime datetime = (LocalDateTime) seaTunnelRow.getField(i);
binaryWriter.writeTimestamp(i, Timestamp.fromLocalDateTime(datetime), 9);
binaryWriter.writeTimestamp(
i, Timestamp.fromLocalDateTime(datetime), precision);
break;
case MAP:
MapType<?, ?> mapType = (MapType<?, ?>) seaTunnelRowType.getFieldType(i);
Expand All @@ -404,13 +414,14 @@ public static InternalRow convert(
Object[] values = field.values().toArray(new Object[0]);
binaryWriter.writeMap(
i,
BinaryMap.valueOf(convert(keys, keyType), convert(values, valueType)),
BinaryMap.valueOf(
reconvert(keys, keyType), reconvert(values, valueType)),
new InternalMapSerializer(paimonKeyType, paimonValueType));
break;
case ARRAY:
ArrayType<?, ?> arrayType = (ArrayType<?, ?>) seaTunnelRowType.getFieldType(i);
BinaryArray paimonArray =
convert(seaTunnelRow.getField(i), arrayType.getElementType());
reconvert(seaTunnelRow.getField(i), arrayType.getElementType());
binaryWriter.writeArray(
i,
paimonArray,
Expand All @@ -420,8 +431,10 @@ public static InternalRow convert(
case ROW:
SeaTunnelDataType<?> rowType = seaTunnelRowType.getFieldType(i);
Object row = seaTunnelRow.getField(i);
InternalRow paimonRow = convert((SeaTunnelRow) row, (SeaTunnelRowType) rowType);
RowType paimonRowType = RowTypeConverter.reconvert((SeaTunnelRowType) rowType);
InternalRow paimonRow =
reconvert((SeaTunnelRow) row, (SeaTunnelRowType) rowType, tableSchema);
RowType paimonRowType =
RowTypeConverter.reconvert((SeaTunnelRowType) rowType, tableSchema);
binaryWriter.writeRow(i, paimonRow, new InternalRowSerializer(paimonRowType));
break;
default:
Expand Down
Loading

0 comments on commit a7626ba

Please sign in to comment.