Skip to content

Commit

Permalink
[optimize](sink) Optimize the BE load balancing logic during concurre…
Browse files Browse the repository at this point in the history
…nt imports. (apache#388)

(cherry picked from commit d9f52e6)
  • Loading branch information
vinlee19 authored and PeatBoy committed Jan 21, 2025
1 parent aeea124 commit 4a88412
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,15 @@ public static BackendUtil getInstance(
}

public String getAvailableBackend() {
return getAvailableBackend(0);
}

public String getAvailableBackend(int subtaskId) {
long tmp = pos + backends.size();
while (pos < tmp) {
BackendV2.BackendRowV2 backend = backends.get((int) (pos++ % backends.size()));
BackendV2.BackendRowV2 backend =
backends.get((int) ((pos + subtaskId) % backends.size()));
pos++;
String res = backend.toBackendString();
if (tryHttpConnection(res)) {
return res;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ public List<DorisWriterState> snapshotState(long checkpointId) throws IOExceptio
List<DorisWriterState> writerStates = new ArrayList<>();
for (DorisStreamLoad dorisStreamLoad : dorisStreamLoadMap.values()) {
// Dynamic refresh backend
dorisStreamLoad.setHostPort(backendUtil.getAvailableBackend());
dorisStreamLoad.setHostPort(backendUtil.getAvailableBackend(subtaskId));
DorisWriterState writerState =
new DorisWriterState(
labelPrefix,
Expand Down Expand Up @@ -340,7 +340,7 @@ private DorisStreamLoad getStreamLoader(String tableKey) {
tableKey,
v ->
new DorisStreamLoad(
backendUtil.getAvailableBackend(),
backendUtil.getAvailableBackend(subtaskId),
dorisOptions,
executionOptions,
labelGenerator,
Expand Down Expand Up @@ -373,7 +373,7 @@ private void checkAllDone(String tableIdentifier, DorisStreamLoad dorisStreamLoa
// use send cached data to new txn, then notify to restart the stream
if (executionOptions.isUseCache()) {
try {
dorisStreamLoad.setHostPort(backendUtil.getAvailableBackend());
dorisStreamLoad.setHostPort(backendUtil.getAvailableBackend(subtaskId));
if (executionOptions.enabled2PC()) {
dorisStreamLoad.abortPreCommit(labelPrefix, curCheckpointId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.junit.Ignore;
import org.junit.Test;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

Expand All @@ -45,8 +44,7 @@ public void testGetAvailableBackend() throws Exception {

@Test
public void testTryHttpConnection() {
BackendUtil backendUtil = new BackendUtil(new ArrayList<>());
boolean flag = backendUtil.tryHttpConnection("127.0.0.1:8040");
boolean flag = BackendUtil.tryHttpConnection("127.0.0.1:8040");
Assert.assertFalse(flag);
}

Expand Down

0 comments on commit 4a88412

Please sign in to comment.