diff --git a/docker-compose-ci.yml b/docker-compose-ci.yml index 4a43c3a7..cc5515a7 100644 --- a/docker-compose-ci.yml +++ b/docker-compose-ci.yml @@ -104,7 +104,7 @@ services: - ZOO_MY_ID=1 druid_coordinator: - image: apache/druid:28.0.0 + image: apache/druid:29.0.0 container_name: druid_coordinator volumes: - druid_shared:/opt/shared @@ -120,7 +120,7 @@ services: - environment_druid druid_broker: - image: apache/druid:28.0.0 + image: apache/druid:29.0.0 container_name: druid_broker volumes: - broker_var:/opt/druid/var @@ -136,7 +136,7 @@ services: - environment_druid druid_historical: - image: apache/druid:28.0.0 + image: apache/druid:29.0.0 container_name: druid_historical volumes: - druid_shared:/opt/shared @@ -153,7 +153,7 @@ services: - environment_druid druid_middlemanager: - image: apache/druid:28.0.0 + image: apache/druid:29.0.0 container_name: druid_middlemanager volumes: - druid_shared:/opt/shared @@ -171,7 +171,7 @@ services: - environment_druid druid_router: - image: apache/druid:28.0.0 + image: apache/druid:29.0.0 container_name: druid_router volumes: - router_var:/opt/druid/var diff --git a/plugin-jdbc-druid/src/test/java/io/kestra/plugin/jdbc/druid/DruidTestHelper.java b/plugin-jdbc-druid/src/test/java/io/kestra/plugin/jdbc/druid/DruidTestHelper.java index 9e366fdd..3e8a8bd3 100644 --- a/plugin-jdbc-druid/src/test/java/io/kestra/plugin/jdbc/druid/DruidTestHelper.java +++ b/plugin-jdbc-druid/src/test/java/io/kestra/plugin/jdbc/druid/DruidTestHelper.java @@ -2,13 +2,15 @@ import com.fasterxml.jackson.databind.ObjectMapper; import io.kestra.core.serializers.JacksonMapper; +import io.kestra.core.utils.Await; import java.io.IOException; import java.net.URI; import java.net.http.HttpClient; import java.net.http.HttpRequest; import java.net.http.HttpResponse; -import java.util.concurrent.TimeUnit; +import java.time.Duration; +import java.util.concurrent.TimeoutException; final class DruidTestHelper { private static final ObjectMapper OBJECT_MAPPER = JacksonMapper.ofJson(); @@ -16,12 +18,12 @@ final class DruidTestHelper { private DruidTestHelper() { } - static void initServer() throws IOException, InterruptedException { + static void initServer() throws IOException, InterruptedException, TimeoutException { String payload = """ { "context": {"waitUntilSegmentsLoad": true, "finalizeAggregations": false, "groupByEnableMultiValueUnnesting": false, "executionMode":"async", "maxNumTasks": 2}, "header": true, - "query": "REPLACE INTO \\"products\\" OVERWRITE ALL WITH \\"ext\\" AS ( SELECT * FROM TABLE(EXTERN('{\\"type\\":\\"http\\",\\"uris\\":[\\"https://media.githubusercontent.com/media/datablist/sample-csv-files/main/files/products/products-1000.csv\\"]}',\\n '{\\"type\\":\\"csv\\",\\"findColumnsFromHeader\\":true}'\\n )\\n ) EXTEND (\\"index\\" BIGINT, \\"name\\" VARCHAR, \\"ean\\" BIGINT)) SELECT TIMESTAMP '2000-01-01 00:00:00' AS \\"__time\\", \\"index\\", \\"name\\", \\"ean\\"FROM \\"ext\\" PARTITIONED BY ALL", + "query": "REPLACE INTO \\"products\\" OVERWRITE ALL WITH \\"ext\\" AS ( SELECT * FROM TABLE(EXTERN('{\\"type\\":\\"http\\",\\"uris\\":[\\"https://drive.google.com/uc?id=1OT84-j5J5z2tHoUvikJtoJFInWmlyYzY&export=download\\"]}',\\n '{\\"type\\":\\"csv\\",\\"findColumnsFromHeader\\":true}'\\n )\\n ) EXTEND (\\"index\\" BIGINT, \\"name\\" VARCHAR, \\"ean\\" BIGINT)) SELECT TIMESTAMP '2000-01-01 00:00:00' AS \\"__time\\", \\"index\\", \\"name\\", \\"ean\\"FROM \\"ext\\" PARTITIONED BY ALL", "resultFormat": "array", "sqlTypesHeader": true, "typesHeader": true @@ -37,17 +39,20 @@ static void initServer() throws IOException, InterruptedException { String queryId = json.get("queryId").asText(); // we need to wait until Druid has processed the request - String state = null; - while (!"SUCCESS".equals(state)) { - TimeUnit.SECONDS.sleep(5); - URI queryUri = URI.create("http://localhost:8888/druid/v2/sql/statements/"+ queryId); - var queryHttpRequest = HttpRequest.newBuilder(queryUri) + Await.until(() -> { + try { + URI queryUri = URI.create("http://localhost:8888/druid/v2/sql/statements/"+ queryId); + var queryHttpRequest = HttpRequest.newBuilder(queryUri) .header("Accept", "application/json") .GET() .build(); - var queryHttpResponse = httpClient.send(queryHttpRequest, HttpResponse.BodyHandlers.ofString()); - var queryJson = OBJECT_MAPPER.readTree(queryHttpResponse.body()); - state = queryJson.get("state").asText(); - } + var queryHttpResponse = httpClient.send(queryHttpRequest, HttpResponse.BodyHandlers.ofString()); + var queryJson = OBJECT_MAPPER.readTree(queryHttpResponse.body()); + String state = queryJson.get("state").asText(); + return "SUCCESS".equals(state); + } catch (InterruptedException | IOException e) { + throw new RuntimeException(e); + } + }, Duration.ofSeconds(1), Duration.ofMinutes(1)); } }