Skip to content

Commit

Permalink
Auto-compaction with segment granularity should skip segments that al…
Browse files Browse the repository at this point in the history
…ready have the configured segmentGranularity (#11009)

* Auto-compaction with segment granularity should skip segments that already have the configured segmentGranularity

* Auto-compaction with segment granularity should skip segments that already have the configured segmentGranularity

* Auto-compaction with segment granularity should skip segments that already have the configured segmentGranularity

* address comments

* address comments

* address comments

* address comments

* address comments
  • Loading branch information
maytasm authored Mar 20, 2021
1 parent 5fae7df commit 51d2c61
Show file tree
Hide file tree
Showing 3 changed files with 366 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.druid.server.coordinator.UserCompactionTaskQueryTuningConfig;
import org.apache.druid.testing.IntegrationTestingConfig;
import org.apache.druid.testing.clients.CompactionResourceTestClient;
import org.apache.druid.testing.clients.TaskResponseObject;
import org.apache.druid.testing.guice.DruidTestModuleFactory;
import org.apache.druid.testing.utils.ITRetryUtil;
import org.apache.druid.tests.TestNGGroup;
Expand Down Expand Up @@ -372,6 +373,71 @@ public void testAutoCompactionDutyWithSegmentGranularityAndMixedVersion() throws
}
}

@Test
public void testAutoCompactionDutyWithSegmentGranularityAndExistingCompactedSegmentsHaveSameSegmentGranularity() throws Exception
{
loadData(INDEX_TASK);
try (final Closeable ignored = unloader(fullDatasourceName)) {
final List<String> intervalsBeforeCompaction = coordinator.getSegmentIntervals(fullDatasourceName);
intervalsBeforeCompaction.sort(null);
// 4 segments across 2 days (4 total)...
verifySegmentsCount(4);
verifyQuery(INDEX_QUERIES_RESOURCE);

// Compacted without SegmentGranularity in auto compaction config
submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET);
forceTriggerAutoCompaction(2);
verifyQuery(INDEX_QUERIES_RESOURCE);
verifySegmentsCompacted(2, MAX_ROWS_PER_SEGMENT_COMPACTED);

List<TaskResponseObject> compactTasksBefore = indexer.getCompleteTasksForDataSource(fullDatasourceName);

// Segments were compacted and already has DAY granularity since it was initially ingested with DAY granularity.
// Now set auto compaction with DAY granularity in the granularitySpec
Granularity newGranularity = Granularities.DAY;
submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null));
forceTriggerAutoCompaction(2);
verifyQuery(INDEX_QUERIES_RESOURCE);
verifySegmentsCompacted(2, MAX_ROWS_PER_SEGMENT_COMPACTED);
// should be no new compaction task as segmentGranularity is already DAY
List<TaskResponseObject> compactTasksAfter = indexer.getCompleteTasksForDataSource(fullDatasourceName);
Assert.assertEquals(compactTasksAfter.size(), compactTasksBefore.size());
}
}

@Test
public void testAutoCompactionDutyWithSegmentGranularityAndExistingCompactedSegmentsHaveDifferentSegmentGranularity() throws Exception
{
loadData(INDEX_TASK);
try (final Closeable ignored = unloader(fullDatasourceName)) {
final List<String> intervalsBeforeCompaction = coordinator.getSegmentIntervals(fullDatasourceName);
intervalsBeforeCompaction.sort(null);
// 4 segments across 2 days (4 total)...
verifySegmentsCount(4);
verifyQuery(INDEX_QUERIES_RESOURCE);

// Compacted without SegmentGranularity in auto compaction config
submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET);
forceTriggerAutoCompaction(2);
verifyQuery(INDEX_QUERIES_RESOURCE);
verifySegmentsCompacted(2, MAX_ROWS_PER_SEGMENT_COMPACTED);

List<TaskResponseObject> compactTasksBefore = indexer.getCompleteTasksForDataSource(fullDatasourceName);

// Segments were compacted and already has DAY granularity since it was initially ingested with DAY granularity.
// Now set auto compaction with DAY granularity in the granularitySpec
Granularity newGranularity = Granularities.YEAR;
submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null));
forceTriggerAutoCompaction(1);
verifyQuery(INDEX_QUERIES_RESOURCE);
verifySegmentsCompacted(1, MAX_ROWS_PER_SEGMENT_COMPACTED);

// There should be new compaction tasks since SegmentGranularity changed from DAY to YEAR
List<TaskResponseObject> compactTasksAfter = indexer.getCompleteTasksForDataSource(fullDatasourceName);
Assert.assertTrue(compactTasksAfter.size() > compactTasksBefore.size());
}
}

private void loadData(String indexTask) throws Exception
{
String taskSpec = getResourceAsString(indexTask);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -397,20 +397,29 @@ private boolean needsCompaction(DataSourceCompactionConfig config, SegmentsToCom
needsCompaction = true;
}

// Only checks for segmentGranularity as auto compaction currently only supports segmentGranularity
final Granularity segmentGranularity = lastCompactionState.getGranularitySpec() != null ?
objectMapper.convertValue(lastCompactionState.getGranularitySpec(), GranularitySpec.class).getSegmentGranularity() :
null;

if (config.getGranularitySpec() != null &&
config.getGranularitySpec().getSegmentGranularity() != null &&
!config.getGranularitySpec().getSegmentGranularity().equals(segmentGranularity)) {
log.info(
"Configured granularitySpec[%s] is different from the one[%s] of segments. Needs compaction",
config.getGranularitySpec(),
segmentGranularity
);
needsCompaction = true;
if (config.getGranularitySpec() != null && config.getGranularitySpec().getSegmentGranularity() != null) {
// Only checks for segmentGranularity as auto compaction currently only supports segmentGranularity
final Granularity existingSegmentGranularity = lastCompactionState.getGranularitySpec() != null ?
objectMapper.convertValue(lastCompactionState.getGranularitySpec(), GranularitySpec.class).getSegmentGranularity() :
null;
if (existingSegmentGranularity == null) {
// Candidate segments were all compacted without segment granularity set.
// We need to check if all segments have the same segment granularity as the configured segment granularity.
needsCompaction = candidates.segments.stream()
.anyMatch(segment -> !config.getGranularitySpec().getSegmentGranularity().isAligned(segment.getInterval()));
log.info(
"Segments were previously compacted but without segmentGranularity in auto compaction."
+ " Configured segmentGranularity[%s] is different from granularity implied by segment intervals. Needs compaction",
config.getGranularitySpec().getSegmentGranularity()
);
} else if (!config.getGranularitySpec().getSegmentGranularity().equals(existingSegmentGranularity)) {
log.info(
"Configured segmentGranularity[%s] is different from the segmentGranularity[%s] of segments. Needs compaction",
config.getGranularitySpec().getSegmentGranularity(),
existingSegmentGranularity
);
needsCompaction = true;
}
}

return needsCompaction;
Expand Down
Loading

0 comments on commit 51d2c61

Please sign in to comment.