Skip to content

Commit

Permalink
Deal with potential cardinality estimate being negative and add loggi…
Browse files Browse the repository at this point in the history
…ng to hash determine partitions phase (#12443)

* Deal with potential cardinality estimate being negative and add logging

* Fix typo in name

* Refine and minimize logging

* Make it info based on code review

* Create a named constant for the magic number
  • Loading branch information
Agustin Gonzalez authored May 20, 2022
1 parent f9bdb3b commit c236227
Show file tree
Hide file tree
Showing 2 changed files with 124 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
import org.joda.time.DateTime;
import org.joda.time.Interval;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.Consumes;
Expand Down Expand Up @@ -129,6 +130,16 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen

private static final String TASK_PHASE_FAILURE_MSG = "Failed in phase[%s]. See task logs for details.";

// Sometimes Druid estimates one shard for hash partitioning despite conditions
// indicating that there ought to be more than one. We have not been able to
// reproduce this, but looking at the code around where the following constant is
// used, one possibility is that the sketch's estimate is negative. If that case
// happens, code has been added to log it and to set the estimate to the value of
// the following constant. It is not necessary to parameterize this value since, if
// this happens, it is a bug, and the new logging may now provide some evidence to
// reproduce and fix it.
private static final long DEFAULT_NUM_SHARDS_WHEN_ESTIMATE_GOES_NEGATIVE = 7L;

private final ParallelIndexIngestionSpec ingestionSchema;
/**
* Base name for the {@link SubTaskSpec} ID.
Expand Down Expand Up @@ -703,6 +714,10 @@ TaskStatus runHashPartitionMultiPhaseParallel(TaskToolbox toolbox) throws Except
cardinalityRunner.getReports().values(),
effectiveMaxRowsPerSegment
);

// This is for potential debugging in case we suspect bad estimation of cardinalities etc.
LOG.debug("intervalToNumShards: %s", intervalToNumShards.toString());

} else {
intervalToNumShards = CollectionUtils.mapValues(
mergeCardinalityReports(cardinalityRunner.getReports().values()),
Expand Down Expand Up @@ -901,13 +916,40 @@ public static Map<Interval, Integer> determineNumShardsFromCardinalityReport(
{
// aggregate all the sub-reports
Map<Interval, Union> finalCollectors = mergeCardinalityReports(reports);
return computeIntervalToNumShards(maxRowsPerSegment, finalCollectors);
}

@Nonnull
@VisibleForTesting
static Map<Interval, Integer> computeIntervalToNumShards(
int maxRowsPerSegment,
Map<Interval, Union> finalCollectors
)
{
return CollectionUtils.mapValues(
finalCollectors,
union -> {
final double estimatedCardinality = union.getEstimate();
// determine numShards based on maxRowsPerSegment and the cardinality
final long estimatedNumShards = Math.round(estimatedCardinality / maxRowsPerSegment);
final long estimatedNumShards;
if (estimatedCardinality <= 0) {
estimatedNumShards = DEFAULT_NUM_SHARDS_WHEN_ESTIMATE_GOES_NEGATIVE;
LOG.warn("Estimated cardinality for union of estimates is zero or less: %.2f, setting num shards to %d",
estimatedCardinality, estimatedNumShards
);
} else {
// determine numShards based on maxRowsPerSegment and the cardinality
estimatedNumShards = Math.round(estimatedCardinality / maxRowsPerSegment);
}
LOG.info("estimatedNumShards %d given estimated cardinality %.2f and maxRowsPerSegment %d",
estimatedNumShards, estimatedCardinality, maxRowsPerSegment
);
// We have seen this before in the wild in situations where more shards should have been created,
// log it if it happens with some information & context
if (estimatedNumShards == 1) {
LOG.info("estimatedNumShards is ONE (%d) given estimated cardinality %.2f and maxRowsPerSegment %d",
estimatedNumShards, estimatedCardinality, maxRowsPerSegment
);
}
try {
return Math.max(Math.toIntExact(estimatedNumShards), 1);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,26 +23,38 @@
import com.google.common.collect.ImmutableMap;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.datasketches.hll.HllSketch;
import org.apache.datasketches.hll.Union;
import org.apache.druid.hll.HyperLogLogCollector;
import org.apache.druid.indexing.common.task.IndexTask;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.testing.junit.LoggerCaptureRule;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.core.LogEvent;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.mockito.Mockito;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import static org.mockito.Mockito.mock;

public class DimensionCardinalityReportTest
{
private static final ObjectMapper OBJECT_MAPPER = ParallelIndexTestingFactory.createObjectMapper();

private DimensionCardinalityReport target;

@Rule
public final LoggerCaptureRule logger = new LoggerCaptureRule(ParallelIndexSupervisorTask.class);


@Before
public void setup()
{
Expand Down Expand Up @@ -293,4 +305,72 @@ public void testLargeSupervisorDetermineNumShardsFromCardinalityReport()
intervalToNumShards
);
}


@Test
public void testSupervisorDetermineNegativeNumShardsFromCardinalityReport()
{
  // Verifies the fallback path: a negative cardinality estimate must be replaced
  // by the default shard count (7) and produce a WARN log event.
  logger.clearLogEvents();
  Union negativeUnion = mock(Union.class);
  Mockito.when(negativeUnion.getEstimate()).thenReturn(-1.0);
  Interval interval = Intervals.of("2001-01-01/P1D");
  Map<Interval, Union> intervalToUnion = ImmutableMap.of(interval, negativeUnion);
  Map<Interval, Integer> intervalToNumShards =
      ParallelIndexSupervisorTask.computeIntervalToNumShards(10, intervalToUnion);
  // Use Integer.valueOf instead of the deprecated Integer(int) constructor.
  Assert.assertEquals(Integer.valueOf(7), intervalToNumShards.get(interval));

  // The negative estimate must have been logged at WARN with the exact message.
  List<LogEvent> loggingEvents = logger.getLogEvents();
  String expectedLogMessage =
      "Estimated cardinality for union of estimates is zero or less: -1.00, setting num shards to 7";
  Assert.assertTrue(
      "Logging events: " + loggingEvents,
      loggingEvents.stream()
                   .anyMatch(l ->
                                 l.getLevel().equals(Level.WARN)
                                 && l.getMessage()
                                     .getFormattedMessage()
                                     .equals(expectedLogMessage)
                   )
  );
}

@Test
public void testSupervisorDeterminePositiveNumShardsFromCardinalityReport()
{
  // Happy path: cardinality 24 with maxRowsPerSegment 6 rounds to 4 shards.
  Union union = mock(Union.class);
  Mockito.when(union.getEstimate()).thenReturn(24.0);
  Interval interval = Intervals.of("2001-01-01/P1D");
  Map<Interval, Union> intervalToUnion = ImmutableMap.of(interval, union);
  Map<Interval, Integer> intervalToNumShards =
      ParallelIndexSupervisorTask.computeIntervalToNumShards(6, intervalToUnion);
  // Use Integer.valueOf instead of the deprecated Integer(int) constructor.
  Assert.assertEquals(Integer.valueOf(4), intervalToNumShards.get(interval));
}

@Test
public void testSupervisorDeterminePositiveOneShardFromCardinalityReport()
{
  // Single-shard case: cardinality 24 with maxRowsPerSegment 24 yields exactly
  // one shard, which must additionally be reported via an INFO log event.
  logger.clearLogEvents();
  Union union = mock(Union.class);
  Mockito.when(union.getEstimate()).thenReturn(24.0);
  Interval interval = Intervals.of("2001-01-01/P1D");
  Map<Interval, Union> intervalToUnion = ImmutableMap.of(interval, union);
  Map<Interval, Integer> intervalToNumShards =
      ParallelIndexSupervisorTask.computeIntervalToNumShards(24, intervalToUnion);
  // Use Integer.valueOf instead of the deprecated Integer(int) constructor.
  Assert.assertEquals(Integer.valueOf(1), intervalToNumShards.get(interval));

  // The one-shard condition must have been logged at INFO with the exact message.
  List<LogEvent> loggingEvents = logger.getLogEvents();
  String expectedLogMessage =
      "estimatedNumShards is ONE (1) given estimated cardinality 24.00 and maxRowsPerSegment 24";
  Assert.assertTrue(
      "Logging events: " + loggingEvents,
      loggingEvents.stream()
                   .anyMatch(l ->
                                 l.getLevel().equals(Level.INFO)
                                 && l.getMessage()
                                     .getFormattedMessage()
                                     .equals(expectedLogMessage)
                   )
  );
}

}

0 comments on commit c236227

Please sign in to comment.