diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java index 0fcc01d70..fcf31c34f 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java @@ -38,6 +38,7 @@ import org.apache.doris.flink.rest.models.QueryPlan; import org.apache.doris.flink.rest.models.Schema; import org.apache.doris.flink.rest.models.Tablet; +import org.apache.doris.flink.sink.BackendUtil; import org.apache.http.HttpStatus; import org.apache.http.client.config.RequestConfig; import org.apache.http.client.methods.HttpGet; @@ -280,7 +281,13 @@ public static String randomEndpoint(String feNodes, Logger logger) } List nodes = Arrays.asList(feNodes.split(",")); Collections.shuffle(nodes); - return nodes.get(0).trim(); + for (String feNode : nodes) { + if (BackendUtil.tryHttpConnection(feNode)) { + return feNode; + } + } + throw new DorisRuntimeException( + "No Doris FE is available, please check configuration or cluster status."); } /** diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java index f909bb6bc..9a45ff0d5 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java @@ -87,17 +87,18 @@ public String getAvailableBackend() { throw new DorisRuntimeException("no available backend."); } - public static boolean tryHttpConnection(String backend) { + public static boolean tryHttpConnection(String host) { try { - backend = "http://" + backend; - URL url = new URL(backend); + LOG.info("try to connect host {}", host); + host = "http://" + host; + URL url = new URL(host); HttpURLConnection co = (HttpURLConnection) url.openConnection(); co.setConnectTimeout(60000); co.connect(); co.disconnect(); return true; } catch (Exception ex) { - LOG.warn("Failed to connect to backend:{}", backend, ex); + LOG.warn("Failed to connect to host:{}", host, ex); return false; } } diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerTest.java index 977f8da74..2622dcf53 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerTest.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerTest.java @@ -20,6 +20,7 @@ import org.apache.doris.flink.catalog.doris.FieldSchema; import org.apache.doris.flink.cfg.DorisOptions; import org.apache.doris.flink.exception.IllegalArgumentException; +import org.apache.doris.flink.sink.BackendUtil; import org.apache.doris.flink.sink.HttpEntityMock; import org.apache.doris.flink.sink.OptionUtils; import org.apache.http.ProtocolVersion; @@ -28,6 +29,7 @@ import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; import org.apache.http.message.BasicStatusLine; +import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -70,7 +72,8 @@ public class SchemaManagerTest { HttpEntityMock entityMock; SchemaChangeManager schemaChangeManager; - static MockedStatic httpClientMockedStatic = mockStatic(HttpClients.class); + static MockedStatic httpClientMockedStatic; + static MockedStatic backendUtilMockedStatic; @Before public void setUp() throws IOException { @@ -89,7 +92,11 @@ public void setUp() throws IOException { when(httpResponse.getStatusLine()).thenReturn(normalLine); when(httpResponse.getEntity()).thenReturn(entityMock); + httpClientMockedStatic = mockStatic(HttpClients.class); httpClientMockedStatic.when(() -> HttpClients.createDefault()).thenReturn(httpClient); + + backendUtilMockedStatic = mockStatic(BackendUtil.class); + backendUtilMockedStatic.when(() -> BackendUtil.tryHttpConnection(any())).thenReturn(true); } @Test @@ -128,4 +135,10 @@ public void testRenameColumn() { Assert.assertEquals( "ALTER TABLE `test`.`test_flink` RENAME COLUMN `col` `col_new`", renameColumnDDL); } + + @After + public void after() { + httpClientMockedStatic.close(); + backendUtilMockedStatic.close(); + } }