Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Auto-Compaction using Multi-Stage Query Engine #16291

Merged
merged 64 commits into from
Jul 12, 2024
Merged
Show file tree
Hide file tree
Changes from 59 commits
Commits
Show all changes
64 commits
Select commit Hold shift + click to select a range
c1dc09d
Checkpointing poc work
gargvishesh Jan 8, 2024
d11dcf4
Add CompactionClient
gargvishesh Jan 8, 2024
1d4a230
Merge branch 'master' into msq-auto-compaction
gargvishesh Jan 16, 2024
a861c57
Compaction POC working version
gargvishesh Jan 24, 2024
959a9b3
Cleanup
gargvishesh Jan 29, 2024
5290bdc
Cleanup 2
gargvishesh Jan 29, 2024
c7b53e5
Merge branch 'master' into msq-auto-compaction
gargvishesh Feb 20, 2024
6cd0230
Merge branch 'master' into msq-auto-compaction
gargvishesh Apr 2, 2024
65a0343
Separate out native and msq-based compaction flows and refine query c…
gargvishesh Apr 5, 2024
9b9128f
Add engine field compaction config validation
gargvishesh Apr 8, 2024
dfaef5d
Compaction to scan query on msq
gargvishesh Apr 15, 2024
e142186
Merge branch 'master' into msq-auto-compaction
gargvishesh Apr 15, 2024
1deef3e
Code cleanup
gargvishesh Apr 16, 2024
177bea9
Precondition checks and updates to match lastCompactionState with com…
gargvishesh Apr 22, 2024
e6222e8
Simplify guice module and refactor CompactionToMSQTaskImpl
gargvishesh Apr 23, 2024
8923fbd
Fix checkstyle
gargvishesh Apr 23, 2024
279c619
Move CompactionToMSQTaskImpl to msq indexing.
gargvishesh Apr 24, 2024
affa12b
Merge branch 'master' into msq-auto-compaction
gargvishesh Apr 24, 2024
0a75321
Fix tests
gargvishesh Apr 24, 2024
6e2d3f4
Move to json deserialization of compaction strategy.
gargvishesh Apr 27, 2024
c600fa3
Refactorings
gargvishesh Apr 29, 2024
2f0203e
Revert compaction engine field addition in CompactionState for now
gargvishesh Apr 29, 2024
a33e55d
Update serde test.
gargvishesh Apr 29, 2024
880dbcf
Merge branch 'master' into msq-auto-compaction
gargvishesh Apr 29, 2024
f3a86c1
Remove serde step for MSQControllerTask, update tests, add context ch…
gargvishesh May 2, 2024
1a9af3c
Fields scope changes
gargvishesh May 7, 2024
b791310
Fix TYPE field and currentSubtaskHolder
gargvishesh May 9, 2024
bd7ffb7
Add segment load wait
gargvishesh May 17, 2024
299c25d
Extract dim and metrics from query for lastCompactionState
gargvishesh May 20, 2024
642952c
Address review comments
gargvishesh May 24, 2024
224ae60
Merge branch 'master' into msq-auto-compaction
gargvishesh May 25, 2024
02df1b5
Post merge fixes
gargvishesh May 25, 2024
fad84d1
Address review comments and fix compaction state comparison for MSQ.
gargvishesh May 26, 2024
9c7ee58
Fix checkstyle issues
gargvishesh May 27, 2024
4f5d356
Fix checkstyle issues and tests
gargvishesh May 27, 2024
7df2dc8
Fix more tests
gargvishesh May 28, 2024
6d893fc
Add segmentCacheManagerFactory in CompactionTask and update tests
gargvishesh May 28, 2024
30770ac
Minor fix
gargvishesh May 28, 2024
0ce405b
Fix style
gargvishesh May 29, 2024
b7261c2
Add tests for coverage and minor refactoring
gargvishesh May 29, 2024
987504b
Fix test failures
gargvishesh May 30, 2024
dfe40af
Address review comments
gargvishesh May 31, 2024
98aafb0
Address review comments
gargvishesh Jun 18, 2024
bcfab9b
Remove finalizeAggregations check
gargvishesh Jun 20, 2024
7a8c026
Merge branch 'refs/heads/master' into msq-auto-compaction
gargvishesh Jun 20, 2024
2043beb
Remove dimensionToAggregatorFactory map and change source of metricsS…
gargvishesh Jun 20, 2024
6eb5888
Change metrics comparison to use combining factory
gargvishesh Jun 24, 2024
950f05d
Address latest review comments - part 1
gargvishesh Jun 25, 2024
cc5b070
Address latest review comments - part 2
gargvishesh Jun 26, 2024
3f81cf1
Fix style, coverage and tests.
gargvishesh Jun 27, 2024
bdecad9
Merge branch 'refs/heads/master' into msq-auto-compaction
gargvishesh Jun 27, 2024
053f2e9
Fix build
gargvishesh Jun 27, 2024
74273ea
try again
gargvishesh Jun 27, 2024
af5a344
fix style
gargvishesh Jun 27, 2024
569ca49
Support query granularity for scan queries, support query granularity…
gargvishesh Jul 2, 2024
01cdabc
fix style
gargvishesh Jul 2, 2024
b122cdf
Fix "ALL" granularity deserialization and support dedup when rollup=t…
gargvishesh Jul 2, 2024
fa6353d
fix tests
gargvishesh Jul 2, 2024
832c08f
Address review comments and add ITs
gargvishesh Jul 9, 2024
fea7f94
Address review comments
gargvishesh Jul 11, 2024
c012659
Merge branch 'refs/heads/master' into msq-auto-compaction
gargvishesh Jul 11, 2024
eb816b3
Fix code coverage
gargvishesh Jul 11, 2024
607ae3e
Fix IT failure
gargvishesh Jul 11, 2024
9c82023
Fix IT failure
gargvishesh Jul 12, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ public void setup()
null,
null,
null,
null,
null
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.segment.transform.TransformSpec;
import org.apache.druid.server.DruidNode;
import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
import org.apache.druid.sql.calcite.planner.ColumnMappings;
import org.apache.druid.sql.calcite.rel.DruidQuery;
import org.apache.druid.sql.http.ResultFormat;
Expand Down Expand Up @@ -1559,7 +1560,7 @@ private void handleQueryResults(
if (!destination.isReplaceTimeChunks()) {
// Store compaction state only for replace queries.
log.warn(
"storeCompactionState flag set for a non-REPLACE query [%s]. Ignoring the flag for now.",
"Ignoring storeCompactionState flag since it is set for a non-REPLACE query[%s].",
queryDef.getQueryId()
);
} else {
Expand Down Expand Up @@ -1659,9 +1660,11 @@ private static Function<Set<DataSegment>, Set<DataSegment>> addCompactionStateTo

GranularitySpec granularitySpec = new UniformGranularitySpec(
segmentGranularity,
dataSchema.getGranularitySpec().getQueryGranularity(),
QueryContext.of(querySpec.getQuery().getContext())
.getGranularity(DruidSqlInsert.SQL_INSERT_QUERY_GRANULARITY, jsonMapper),
dataSchema.getGranularitySpec().isRollup(),
dataSchema.getGranularitySpec().inputIntervals()
// Not using dataSchema.getGranularitySpec().inputIntervals() as that always has ETERNITY
((DataSourceMSQDestination) querySpec.getDestination()).getReplaceTimeChunks()
kfaraz marked this conversation as resolved.
Show resolved Hide resolved
);

DimensionsSpec dimensionsSpec = dataSchema.getDimensionsSpec();
Expand All @@ -1673,9 +1676,9 @@ private static Function<Set<DataSegment>, Set<DataSegment>> addCompactionStateTo
List<Object> metricsSpec = dataSchema.getAggregators() == null
? null
: jsonMapper.convertValue(
dataSchema.getAggregators(), new TypeReference<List<Object>>()
{
});
dataSchema.getAggregators(),
new TypeReference<List<Object>>() {}
);


IndexSpec indexSpec = tuningConfig.getIndexSpec();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.druid.msq.guice;

import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
Expand All @@ -29,6 +30,7 @@
import org.apache.druid.msq.counters.SegmentGenerationProgressCounter;
import org.apache.druid.msq.counters.SuperSorterProgressTrackerCounter;
import org.apache.druid.msq.counters.WarningCounters;
import org.apache.druid.msq.indexing.MSQCompactionRunner;
import org.apache.druid.msq.indexing.MSQControllerTask;
import org.apache.druid.msq.indexing.MSQWorkerTask;
import org.apache.druid.msq.indexing.error.BroadcastTablesTooLargeFault;
Expand Down Expand Up @@ -192,6 +194,8 @@ public List<? extends Module> getJacksonModules()
NilInputSource.class
);

module.registerSubtypes(new NamedType(MSQCompactionRunner.class, MSQCompactionRunner.TYPE));

FAULT_CLASSES.forEach(module::registerSubtypes);
module.addSerializer(new CounterSnapshotsSerializer());
return Collections.singletonList(module);
Expand Down
Loading
Loading