Skip to content

Commit

Permalink
[Improve] Add batch flush in doris sink (apache#6024)
Browse files Browse the repository at this point in the history
  • Loading branch information
Hisoka-X authored Dec 23, 2023
1 parent 04234ac commit 2c5b48e
Show file tree
Hide file tree
Showing 13 changed files with 198 additions and 360 deletions.
1 change: 1 addition & 0 deletions docs/en/connector-v2/sink/Doris.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ Version Supported
| sink.max-retries | int | No | 3 | the maximum number of retries if writing records to the database fails |
| sink.buffer-size | int | No | 256 * 1024 | the buffer size to cache data for stream load. |
| sink.buffer-count | int | No | 3 | the buffer count to cache data for stream load. |
| doris.batch.size | int | No | 1024 | The number of rows written to Doris in each HTTP request. When the number of cached rows reaches this size, or when a checkpoint is executed, the cached data is written to the server. |
| doris.config | map | Yes | - | This option is used to support operations such as `insert`, `delete`, and `update` when automatically generating SQL, and supported formats. |

## Data Type Mapping
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public class DorisConfig implements Serializable {
private String password;
private Integer queryPort;
private String tableIdentifier;
private int batchSize;

// source option
private String readField;
Expand All @@ -76,7 +77,6 @@ public class DorisConfig implements Serializable {
private Integer requestRetries;
private boolean deserializeArrowAsync;
private int deserializeQueueSize;
private int batchSize;
private int execMemLimit;
private boolean useOldApi;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,11 @@ public interface DorisOptions {
.stringType()
.noDefaultValue()
.withDescription("the doris password.");
Option<Integer> DORIS_BATCH_SIZE =
Options.key("doris.batch.size")
.intType()
.defaultValue(DORIS_BATCH_SIZE_DEFAULT)
.withDescription("the batch size of the doris read/write.");

// source config options
Option<String> DORIS_READ_FIELD =
Expand Down Expand Up @@ -139,22 +144,6 @@ public interface DorisOptions {
.intType()
.defaultValue(DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT)
.withDescription("");
Option<Integer> DORIS_BATCH_SIZE =
Options.key("doris.batch.size")
.intType()
.defaultValue(DORIS_BATCH_SIZE_DEFAULT)
.withDescription("");
Option<Long> DORIS_EXEC_MEM_LIMIT =
Options.key("doris.exec.mem.limit")
.longType()
.defaultValue(DORIS_EXEC_MEM_LIMIT_DEFAULT)
.withDescription("");
Option<Boolean> SOURCE_USE_OLD_API =
Options.key("source.use-old-api")
.booleanType()
.defaultValue(false)
.withDescription(
"Whether to read data using the new interface defined according to the FLIP-27 specification,default false");

// sink config options
Option<Boolean> SINK_ENABLE_2PC =
Expand Down Expand Up @@ -224,7 +213,9 @@ public interface DorisOptions {
.withDescription("Create table statement template, used to create Doris table");

OptionRule.Builder SINK_RULE =
OptionRule.builder().required(FENODES, USERNAME, PASSWORD, TABLE_IDENTIFIER);
OptionRule.builder()
.required(FENODES, USERNAME, PASSWORD, TABLE_IDENTIFIER)
.optional(DORIS_BATCH_SIZE);

OptionRule.Builder CATALOG_RULE =
OptionRule.builder().required(FENODES, QUERY_PORT, USERNAME, PASSWORD);
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -372,8 +372,7 @@ public static List<BackendV2.BackendRowV2> getBackendsV2(DorisConfig dorisConfig
HttpGet httpGet = new HttpGet(beUrl);
String response = send(dorisConfig, httpGet, logger);
logger.info("Backend Info:{}", response);
List<BackendV2.BackendRowV2> backends = parseBackendV2(response, logger);
return backends;
return parseBackendV2(response, logger);
} catch (DorisConnectorException e) {
logger.info(
"Doris FE node {} is unavailable: {}, Request the next Doris FE node",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,59 +17,40 @@

package org.apache.seatunnel.connectors.doris.sink;

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.serialization.Serializer;
import org.apache.seatunnel.api.sink.SaveModeHandler;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
import org.apache.seatunnel.api.sink.SinkCommitter;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.sink.SupportSaveMode;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.connectors.doris.config.DorisConfig;
import org.apache.seatunnel.connectors.doris.config.DorisOptions;
import org.apache.seatunnel.connectors.doris.exception.DorisConnectorException;
import org.apache.seatunnel.connectors.doris.sink.committer.DorisCommitInfo;
import org.apache.seatunnel.connectors.doris.sink.committer.DorisCommitInfoSerializer;
import org.apache.seatunnel.connectors.doris.sink.committer.DorisCommitter;
import org.apache.seatunnel.connectors.doris.sink.writer.DorisSinkState;
import org.apache.seatunnel.connectors.doris.sink.writer.DorisSinkStateSerializer;
import org.apache.seatunnel.connectors.doris.sink.writer.DorisSinkWriter;

import com.google.auto.service.AutoService;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

@AutoService(SeaTunnelSink.class)
public class DorisSink
implements SeaTunnelSink<SeaTunnelRow, DorisSinkState, DorisCommitInfo, DorisCommitInfo>,
SupportSaveMode {

private DorisConfig dorisConfig;
private SeaTunnelRowType seaTunnelRowType;
private final DorisConfig dorisConfig;
private final SeaTunnelRowType seaTunnelRowType;
private String jobId;

private CatalogTable catalogTable;

public DorisSink() {}

public DorisSink(ReadonlyConfig config, CatalogTable catalogTable) {
this.dorisConfig = DorisConfig.of(config);
this.catalogTable = catalogTable;
this.seaTunnelRowType = catalogTable.getTableSchema().toPhysicalRowDataType();
}

Expand All @@ -78,63 +59,22 @@ public String getPluginName() {
return "Doris";
}

@Override
public void prepare(Config pluginConfig) throws PrepareFailException {
this.dorisConfig = DorisConfig.of(pluginConfig);
CheckResult result =
CheckConfigUtil.checkAllExists(
pluginConfig,
DorisOptions.FENODES.key(),
DorisOptions.USERNAME.key(),
DorisOptions.TABLE_IDENTIFIER.key());
if (!result.isSuccess()) {
throw new DorisConnectorException(
SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
String.format(
"PluginName: %s, PluginType: %s, Message: %s",
getPluginName(), PluginType.SINK, result.getMsg()));
}
if (dorisConfig.getTableIdentifier().isEmpty() && catalogTable != null) {
String tableIdentifier =
catalogTable.getTableId().getDatabaseName()
+ "."
+ catalogTable.getTableId().getTableName();
dorisConfig.setTableIdentifier(tableIdentifier);
}
}

@Override
public void setJobContext(JobContext jobContext) {
this.jobId = jobContext.getJobId();
}

@Override
public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
this.seaTunnelRowType = seaTunnelRowType;
}

@Override
public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
return this.seaTunnelRowType;
}

@Override
public SinkWriter<SeaTunnelRow, DorisCommitInfo, DorisSinkState> createWriter(
SinkWriter.Context context) throws IOException {
DorisSinkWriter dorisSinkWriter =
new DorisSinkWriter(
context, Collections.emptyList(), seaTunnelRowType, dorisConfig, jobId);
dorisSinkWriter.initializeLoad(Collections.emptyList());
return dorisSinkWriter;
return new DorisSinkWriter(
context, Collections.emptyList(), seaTunnelRowType, dorisConfig, jobId);
}

@Override
public SinkWriter<SeaTunnelRow, DorisCommitInfo, DorisSinkState> restoreWriter(
SinkWriter.Context context, List<DorisSinkState> states) throws IOException {
DorisSinkWriter dorisWriter =
new DorisSinkWriter(context, states, seaTunnelRowType, dorisConfig, jobId);
dorisWriter.initializeLoad(states);
return dorisWriter;
return new DorisSinkWriter(context, states, seaTunnelRowType, dorisConfig, jobId);
}

@Override
Expand All @@ -152,17 +92,6 @@ public Optional<Serializer<DorisCommitInfo>> getCommitInfoSerializer() {
return Optional.of(new DorisCommitInfoSerializer());
}

@Override
public Optional<SinkAggregatedCommitter<DorisCommitInfo, DorisCommitInfo>>
createAggregatedCommitter() throws IOException {
return Optional.empty();
}

@Override
public Optional<Serializer<DorisCommitInfo>> getAggregatedCommitInfoSerializer() {
return Optional.empty();
}

@Override
public Optional<SaveModeHandler> getSaveModeHandler() {
return Optional.empty();
Expand Down
Loading

0 comments on commit 2c5b48e

Please sign in to comment.