Skip to content

Commit

Permalink
[Improve][Connector-V2] MaxComputeSink support create partition in sa…
Browse files Browse the repository at this point in the history
…vemode (#8474)
  • Loading branch information
Hisoka-X authored Jan 12, 2025
1 parent d61cba2 commit 0b8f9de
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 6 deletions.
4 changes: 2 additions & 2 deletions docs/en/connector-v2/sink/Maxcompute.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,8 @@ You can use the following placeholders

Before the synchronous task is turned on, different treatment schemes are selected for the existing surface structure of the target side.
Option introduction:
`RECREATE_SCHEMA` :Will create when the table does not exist, delete and rebuild when the table is saved
`CREATE_SCHEMA_WHEN_NOT_EXIST` :Will Created when the table does not exist, skipped when the table is saved
`RECREATE_SCHEMA` :Will create when the table does not exist, delete and rebuild when the table is saved. If the `partition_spec` is set, the partition will be deleted and rebuilt.
`CREATE_SCHEMA_WHEN_NOT_EXIST` :Will Created when the table does not exist, skipped when the table is saved. If the `partition_spec` is set, the partition will be created.
`ERROR_WHEN_SCHEMA_NOT_EXIST` :Error will be reported when the table does not exist
`IGNORE` :Ignore the treatment of the table

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,27 @@ public void truncateTable(TablePath tablePath, boolean ignoreIfNotExists)
}
}

public void createPartition(TablePath tablePath, PartitionSpec partitionSpec) {
try {
Odps odps = getOdps(tablePath.getDatabaseName());
Table odpsTable = odps.tables().get(tablePath.getTableName());
odpsTable.createPartition(partitionSpec, true);
} catch (Exception e) {
throw new CatalogException("create partition error", e);
}
}

public void truncatePartition(TablePath tablePath, PartitionSpec partitionSpec) {
try {
Odps odps = getOdps(tablePath.getDatabaseName());
Table odpsTable = odps.tables().get(tablePath.getTableName());
odpsTable.deletePartition(partitionSpec, true);
odpsTable.createPartition(partitionSpec, true);
} catch (Exception e) {
throw new CatalogException("create partition error", e);
}
}

@Override
public boolean isExistsData(TablePath tablePath) {
throw new UnsupportedOperationException();
Expand All @@ -280,7 +301,15 @@ public boolean isExistsData(TablePath tablePath) {
public void executeSql(TablePath tablePath, String sql) {
try {
Odps odps = getOdps(tablePath.getDatabaseName());
SQLTask.run(odps, sql).waitForSuccess();
String[] sqls = sql.split(";");
for (String s : sqls) {
if (!s.trim().isEmpty()) {
if (!s.trim().endsWith(";")) {
s = s.trim() + ";";
}
SQLTask.run(odps, s).waitForSuccess();
}
}
} catch (OdpsException e) {
throw new CatalogException("execute sql error", e);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.maxcompute.sink;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.sink.DataSaveMode;
import org.apache.seatunnel.api.sink.DefaultSaveModeHandler;
import org.apache.seatunnel.api.sink.SchemaSaveMode;
import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.connectors.seatunnel.maxcompute.catalog.MaxComputeCatalog;

import org.apache.commons.lang3.StringUtils;

import com.aliyun.odps.PartitionSpec;

import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PARTITION_SPEC;

public class MaxComputeSaveModeHandler extends DefaultSaveModeHandler {

private final ReadonlyConfig readonlyConfig;

public MaxComputeSaveModeHandler(
SchemaSaveMode schemaSaveMode,
DataSaveMode dataSaveMode,
Catalog catalog,
CatalogTable catalogTable,
String customSql,
ReadonlyConfig readonlyConfig) {
super(schemaSaveMode, dataSaveMode, catalog, catalogTable, customSql);
this.readonlyConfig = readonlyConfig;
}

@Override
protected void createSchemaWhenNotExist() {
super.createSchemaWhenNotExist();
if (StringUtils.isNotEmpty(readonlyConfig.get(PARTITION_SPEC))) {
((MaxComputeCatalog) catalog)
.createPartition(
tablePath, new PartitionSpec(readonlyConfig.get(PARTITION_SPEC)));
}
}

@Override
protected void recreateSchema() {
super.recreateSchema();
if (StringUtils.isNotEmpty(readonlyConfig.get(PARTITION_SPEC))) {
((MaxComputeCatalog) catalog)
.createPartition(
tablePath, new PartitionSpec(readonlyConfig.get(PARTITION_SPEC)));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.sink.DataSaveMode;
import org.apache.seatunnel.api.sink.DefaultSaveModeHandler;
import org.apache.seatunnel.api.sink.SaveModeHandler;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.sink.SupportMultiTableSink;
Expand Down Expand Up @@ -94,12 +93,13 @@ public Optional<SaveModeHandler> getSaveModeHandler() {
}

return Optional.of(
new DefaultSaveModeHandler(
new MaxComputeSaveModeHandler(
readonlyConfig.get(MaxcomputeConfig.SCHEMA_SAVE_MODE),
dataSaveMode,
catalog,
catalogTable,
readonlyConfig.get(MaxcomputeConfig.CUSTOM_SQL)));
readonlyConfig.get(MaxcomputeConfig.CUSTOM_SQL),
readonlyConfig));
}

@Override
Expand Down

0 comments on commit 0b8f9de

Please sign in to comment.