Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RowStats for multiphase parallel indexing task #12280

Merged
merged 14 commits into from
Mar 2, 2022
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.indexing.common.TaskReport;

import java.util.List;
import java.util.Map;

/**
* Report containing the {@link GenericPartitionStat}s created by a {@link PartialSegmentGenerateTask}. This report is
Expand All @@ -35,9 +37,10 @@ class GeneratedPartitionsMetadataReport extends GeneratedPartitionsReport
@JsonCreator
GeneratedPartitionsMetadataReport(
@JsonProperty("taskId") String taskId,
@JsonProperty("partitionStats") List<PartitionStat> partitionStats
@JsonProperty("partitionStats") List<PartitionStat> partitionStats,
@JsonProperty("taskReport") Map<String, TaskReport> taskReport
)
{
super(taskId, partitionStats);
super(taskId, partitionStats, taskReport);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
package org.apache.druid.indexing.common.task.batch.parallel;

import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.indexing.common.TaskReport;

import java.util.List;
import java.util.Map;
import java.util.Objects;

/**
Expand All @@ -33,11 +35,13 @@ public class GeneratedPartitionsReport implements SubTaskReport
{
private final String taskId;
private final List<PartitionStat> partitionStats;
private final Map<String, TaskReport> taskReport;

GeneratedPartitionsReport(String taskId, List<PartitionStat> partitionStats)
GeneratedPartitionsReport(String taskId, List<PartitionStat> partitionStats, Map<String, TaskReport> taskReport)
{
this.taskId = taskId;
this.partitionStats = partitionStats;
this.taskReport = taskReport;
}

@Override
Expand All @@ -47,6 +51,12 @@ public String getTaskId()
return taskId;
}

@JsonProperty
public Map<String, TaskReport> getTaskReport()
{
return taskReport;
}

@JsonProperty
public List<PartitionStat> getPartitionStats()
{
Expand All @@ -64,13 +74,14 @@ public boolean equals(Object o)
}
GeneratedPartitionsReport that = (GeneratedPartitionsReport) o;
return Objects.equals(taskId, that.taskId) &&
Objects.equals(partitionStats, that.partitionStats);
Objects.equals(partitionStats, that.partitionStats) &&
Objects.equals(taskReport, that.taskReport);
}

@Override
public int hashCode()
{
return Objects.hash(taskId, partitionStats);
return Objects.hash(taskId, partitionStats, taskReport);
}

@Override
Expand All @@ -79,6 +90,7 @@ public String toString()
return "GeneratedPartitionsReport{" +
"taskId='" + taskId + '\'' +
", partitionStats=" + partitionStats +
", taskReport=" + taskReport +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.segment.incremental.MutableRowIngestionMeters;
import org.apache.druid.segment.incremental.ParseExceptionReport;
import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.incremental.RowIngestionMetersTotals;
Expand Down Expand Up @@ -176,6 +177,9 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
@MonotonicNonNull
private volatile TaskToolbox toolbox;

@MonotonicNonNull
private Pair<Map<String, Object>, Map<String, Object>> indexGenerateRowStats;

private IngestionState ingestionState;

@JsonCreator
Expand Down Expand Up @@ -726,6 +730,7 @@ TaskStatus runHashPartitionMultiPhaseParallel(TaskToolbox toolbox) throws Except
);
return TaskStatus.failure(getId(), errMsg);
}
indexGenerateRowStats = doGetRowStatsAndUnparseableEventsParallelMultiPhase(indexingRunner, true);

// 2. Partial segment merge phase
// partition (interval, partitionId) -> partition locations
Expand Down Expand Up @@ -814,6 +819,8 @@ TaskStatus runRangePartitionMultiPhaseParallel(TaskToolbox toolbox) throws Excep
return TaskStatus.failure(getId(), errMsg);
}

indexGenerateRowStats = doGetRowStatsAndUnparseableEventsParallelMultiPhase(indexingRunner, true);

// partition (interval, partitionId) -> partition locations
Map<Partition, List<PartitionLocation>> partitionToLocations =
getPartitionToLocations(indexingRunner.getReports());
Expand Down Expand Up @@ -1477,10 +1484,7 @@ private Pair<Map<String, Object>, Map<String, Object>> doGetRowStatsAndUnparseab
boolean includeUnparseable
)
{
long processed = 0L;
long processedWithError = 0L;
long thrownAway = 0L;
long unparseable = 0L;
final MutableRowIngestionMeters buildSegmentsRowStats = new MutableRowIngestionMeters();

List<ParseExceptionReport> unparseableEvents = new ArrayList<>();

Expand All @@ -1492,35 +1496,75 @@ private Pair<Map<String, Object>, Map<String, Object>> doGetRowStatsAndUnparseab
LOG.warn("Got an empty task report from subtask: " + pushedSegmentsReport.getTaskId());
continue;
}
IngestionStatsAndErrorsTaskReport ingestionStatsAndErrorsReport = (IngestionStatsAndErrorsTaskReport) taskReport.get(
IngestionStatsAndErrorsTaskReport.REPORT_KEY);
IngestionStatsAndErrorsTaskReportData reportData =
(IngestionStatsAndErrorsTaskReportData) ingestionStatsAndErrorsReport.getPayload();
RowIngestionMetersTotals totals = getTotalsFromBuildSegmentsRowStats(
reportData.getRowStats().get(RowIngestionMeters.BUILD_SEGMENTS)
);
RowIngestionMetersTotals rowIngestionMetersTotals =
getBuildSegmentsStatsFromTaskReport(taskReport, includeUnparseable, unparseableEvents);

buildSegmentsRowStats.addRowIngestionMetersTotals(rowIngestionMetersTotals);
}

RowIngestionMetersTotals rowStatsForRunningTasks = getRowStatsAndUnparseableEventsForRunningTasks(
parallelSinglePhaseRunner.getRunningTaskIds(),
unparseableEvents,
includeUnparseable
);
buildSegmentsRowStats.addRowIngestionMetersTotals(rowStatsForRunningTasks);

return createStatsAndErrorsReport(buildSegmentsRowStats.getTotals(), unparseableEvents);
}

if (includeUnparseable) {
List<ParseExceptionReport> taskUnparsebleEvents = (List<ParseExceptionReport>) reportData.getUnparseableEvents()
.get(RowIngestionMeters.BUILD_SEGMENTS);
unparseableEvents.addAll(taskUnparsebleEvents);
private Pair<Map<String, Object>, Map<String, Object>> doGetRowStatsAndUnparseableEventsParallelMultiPhase(
ParallelIndexTaskRunner<?, ?> currentRunner,
boolean includeUnparseable
)
{
if (indexGenerateRowStats != null) {
return Pair.of(indexGenerateRowStats.lhs, includeUnparseable ? indexGenerateRowStats.rhs : ImmutableMap.of());
} else if (!currentRunner.getName().equals("partial segment generation")) {
return Pair.of(ImmutableMap.of(), ImmutableMap.of());
} else {
Map<String, GeneratedPartitionsReport> completedSubtaskReports =
(Map<String, GeneratedPartitionsReport>) currentRunner.getReports();

final MutableRowIngestionMeters buildSegmentsRowStats = new MutableRowIngestionMeters();
final List<ParseExceptionReport> unparseableEvents = new ArrayList<>();
for (GeneratedPartitionsReport generatedPartitionsReport : completedSubtaskReports.values()) {
Map<String, TaskReport> taskReport = generatedPartitionsReport.getTaskReport();
if (taskReport == null || taskReport.isEmpty()) {
LOG.warn("Got an empty task report from subtask: " + generatedPartitionsReport.getTaskId());
continue;
}
RowIngestionMetersTotals rowStatsForCompletedTask =
getBuildSegmentsStatsFromTaskReport(taskReport, true, unparseableEvents);

buildSegmentsRowStats.addRowIngestionMetersTotals(rowStatsForCompletedTask);
}

processed += totals.getProcessed();
processedWithError += totals.getProcessedWithError();
thrownAway += totals.getThrownAway();
unparseable += totals.getUnparseable();
RowIngestionMetersTotals rowStatsForRunningTasks = getRowStatsAndUnparseableEventsForRunningTasks(
currentRunner.getRunningTaskIds(),
unparseableEvents,
includeUnparseable
);
buildSegmentsRowStats.addRowIngestionMetersTotals(rowStatsForRunningTasks);

return createStatsAndErrorsReport(buildSegmentsRowStats.getTotals(), unparseableEvents);
}
}

// Get stats from running tasks
Set<String> runningTaskIds = parallelSinglePhaseRunner.getRunningTaskIds();
private RowIngestionMetersTotals getRowStatsAndUnparseableEventsForRunningTasks(
Set<String> runningTaskIds,
List<ParseExceptionReport> unparseableEvents,
boolean includeUnparseable
)
{
final MutableRowIngestionMeters buildSegmentsRowStats = new MutableRowIngestionMeters();
for (String runningTaskId : runningTaskIds) {
try {
Map<String, Object> report = toolbox.getIndexingServiceClient().getTaskReport(runningTaskId);
if (report == null || report.isEmpty()) {
// task does not have a running report yet
continue;
}

Map<String, Object> ingestionStatsAndErrors = (Map<String, Object>) report.get("ingestionStatsAndErrors");
Map<String, Object> payload = (Map<String, Object>) ingestionStatsAndErrors.get("payload");
Map<String, Object> rowStats = (Map<String, Object>) payload.get("rowStats");
Expand All @@ -1529,33 +1573,53 @@ private Pair<Map<String, Object>, Map<String, Object>> doGetRowStatsAndUnparseab

if (includeUnparseable) {
Map<String, Object> taskUnparseableEvents = (Map<String, Object>) payload.get("unparseableEvents");
List<ParseExceptionReport> buildSegmentsUnparseableEvents = (List<ParseExceptionReport>) taskUnparseableEvents.get(
RowIngestionMeters.BUILD_SEGMENTS
);
List<ParseExceptionReport> buildSegmentsUnparseableEvents = (List<ParseExceptionReport>)
taskUnparseableEvents.get(RowIngestionMeters.BUILD_SEGMENTS);
unparseableEvents.addAll(buildSegmentsUnparseableEvents);
}

processed += ((Number) buildSegments.get("processed")).longValue();
processedWithError += ((Number) buildSegments.get("processedWithError")).longValue();
thrownAway += ((Number) buildSegments.get("thrownAway")).longValue();
unparseable += ((Number) buildSegments.get("unparseable")).longValue();
buildSegmentsRowStats.addRowIngestionMetersTotals(getTotalsFromBuildSegmentsRowStats(buildSegments));
}
catch (Exception e) {
LOG.warn(e, "Encountered exception when getting live subtask report for task: " + runningTaskId);
}
}
return buildSegmentsRowStats.getTotals();
}

private Pair<Map<String, Object>, Map<String, Object>> createStatsAndErrorsReport(
RowIngestionMetersTotals rowStats,
List<ParseExceptionReport> unparseableEvents
)
{
Map<String, Object> rowStatsMap = new HashMap<>();
Map<String, Object> totalsMap = new HashMap<>();
totalsMap.put(
RowIngestionMeters.BUILD_SEGMENTS,
new RowIngestionMetersTotals(processed, processedWithError, thrownAway, unparseable)
);
totalsMap.put(RowIngestionMeters.BUILD_SEGMENTS, rowStats);
rowStatsMap.put("totals", totalsMap);

return Pair.of(rowStatsMap, ImmutableMap.of(RowIngestionMeters.BUILD_SEGMENTS, unparseableEvents));
}

/**
 * Pulls the "buildSegments" row-ingestion totals out of a completed sub-task's report map.
 *
 * @param taskReport         report map from a sub-task; expected to contain an
 *                           {@link IngestionStatsAndErrorsTaskReport} under its REPORT_KEY
 *                           (callers are expected to have checked for null/empty maps)
 * @param includeUnparseable when true, the sub-task's buildSegments unparseable events are
 *                           appended to {@code unparseableEvents}
 * @param unparseableEvents  accumulator that collects parse-exception reports across sub-tasks
 * @return the buildSegments {@link RowIngestionMetersTotals} reported by the sub-task
 */
private RowIngestionMetersTotals getBuildSegmentsStatsFromTaskReport(
    Map<String, TaskReport> taskReport,
    boolean includeUnparseable,
    List<ParseExceptionReport> unparseableEvents)
{
  final IngestionStatsAndErrorsTaskReport statsAndErrorsReport =
      (IngestionStatsAndErrorsTaskReport) taskReport.get(IngestionStatsAndErrorsTaskReport.REPORT_KEY);
  final IngestionStatsAndErrorsTaskReportData reportPayload =
      (IngestionStatsAndErrorsTaskReportData) statsAndErrorsReport.getPayload();
  final RowIngestionMetersTotals buildSegmentsTotals =
      getTotalsFromBuildSegmentsRowStats(reportPayload.getRowStats().get(RowIngestionMeters.BUILD_SEGMENTS));
  if (includeUnparseable) {
    final List<ParseExceptionReport> buildSegmentsUnparseableEvents =
        (List<ParseExceptionReport>) reportPayload.getUnparseableEvents().get(RowIngestionMeters.BUILD_SEGMENTS);
    unparseableEvents.addAll(buildSegmentsUnparseableEvents);
  }
  return buildSegmentsTotals;
}

private Pair<Map<String, Object>, Map<String, Object>> doGetRowStatsAndUnparseableEvents(String full, boolean includeUnparseable)
{
if (currentSubTaskHolder == null) {
Expand All @@ -1569,8 +1633,10 @@ private Pair<Map<String, Object>, Map<String, Object>> doGetRowStatsAndUnparseab

if (isParallelMode()) {
if (isGuaranteedRollup(ingestionSchema.getIOConfig(), ingestionSchema.getTuningConfig())) {
// multiphase is not supported yet
return Pair.of(ImmutableMap.of(), ImmutableMap.of());
return doGetRowStatsAndUnparseableEventsParallelMultiPhase(
(ParallelIndexTaskRunner<?, ?>) currentRunner,
includeUnparseable
);
} else {
return doGetRowStatsAndUnparseableEventsParallelSinglePhase(
(SinglePhaseParallelIndexTaskRunner) currentRunner,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.SurrogateTaskActionClient;
import org.apache.druid.indexing.common.actions.TaskActionClient;
Expand Down Expand Up @@ -161,12 +162,12 @@ SegmentAllocatorForBatch createSegmentAllocator(TaskToolbox toolbox, ParallelInd
}

@Override
GeneratedPartitionsMetadataReport createGeneratedPartitionsReport(TaskToolbox toolbox, List<DataSegment> segments)
GeneratedPartitionsMetadataReport createGeneratedPartitionsReport(TaskToolbox toolbox, List<DataSegment> segments, Map<String, TaskReport> taskReport)
{
List<PartitionStat> partitionStats = segments.stream()
.map(segment -> toolbox.getIntermediaryDataManager().generatePartitionStat(toolbox, segment))
.collect(Collectors.toList());
return new GeneratedPartitionsMetadataReport(getId(), partitionStats);
return new GeneratedPartitionsMetadataReport(getId(), partitionStats, taskReport);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.SurrogateTaskActionClient;
import org.apache.druid.indexing.common.actions.TaskActionClient;
Expand Down Expand Up @@ -175,11 +176,11 @@ SegmentAllocatorForBatch createSegmentAllocator(TaskToolbox toolbox, ParallelInd
}

@Override
GeneratedPartitionsMetadataReport createGeneratedPartitionsReport(TaskToolbox toolbox, List<DataSegment> segments)
GeneratedPartitionsMetadataReport createGeneratedPartitionsReport(TaskToolbox toolbox, List<DataSegment> segments, Map<String, TaskReport> taskReport)
{
List<PartitionStat> partitionStats = segments.stream()
.map(segment -> toolbox.getIntermediaryDataManager().generatePartitionStat(toolbox, segment))
.collect(Collectors.toList());
return new GeneratedPartitionsMetadataReport(getId(), partitionStats);
return new GeneratedPartitionsMetadataReport(getId(), partitionStats, taskReport);
}
}
Loading