Skip to content

Commit

Permalink
[Bug] [Maxcompute] Fix failed to parse some maxcompute type (#3894)
Browse files Browse the repository at this point in the history
  • Loading branch information
stdnt-xiao authored Mar 4, 2023
1 parent b7843c7 commit 642901f
Show file tree
Hide file tree
Showing 13 changed files with 644 additions and 227 deletions.
1 change: 1 addition & 0 deletions release-note.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
- [ClickHouse] Fix clickhouse write cdc changelog update event #3951
- [ClickHouse] Fix connector source snapshot state NPE #4027
- [Kudu] Fix connector source snapshot state NPE #4027
- [Maxcompute] Fix some data type parse fail #3894

### Zeta Engine
- [Checkpoint] Fix Checkpoint Continue Trigger After Job CANCELED #3808
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.aliyun.odps.type;

import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.connectors.seatunnel.maxcompute.exception.MaxcomputeConnectorException;

import com.aliyun.odps.OdpsType;

public class SimpleArrayTypeInfo implements ArrayTypeInfo {
private final TypeInfo valueType;

SimpleArrayTypeInfo(TypeInfo typeInfo) {
if (typeInfo == null) {
throw new MaxcomputeConnectorException(
CommonErrorCode.UNSUPPORTED_DATA_TYPE, "Invalid element type.");
} else {
this.valueType = typeInfo;
}
}

public String getTypeName() {
return this.getOdpsType().name() + "<" + this.valueType.getTypeName() + ">";
}

public TypeInfo getElementTypeInfo() {
return this.valueType;
}

public OdpsType getOdpsType() {
return OdpsType.ARRAY;
}

public String toString() {
return this.getTypeName();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.aliyun.odps.type;

import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.connectors.seatunnel.maxcompute.exception.MaxcomputeConnectorException;

import com.aliyun.odps.OdpsType;

public class SimpleMapTypeInfo implements MapTypeInfo {
private final TypeInfo keyType;
private final TypeInfo valueType;

SimpleMapTypeInfo(TypeInfo keyType, TypeInfo valueType) {
if (keyType != null && valueType != null) {
this.keyType = keyType;
this.valueType = valueType;
} else {
throw new MaxcomputeConnectorException(
CommonErrorCode.UNSUPPORTED_DATA_TYPE, "Invalid key or value type for map.");
}
}

public String getTypeName() {
return this.getOdpsType().name()
+ "<"
+ this.keyType.getTypeName()
+ ","
+ this.valueType.getTypeName()
+ ">";
}

public TypeInfo getKeyTypeInfo() {
return this.keyType;
}

public TypeInfo getValueTypeInfo() {
return this.valueType;
}

public OdpsType getOdpsType() {
return OdpsType.MAP;
}

public String toString() {
return this.getTypeName();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.aliyun.odps.type;

import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.connectors.seatunnel.maxcompute.exception.MaxcomputeConnectorException;

import com.aliyun.odps.OdpsType;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class SimpleStructTypeInfo implements StructTypeInfo {
private final List<String> fieldNames;
private final List<TypeInfo> fieldTypeInfos;

SimpleStructTypeInfo(List<String> names, List<TypeInfo> typeInfos) {
this.validateParameters(names, typeInfos);
this.fieldNames = this.toLowerCase(names);
this.fieldTypeInfos = new ArrayList(typeInfos);
}

private List<String> toLowerCase(List<String> names) {
List<String> lowerNames = new ArrayList(names.size());
Iterator var3 = names.iterator();

while (var3.hasNext()) {
String name = (String) var3.next();
lowerNames.add(name.toLowerCase());
}

return lowerNames;
}

private void validateParameters(List<String> names, List<TypeInfo> typeInfos) {
if (names != null && typeInfos != null && !names.isEmpty() && !typeInfos.isEmpty()) {
if (names.size() != typeInfos.size()) {
throw new MaxcomputeConnectorException(
CommonErrorCode.UNSUPPORTED_DATA_TYPE,
"The amount of field names must be equal to the amount of field types.");
}
} else {
throw new MaxcomputeConnectorException(
CommonErrorCode.UNSUPPORTED_DATA_TYPE,
"Invalid name or element type for struct.");
}
}

public String getTypeName() {
StringBuilder stringBuilder = new StringBuilder(this.getOdpsType().name());
stringBuilder.append("<");

for (int i = 0; i < this.fieldNames.size(); ++i) {
if (i > 0) {
stringBuilder.append(",");
}

stringBuilder.append((String) this.fieldNames.get(i));
stringBuilder.append(":");
stringBuilder.append(((TypeInfo) this.fieldTypeInfos.get(i)).getTypeName());
}

stringBuilder.append(">");
return stringBuilder.toString();
}

public List<String> getFieldNames() {
return this.fieldNames;
}

public List<TypeInfo> getFieldTypeInfos() {
return this.fieldTypeInfos;
}

public int getFieldCount() {
return this.fieldNames.size();
}

public OdpsType getOdpsType() {
return OdpsType.STRUCT;
}

public String toString() {
return this.getTypeName();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,6 @@ public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {

@Override
public AbstractSinkWriter<SeaTunnelRow, Void> createWriter(SinkWriter.Context context) {
return new MaxcomputeWriter(this.typeInfo, this.pluginConfig);
return new MaxcomputeWriter(this.pluginConfig);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.apache.seatunnel.shade.com.typesafe.config.Config;

import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
import org.apache.seatunnel.connectors.seatunnel.maxcompute.exception.MaxcomputeConnectorException;
Expand All @@ -43,23 +42,18 @@

@Slf4j
public class MaxcomputeWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
private final SeaTunnelRowType seaTunnelRowType;
private final RecordWriter recordWriter;
private final TableTunnel.UploadSession session;
private final TableSchema tableSchema;

private Config pluginConfig;

public MaxcomputeWriter(SeaTunnelRowType seaTunnelRowType, Config pluginConfig) {
this.seaTunnelRowType = seaTunnelRowType;
this.pluginConfig = pluginConfig;
public MaxcomputeWriter(Config pluginConfig) {
try {
Table table = MaxcomputeUtil.getTable(pluginConfig);
this.tableSchema = table.getSchema();
TableTunnel tunnel = MaxcomputeUtil.getTableTunnel(pluginConfig);
if (this.pluginConfig.hasPath(PARTITION_SPEC.key())) {
if (pluginConfig.hasPath(PARTITION_SPEC.key())) {
PartitionSpec partitionSpec =
new PartitionSpec(this.pluginConfig.getString(PARTITION_SPEC.key()));
new PartitionSpec(pluginConfig.getString(PARTITION_SPEC.key()));
session =
tunnel.createUploadSession(
pluginConfig.getString(PROJECT.key()),
Expand All @@ -80,8 +74,7 @@ public MaxcomputeWriter(SeaTunnelRowType seaTunnelRowType, Config pluginConfig)

@Override
public void write(SeaTunnelRow seaTunnelRow) throws IOException {
Record record =
MaxcomputeTypeMapper.getMaxcomputeRowData(seaTunnelRow, this.seaTunnelRowType);
Record record = MaxcomputeTypeMapper.getMaxcomputeRowData(seaTunnelRow, this.tableSchema);
recordWriter.write(record);
}

Expand Down
Loading

0 comments on commit 642901f

Please sign in to comment.