From 05ae7730c7499e569206cf093a51a4060c5e0057 Mon Sep 17 00:00:00 2001 From: Jia Fan Date: Thu, 4 Jan 2024 18:22:02 +0800 Subject: [PATCH 1/4] [Improve] Improve doris sink to random use be --- .../connectors/doris/sink/writer/DorisSinkWriter.java | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java index 0abdc6269c1..ed4aca49add 100644 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java @@ -76,7 +76,6 @@ public class DorisSinkWriter private transient Thread executorThread; private transient volatile Exception loadException = null; private List backends; - private long pos; public DorisSinkWriter( SinkWriter.Context context, @@ -255,12 +254,10 @@ public void close() throws IOException { @VisibleForTesting public String getAvailableBackend() { - long tmp = pos + backends.size(); - while (pos < tmp) { - BackendV2.BackendRowV2 backend = backends.get((int) (pos % backends.size())); + Collections.shuffle(backends); + for (BackendV2.BackendRowV2 backend : backends) { String res = backend.toBackendString(); if (tryHttpConnection(res)) { - pos++; return res; } } @@ -279,7 +276,6 @@ public boolean tryHttpConnection(String backend) { return true; } catch (Exception ex) { log.warn("Failed to connect to backend:{}", backend, ex); - pos++; return false; } } From 9368c7e8a7fde32c7e933df404fc6d19b1de98d0 Mon Sep 17 00:00:00 2001 From: Jia Fan Date: Thu, 4 Jan 2024 19:58:05 +0800 Subject: [PATCH 2/4] update --- .../doris/sink/writer/DorisSinkWriter.java | 7 +++--- .../doris/sink/writer/DorisStreamLoad.java | 23 +++++++++++++------ 2 files changed, 19 insertions(+), 11 deletions(-) diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java index ed4aca49add..598d3969330 100644 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java @@ -37,7 +37,6 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ThreadFactoryBuilder; -import lombok.NonNull; import lombok.extern.slf4j.Slf4j; import java.io.IOException; @@ -155,7 +154,7 @@ public void write(SeaTunnelRow element) throws IOException { @Override public Optional prepareCommit() throws IOException { RespContent respContent = flush(); - if (!dorisConfig.getEnable2PC()) { + if (!dorisConfig.getEnable2PC() || respContent == null) { return Optional.empty(); } long txnId = respContent.getTxnId(); @@ -164,12 +163,12 @@ public Optional prepareCommit() throws IOException { new DorisCommitInfo(dorisStreamLoad.getHostPort(), dorisStreamLoad.getDb(), txnId)); } - @NonNull private RespContent flush() throws IOException { + private RespContent flush() throws IOException { // disable exception checker before stop load. loading = false; checkState(dorisStreamLoad != null); RespContent respContent = dorisStreamLoad.stopLoad(); - if (!DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) { + if (respContent != null && !DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) { String errMsg = String.format( "stream load error: %s, see more in %s", diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisStreamLoad.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisStreamLoad.java index 00f21e9ae57..9ec98469996 100644 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisStreamLoad.java +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisStreamLoad.java @@ -78,7 +78,8 @@ public class DorisStreamLoad implements Serializable { private Future pendingLoadFuture; private final CloseableHttpClient httpClient; private final ExecutorService executorService; - private boolean loadBatchFirstRecord; + private volatile boolean loadBatchFirstRecord; + private String label; private long recordCount = 0; public DorisStreamLoad( @@ -191,6 +192,7 @@ public void abortPreCommit(String labelSuffix, long chkID) throws Exception { public void writeRecord(byte[] record) throws IOException { if (loadBatchFirstRecord) { loadBatchFirstRecord = false; + startStreamLoad(); } else { recordStream.write(lineDelimiter); } @@ -216,19 +218,26 @@ public RespContent handlePreCommitResponse(CloseableHttpResponse response) throw public RespContent stopLoad() throws IOException { recordStream.endInput(); log.info("stream load stopped."); - checkState(pendingLoadFuture != null); - try { - return handlePreCommitResponse(pendingLoadFuture.get()); - } catch (Exception e) { - throw new DorisConnectorException(DorisConnectorErrorCode.STREAM_LOAD_FAILED, e); + if (pendingLoadFuture != null) { + try { + return handlePreCommitResponse(pendingLoadFuture.get()); + } catch (Exception e) { + throw new DorisConnectorException(DorisConnectorErrorCode.STREAM_LOAD_FAILED, e); + } + } else { + return null; } } public void startLoad(String label) throws IOException { loadBatchFirstRecord = true; recordCount = 0; - HttpPutBuilder putBuilder = new HttpPutBuilder(); recordStream.startInput(); + this.label = label; + } + + private void startStreamLoad() { + HttpPutBuilder putBuilder = new HttpPutBuilder(); log.info("stream load started for {}", label); try { InputStreamEntity entity = new InputStreamEntity(recordStream); From 2d755e2c44012d1772f905690bcf986c23f9802a Mon Sep 17 00:00:00 2001 From: Jia Fan Date: Fri, 5 Jan 2024 17:43:07 +0800 Subject: [PATCH 3/4] update --- .../connectors/doris/sink/writer/DorisStreamLoad.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisStreamLoad.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisStreamLoad.java index 9ec98469996..c2a8fc3bb5c 100644 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisStreamLoad.java +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisStreamLoad.java @@ -192,6 +192,7 @@ public void abortPreCommit(String labelSuffix, long chkID) throws Exception { public void writeRecord(byte[] record) throws IOException { if (loadBatchFirstRecord) { loadBatchFirstRecord = false; + recordStream.startInput(); startStreamLoad(); } else { recordStream.write(lineDelimiter); @@ -216,13 +217,15 @@ public RespContent handlePreCommitResponse(CloseableHttpResponse response) throw } public RespContent stopLoad() throws IOException { - recordStream.endInput(); - log.info("stream load stopped."); if (pendingLoadFuture != null) { + log.info("stream load stopped."); + recordStream.endInput(); try { return handlePreCommitResponse(pendingLoadFuture.get()); } catch (Exception e) { throw new DorisConnectorException(DorisConnectorErrorCode.STREAM_LOAD_FAILED, e); + } finally { + pendingLoadFuture = null; } } else { return null; @@ -232,7 +235,6 @@ public RespContent stopLoad() throws IOException { public void startLoad(String label) throws IOException { loadBatchFirstRecord = true; recordCount = 0; - recordStream.startInput(); this.label = label; } From 4a1db639e7d4e064139ef90dac7de4503bafd04f Mon Sep 17 00:00:00 2001 From: Jia Fan Date: Mon, 8 Jan 2024 10:09:16 +0800 Subject: [PATCH 4/4] update --- .../connectors/doris/sink/writer/DorisSinkWriter.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java index 598d3969330..40e0bc3a2f5 100644 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java @@ -35,7 +35,6 @@ import org.apache.seatunnel.connectors.doris.util.HttpUtil; import org.apache.seatunnel.connectors.doris.util.UnsupportedTypeConverterUtils; -import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ThreadFactoryBuilder; import lombok.extern.slf4j.Slf4j; @@ -251,8 +250,7 @@ public void close() throws IOException { } } - @VisibleForTesting - public String getAvailableBackend() { + private String getAvailableBackend() { Collections.shuffle(backends); for (BackendV2.BackendRowV2 backend : backends) { String res = backend.toBackendString();