Update druid version to 29.0.0
Co-authored-by: fabricebaranski <[email protected]>
juhoautio-rovio and fabricebaranski committed Aug 16, 2024
1 parent 48dc6e4 commit a27fd18
Showing 5 changed files with 112 additions and 14 deletions.
79 changes: 78 additions & 1 deletion pom.xml
@@ -48,11 +48,13 @@
</developers>

<properties>
- <druid.version>25.0.0</druid.version>
+ <druid.version>29.0.0</druid.version>
<mysql.connector.version>8.0.28</mysql.connector.version>
<postgresql.version>42.2.23</postgresql.version>
<aws.sdk.version>1.12.129</aws.sdk.version>
<java.version>1.8</java.version>
+ <antlr4.version>4.9.3</antlr4.version>
+ <jackson.version>2.14.3</jackson.version>
<slf4j.version>1.7.32</slf4j.version>
<junit-jupiter.version>5.7.2</junit-jupiter.version>
<junit.version>4.13.2</junit.version>
@@ -105,6 +107,56 @@
<groupId>org.roaringbitmap</groupId>
<artifactId>RoaringBitmap</artifactId>
</exclusion>
+ <exclusion>
+ <!-- Druid library needs an older version than what Spark 3.0.1 brings -->
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-annotations</artifactId>
+ </exclusion>
+ <exclusion>
+ <!-- Druid library needs an older version than what Spark 3.0.1 brings -->
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </exclusion>
+ <exclusion>
+ <!-- Druid library needs an older version than what Spark 3.0.1 brings -->
+ <groupId>com.fasterxml.jackson.module</groupId>
+ <artifactId>jackson-module-scala_2.12</artifactId>
+ </exclusion>
+ <exclusion>
+ <!-- Druid library needs an older version than what Spark 3.0.1 brings -->
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <!-- Druid library needs an older version than what Spark 3.0.1 brings -->
+ <groupId>com.fasterxml.jackson.datatype</groupId>
+ <artifactId>jackson-datatype-joda</artifactId>
+ </exclusion>
+ <exclusion>
+ <!-- Druid library needs an older version than what Spark 3.0.1 brings -->
+ <groupId>com.fasterxml.jackson.datatype</groupId>
+ <artifactId>jackson-datatype-guava</artifactId>
+ </exclusion>
+ <exclusion>
+ <!-- Druid library needs an older version than what Spark 3.0.1 brings -->
+ <groupId>com.fasterxml.jackson.jaxrs</groupId>
+ <artifactId>jackson-jaxrs-json-provider</artifactId>
+ </exclusion>
+ <exclusion>
+ <!-- Druid library needs an older version than what Spark 3.0.1 brings -->
+ <groupId>com.fasterxml.jackson.jaxrs</groupId>
+ <artifactId>jackson-jaxrs-smile-provider</artifactId>
+ </exclusion>
+ <exclusion>
+ <!-- Druid library needs an older version than what Spark 3.0.1 brings -->
+ <groupId>com.fasterxml.jackson.dataformat</groupId>
+ <artifactId>jackson-dataformat-smile</artifactId>
+ </exclusion>
+ <exclusion>
+ <!-- Druid library needs an older version than what Spark 3.0.1 brings -->
+ <groupId>org.antlr</groupId>
+ <artifactId>antlr4-runtime</artifactId>
+ </exclusion>
</exclusions>
<scope>provided</scope>
</dependency>
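
These exclusions keep the Jackson artifacts that Spark already provides from being pulled in twice at conflicting versions. A quick way to confirm which Jackson version actually wins on the assembled classpath is to print what Jackson reports about itself; the following is a minimal standalone sketch using only stock Jackson API (the class name is illustrative):

import com.fasterxml.jackson.core.json.PackageVersion;
import com.fasterxml.jackson.databind.ObjectMapper;

public class JacksonVersionCheck {
    public static void main(String[] args) {
        // jackson-databind version as resolved at runtime
        System.out.println("databind: " + new ObjectMapper().version());
        // jackson-core version; a mismatch with databind hints at an exclusion gap
        System.out.println("core: " + PackageVersion.VERSION);
    }
}
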
@@ -267,6 +319,31 @@
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ <version>${jackson.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ <version>${jackson.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-annotations</artifactId>
+ <version>${jackson.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.module</groupId>
+ <artifactId>jackson-module-scala_${scala.version}</artifactId>
+ <version>${jackson.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.antlr</groupId>
+ <artifactId>antlr4-runtime</artifactId>
+ <version>${antlr4.version}</version>
+ </dependency>
</dependencies>

<build>
9 changes: 5 additions & 4 deletions src/main/java/com/rovio/ingest/TaskDataWriter.java
@@ -30,6 +30,7 @@
import org.apache.druid.segment.IndexMerger;
import org.apache.druid.segment.IndexMergerV9;
import org.apache.druid.segment.IndexSpec;
+ import org.apache.druid.segment.column.ColumnConfig;
import org.apache.druid.segment.data.BitmapSerdeFactory;
import org.apache.druid.segment.data.ConciseBitmapSerdeFactory;
import org.apache.druid.segment.data.RoaringBitmapSerdeFactory;
@@ -74,7 +75,7 @@
import static com.rovio.ingest.DataSegmentCommitMessage.MAPPER;

class TaskDataWriter implements DataWriter<InternalRow> {
- private static final IndexIO INDEX_IO = new IndexIO(MAPPER, () -> 0);
+ private static final IndexIO INDEX_IO = new IndexIO(MAPPER, ColumnConfig.DEFAULT);
private static final IndexMerger INDEX_MERGER_V_9 = new IndexMergerV9(MAPPER, INDEX_IO, TmpFileSegmentWriteOutMediumFactory.instance());
private static final Logger LOG = LoggerFactory.getLogger(TaskDataWriter.class);
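
Background on the constructor change: older Druid versions built IndexIO with a lambda supplying the columnCacheSizeBytes setting, which is what () -> 0 implemented. That setting was removed upstream and ColumnConfig now ships a shared ColumnConfig.DEFAULT instance, so the constant serves as the drop-in replacement for the old zero-cache lambda.
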

@@ -112,7 +113,7 @@ class TaskDataWriter implements DataWriter<InternalRow> {
this.appenderator.startJob();

try {
- ReflectionUtils.setStaticFieldValue(NullHandling.class, "INSTANCE", new NullValueHandlingConfig(context.isUseDefaultValueForNull(), null));
+ ReflectionUtils.setStaticFieldValue(NullHandling.class, "INSTANCE", new NullValueHandlingConfig(context.isUseDefaultValueForNull(), context.isUseThreeValueLogicForNativeFilters(), null));
} catch (NoSuchFieldException | IllegalAccessException e) {
throw new IllegalStateException("Unable to set null handling!!", e);
}
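
Druid 29 introduces SQL-compatible three-valued logic for native filters, and NullValueHandlingConfig gained a matching constructor parameter. Assuming the Druid 29 parameter order of (useDefaultValuesForNull, useThreeValueLogicForNativeFilters, ignoreNullsForStringCardinality), the reflective override above maps the two writer options onto the first two arguments and leaves the third at its default by passing null.
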
@@ -270,7 +271,7 @@ private RealtimeTuningConfig getTuningConfig(WriterContext context) {
null,
null,
null,
- new IndexSpec(getBitmapSerdeFactory(context), null, null, null),
+ new IndexSpec(getBitmapSerdeFactory(context), null, null, null, null, null, null),
null,
0,
0,
@@ -287,7 +288,7 @@ private BitmapSerdeFactory getBitmapSerdeFactory(WriterContext context) {
case "concise":
return new ConciseBitmapSerdeFactory();
case "roaring":
- return new RoaringBitmapSerdeFactory(true);
+ return RoaringBitmapSerdeFactory.getInstance();
}
throw new IllegalArgumentException(
"Unknown bitmap factory: '" + context.getBitmapFactory() + "'");
7 changes: 7 additions & 0 deletions src/main/java/com/rovio/ingest/WriterContext.java
@@ -60,6 +60,7 @@ public class WriterContext implements Serializable {
private final String version;
private final boolean rollup;
private final boolean useDefaultValueForNull;
+ private final boolean useThreeValueLogicForNativeFilters;
private final String dimensionsSpec;
private final String metricsSpec;
private final String transformSpec;
@@ -105,6 +106,7 @@ private WriterContext(CaseInsensitiveStringMap options, String version) {
this.initDataSource = options.getBoolean(ConfKeys.DATASOURCE_INIT, false);
this.rollup = options.getBoolean(ConfKeys.SEGMENT_ROLLUP, true);
this.useDefaultValueForNull = options.getBoolean(ConfKeys.USE_DEFAULT_VALUES_FOR_NULL, true);
+ this.useThreeValueLogicForNativeFilters = options.getBoolean(ConfKeys.USE_THREE_VALUE_LOGIC_FOR_NATIVE_FILTERS, true);
this.dimensionsSpec = options.getOrDefault(ConfKeys.DIMENSIONS_SPEC, null);
this.metricsSpec = options.getOrDefault(ConfKeys.METRICS_SPEC, null);
this.transformSpec = options.getOrDefault(ConfKeys.TRANSFORM_SPEC, null);
@@ -220,6 +222,10 @@ public boolean isUseDefaultValueForNull() {
return useDefaultValueForNull;
}

+ public boolean isUseThreeValueLogicForNativeFilters() {
+ return useThreeValueLogicForNativeFilters;
+ }
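
From the Spark side the new setting is just another writer option next to druid.use_default_values_for_null. A hedged usage sketch follows; the format name is assumed to match this project's DruidSource class, and dataset is a placeholder for rows matching the expected schema:

// A minimal sketch, not taken from this commit. Assumes the datasource is
// registered under its class name and `dataset` is a prepared Dataset<Row>.
dataset.write()
    .format("com.rovio.ingest.DruidSource")
    .option(WriterContext.ConfKeys.USE_THREE_VALUE_LOGIC_FOR_NATIVE_FILTERS, "false")
    .mode(SaveMode.Overwrite)
    .save();
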

public String getDimensionsSpec() {
return dimensionsSpec;
}
@@ -252,6 +258,7 @@ public static class ConfKeys {
public static final String MAX_ROWS_IN_MEMORY = "druid.memory.max_rows";
public static final String SEGMENT_ROLLUP = "druid.segment.rollup";
public static final String USE_DEFAULT_VALUES_FOR_NULL = "druid.use_default_values_for_null";
+ public static final String USE_THREE_VALUE_LOGIC_FOR_NATIVE_FILTERS = "druid.use_three_value_logic_for_native_filters";
// Metadata config
public static final String METADATA_DB_TYPE = "druid.metastore.db.type";
public static final String METADATA_DB_URI = "druid.metastore.db.uri";
@@ -18,7 +18,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Table;
import com.rovio.ingest.extensions.java.DruidDatasetExtensions;
- import org.apache.datasketches.hll.HllSketch;
+ import org.apache.druid.query.aggregation.datasketches.hll.HllSketchHolder;
import org.apache.druid.query.aggregation.datasketches.theta.SketchHolder;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
@@ -402,7 +402,7 @@ public void shouldWriteDataSegmentsWithSketchBuild() throws IOException {
.build();
Map<String, Object> data = parsed.get(0, dimensions);
assertNotNull(data);
- assertEquals(1.0, ((HllSketch) data.get("string_column_hll")).getEstimate());
+ assertEquals(1.0, ((HllSketchHolder) data.get("string_column_hll")).getEstimate());
assertEquals(1.0, ((SketchHolder) data.get("string_column_theta")).getEstimate());

// String column is automatically excluded from dimensions as it is used for sketch aggregation.
@@ -415,7 +415,7 @@ public void shouldWriteDataSegmentsWithSketchBuild() throws IOException {
.build();
data = parsed.get(0, dimensions);
assertNotNull(data);
- assertEquals(1.0, ((HllSketch) data.get("string_column_hll")).getEstimate());
+ assertEquals(1.0, ((HllSketchHolder) data.get("string_column_hll")).getEstimate());
assertEquals(1.0, ((SketchHolder) data.get("string_column_theta")).getEstimate());
}
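
The cast changes reflect that Druid's datasketches extension now hands back HLL aggregations wrapped in its own HllSketchHolder instead of a raw org.apache.datasketches HllSketch. The holder exposes getEstimate() directly, so only the cast target needed to change in these assertions.
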

25 changes: 19 additions & 6 deletions src/test/java/com/rovio/ingest/DruidSourceBaseTest.java
@@ -30,6 +30,7 @@
import org.apache.druid.metadata.MetadataStorageTablesConfig;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.QueryableIndexStorageAdapter;
+ import org.apache.druid.segment.column.ColumnConfig;
import org.apache.druid.segment.realtime.firehose.IngestSegmentFirehose;
import org.apache.druid.segment.realtime.firehose.WindowedStorageAdapter;
import org.apache.druid.segment.transform.TransformSpec;
@@ -51,6 +52,7 @@
import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
+ import org.testcontainers.utility.DockerImageName;

import java.io.File;
import java.io.IOException;
@@ -63,6 +65,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+ import java.util.Objects;

import static com.rovio.ingest.DataSegmentCommitMessage.MAPPER;
import static java.nio.file.LinkOption.NOFOLLOW_LINKS;
@@ -93,10 +96,18 @@ public class DruidSourceBaseTest extends SharedJavaSparkContext {
public static MySQLContainer<?> getMySQLContainer() {
// MySQL 8 requires mysql-connector-java 8.x so we test against that.
// The mysql-connector-java 8.x also works with MySQL 5
- return new MySQLContainer<>("mysql:8.0.28")
- .withUsername(dbUser)
- .withPassword(dbPass)
- .withDatabaseName(DB_NAME);
+ if (Objects.equals(System.getProperty("os.arch"), "aarch64")) {
+ DockerImageName myImage = DockerImageName.parse("arm64v8/mysql:8.0.36").asCompatibleSubstituteFor("mysql");
+ return new MySQLContainer<>(myImage)
+ .withUsername(dbUser)
+ .withPassword(dbPass)
+ .withDatabaseName(DB_NAME);
+ } else {
+ return new MySQLContainer<>("mysql:8.0.36")
+ .withUsername(dbUser)
+ .withPassword(dbPass)
+ .withDatabaseName(DB_NAME);
+ }
}
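
The aarch64 branch exists because the official mysql image tag used before is not published for ARM. asCompatibleSubstituteFor("mysql") tells Testcontainers that arm64v8/mysql behaves like the official mysql image, which MySQLContainer would otherwise reject during its image-compatibility check; on other architectures the single-image path is kept, just bumped to 8.0.36.
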

public static PostgreSQLContainer<?> getPostgreSQLContainer() {
@@ -128,6 +139,7 @@ public static void prepareDatabase(JdbcDatabaseContainer<?> jdbc) throws SQLException {
+ " version VARCHAR(255) NOT NULL,\n"
+ " used BOOLEAN NOT NULL,\n"
+ " payload BLOB NOT NULL,\n"
+ " used_status_last_updated VARCHAR(255) NOT NULL,\n"
+ " PRIMARY KEY (id)\n"
+ ")",
segmentsTable));
@@ -143,6 +155,7 @@ public static void prepareDatabase(JdbcDatabaseContainer<?> jdbc) throws SQLException {
+ " version VARCHAR(255) NOT NULL,\n"
+ " used BOOLEAN NOT NULL,\n"
+ " payload BYTEA NOT NULL,\n"
+ " used_status_last_updated VARCHAR(255) NOT NULL,\n"
+ " PRIMARY KEY (id)\n"
+ ")",
segmentsTable));
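
Both CREATE TABLE statements gain used_status_last_updated because recent Druid releases (around 28.0) added that column to the druid_segments metadata table to record when a segment's used flag last changed. Druid 29's metadata code expects the column to exist, so the test schema has to declare it for both MySQL and PostgreSQL.
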
@@ -242,7 +255,7 @@ static void verifySegmentTable(Interval interval, String version, boolean used,
String sql = "SELECT count(id) as c from %1s" +
" where dataSource = :dataSource" +
" and start < :end" +
" and end >= :start" +
" and \"end\" >= :start" +
" and version = :version" +
" and used = :used";

@@ -282,7 +295,7 @@ protected Table<Integer, ImmutableMap<String, Object>, ImmutableMap<String, Object>>
private ImmutableMap<ImmutableMap<String, Object>, ImmutableMap<String, Object>> readSegmentDir(Interval interval, Path segmentDir) throws IOException {
ImmutableMap.Builder<ImmutableMap<String, Object>, ImmutableMap<String, Object>> values = ImmutableMap.builder();

- IndexIO indexIO = new IndexIO(MAPPER, () -> 0);
+ IndexIO indexIO = new IndexIO(MAPPER, ColumnConfig.DEFAULT);
QueryableIndexStorageAdapter queryableIndexStorageAdapter = new QueryableIndexStorageAdapter(indexIO.loadIndex(segmentDir.toFile()));
WindowedStorageAdapter windowedStorageAdapter = new WindowedStorageAdapter(queryableIndexStorageAdapter, interval);
try (IngestSegmentFirehose firehose = new IngestSegmentFirehose(
