Skip to content

Commit

Permalink
fix: Apache Druid tests
Browse files Browse the repository at this point in the history
  • Loading branch information
loicmathieu committed Feb 28, 2024
1 parent 1824f9c commit e046785
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 17 deletions.
10 changes: 5 additions & 5 deletions docker-compose-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ services:
- ZOO_MY_ID=1

druid_coordinator:
image: apache/druid:28.0.0
image: apache/druid:29.0.0
container_name: druid_coordinator
volumes:
- druid_shared:/opt/shared
Expand All @@ -120,7 +120,7 @@ services:
- environment_druid

druid_broker:
image: apache/druid:28.0.0
image: apache/druid:29.0.0
container_name: druid_broker
volumes:
- broker_var:/opt/druid/var
Expand All @@ -136,7 +136,7 @@ services:
- environment_druid

druid_historical:
image: apache/druid:28.0.0
image: apache/druid:29.0.0
container_name: druid_historical
volumes:
- druid_shared:/opt/shared
Expand All @@ -153,7 +153,7 @@ services:
- environment_druid

druid_middlemanager:
image: apache/druid:28.0.0
image: apache/druid:29.0.0
container_name: druid_middlemanager
volumes:
- druid_shared:/opt/shared
Expand All @@ -171,7 +171,7 @@ services:
- environment_druid

druid_router:
image: apache/druid:28.0.0
image: apache/druid:29.0.0
container_name: druid_router
volumes:
- router_var:/opt/druid/var
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,28 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.utils.Await;

import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.concurrent.TimeUnit;
import java.time.Duration;
import java.util.concurrent.TimeoutException;

final class DruidTestHelper {
private static final ObjectMapper OBJECT_MAPPER = JacksonMapper.ofJson();

private DruidTestHelper() {
}

static void initServer() throws IOException, InterruptedException {
static void initServer() throws IOException, InterruptedException, TimeoutException {
String payload = """
{
"context": {"waitUntilSegmentsLoad": true, "finalizeAggregations": false, "groupByEnableMultiValueUnnesting": false, "executionMode":"async", "maxNumTasks": 2},
"header": true,
"query": "REPLACE INTO \\"products\\" OVERWRITE ALL WITH \\"ext\\" AS ( SELECT * FROM TABLE(EXTERN('{\\"type\\":\\"http\\",\\"uris\\":[\\"https://media.githubusercontent.com/media/datablist/sample-csv-files/main/files/products/products-1000.csv\\"]}',\\n '{\\"type\\":\\"csv\\",\\"findColumnsFromHeader\\":true}'\\n )\\n ) EXTEND (\\"index\\" BIGINT, \\"name\\" VARCHAR, \\"ean\\" BIGINT)) SELECT TIMESTAMP '2000-01-01 00:00:00' AS \\"__time\\", \\"index\\", \\"name\\", \\"ean\\"FROM \\"ext\\" PARTITIONED BY ALL",
"query": "REPLACE INTO \\"products\\" OVERWRITE ALL WITH \\"ext\\" AS ( SELECT * FROM TABLE(EXTERN('{\\"type\\":\\"http\\",\\"uris\\":[\\"https://drive.google.com/uc?id=1OT84-j5J5z2tHoUvikJtoJFInWmlyYzY&export=download\\"]}',\\n '{\\"type\\":\\"csv\\",\\"findColumnsFromHeader\\":true}'\\n )\\n ) EXTEND (\\"index\\" BIGINT, \\"name\\" VARCHAR, \\"ean\\" BIGINT)) SELECT TIMESTAMP '2000-01-01 00:00:00' AS \\"__time\\", \\"index\\", \\"name\\", \\"ean\\"FROM \\"ext\\" PARTITIONED BY ALL",
"resultFormat": "array",
"sqlTypesHeader": true,
"typesHeader": true
Expand All @@ -37,17 +39,20 @@ static void initServer() throws IOException, InterruptedException {
String queryId = json.get("queryId").asText();

// we need to wait until Druid has processed the request
String state = null;
while (!"SUCCESS".equals(state)) {
TimeUnit.SECONDS.sleep(5);
URI queryUri = URI.create("http://localhost:8888/druid/v2/sql/statements/"+ queryId);
var queryHttpRequest = HttpRequest.newBuilder(queryUri)
Await.until(() -> {
try {
URI queryUri = URI.create("http://localhost:8888/druid/v2/sql/statements/"+ queryId);
var queryHttpRequest = HttpRequest.newBuilder(queryUri)
.header("Accept", "application/json")
.GET()
.build();
var queryHttpResponse = httpClient.send(queryHttpRequest, HttpResponse.BodyHandlers.ofString());
var queryJson = OBJECT_MAPPER.readTree(queryHttpResponse.body());
state = queryJson.get("state").asText();
}
var queryHttpResponse = httpClient.send(queryHttpRequest, HttpResponse.BodyHandlers.ofString());
var queryJson = OBJECT_MAPPER.readTree(queryHttpResponse.body());
String state = queryJson.get("state").asText();
return "SUCCESS".equals(state);
} catch (InterruptedException | IOException e) {
throw new RuntimeException(e);
}
}, Duration.ofSeconds(1), Duration.ofMinutes(1));
}
}

0 comments on commit e046785

Please sign in to comment.