Skip to content

Commit

Permalink
[Fix][Hive] Writing parquet files supports the optional timestamp int96
Browse files Browse the repository at this point in the history
  • Loading branch information
corgy-w committed Jan 13, 2025
1 parent 5f99be2 commit daf1e65
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,9 @@

import org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting;

import org.apache.seatunnel.api.table.type.ArrayType;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.MapType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.api.table.type.SqlType;
import org.apache.seatunnel.common.exception.CommonError;
import org.apache.seatunnel.connectors.seatunnel.file.sink.config.FileSinkConfig;
Expand All @@ -35,8 +32,6 @@
import lombok.NonNull;

import java.io.StringWriter;
import java.util.HashMap;
import java.util.Map;

public class CsvWriteStrategy extends TextWriteStrategy {

Expand Down Expand Up @@ -69,48 +64,13 @@ public void addQuotesToCsvFields(SeaTunnelRow seaTunnelRow) {
private Object addQuotes(Object fieldValue, SeaTunnelDataType<?> fieldType) {
if (fieldType instanceof BasicType) {
if (fieldType.getSqlType() == SqlType.STRING) {
return addQuotesUsingCSVFormat(fieldValue.toString());
return addQuotesUsingCsvFormat(fieldValue.toString());
}
} else if (fieldType instanceof ArrayType) {
ArrayType arrayType = (ArrayType) fieldType;
SeaTunnelDataType<?> elementType = arrayType.getElementType();
if (fieldValue instanceof Object[]) {
Object[] values = (Object[]) fieldValue;
Object[] newValues = new Object[values.length];
for (int i = 0; i < values.length; i++) {
newValues[i] = addQuotes(values[i], elementType);
}
return newValues;
}
} else if (fieldType instanceof MapType) {
MapType mapType = (MapType) fieldType;
SeaTunnelDataType<?> keyType = mapType.getKeyType();
SeaTunnelDataType<?> valueType = mapType.getValueType();
if (fieldValue instanceof Map) {
Map<?, ?> values = (Map<?, ?>) fieldValue;
Map<Object, Object> newValues = new HashMap<>();
for (Map.Entry<?, ?> entry : values.entrySet()) {
Object newKey = addQuotes(entry.getKey(), keyType);
Object newValue = addQuotes(entry.getValue(), valueType);
newValues.put(newKey, newValue);
}
return newValues;
}
} else if (fieldType instanceof SeaTunnelRowType) {
SeaTunnelRowType rowType = (SeaTunnelRowType) fieldType;
SeaTunnelRow row = (SeaTunnelRow) fieldValue;
String[] fieldNames = rowType.getFieldNames();
for (int i = 0; i < fieldNames.length; i++) {
SeaTunnelDataType<?> subFieldType = rowType.getFieldType(i);
Object subFieldValue = row.getField(i);
row.setField(i, addQuotes(subFieldValue, subFieldType));
}
return row;
}
return fieldValue;
}

private String addQuotesUsingCSVFormat(String fieldValue) {
private String addQuotesUsingCsvFormat(String fieldValue) {
CSVFormat format =
CSVFormat.DEFAULT
.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.seatunnel.connectors.seatunnel.file.sink.config.FileSinkConfig;
import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.CsvWriteStrategy;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

Expand All @@ -41,7 +42,6 @@
import java.util.Map;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;

public class CsvWriteStrategyTest {

Expand Down Expand Up @@ -135,8 +135,9 @@ public void addQuotesToCsvFields_BasicNonStringField_NoQuotes() {
csvWriteStrategy.addQuotesToCsvFields(row);
assertEquals("\"Hi World\"", row.getField(0));
assertEquals("\"This is a \"\"quoted\"\" string\"", row.getField(1));
assertNull(((HashMap<?, ?>) row.getField(2)).get("f1"));
assertNull(((HashMap<?, ?>) row.getField(2)).get("f2"));
assertNull(((HashMap<?, ?>) row.getField(2)).get("f3"));
// map structure strings temporarily use raw escaping
Assertions.assertNotNull(((HashMap<?, ?>) row.getField(2)).get("f1"));
Assertions.assertNotNull(((HashMap<?, ?>) row.getField(2)).get("f2"));
Assertions.assertNotNull(((HashMap<?, ?>) row.getField(2)).get("f3"));
}
}

0 comments on commit daf1e65

Please sign in to comment.