Skip to content

Commit

Permalink
Handle errors gracefully during multi-stage stats collection in the b…
Browse files Browse the repository at this point in the history
…roker (apache#13496)
  • Loading branch information
yashmayya authored and suyashpatel98 committed Jul 6, 2024
1 parent c33e57e commit d7f1778
Showing 1 changed file with 20 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pinot.broker.requesthandler;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -79,6 +80,7 @@

public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(MultiStageBrokerRequestHandler.class);

private static final int NUM_UNAVAILABLE_SEGMENTS_TO_LOG = 10;

private final WorkerManager _workerManager;
Expand Down Expand Up @@ -270,17 +272,25 @@ protected BrokerResponse handleRequest(long requestId, String query, @Nullable S

private void fillOldBrokerResponseStats(BrokerResponseNativeV2 brokerResponse,
List<MultiStageQueryStats.StageStats.Closed> queryStats, DispatchableSubPlan dispatchableSubPlan) {
List<DispatchablePlanFragment> stagePlans = dispatchableSubPlan.getQueryStageList();
List<PlanNode> planNodes = new ArrayList<>(stagePlans.size());
for (DispatchablePlanFragment stagePlan : stagePlans) {
planNodes.add(stagePlan.getPlanFragment().getFragmentRoot());
}
MultiStageStatsTreeBuilder treeBuilder = new MultiStageStatsTreeBuilder(planNodes, queryStats);
brokerResponse.setStageStats(treeBuilder.jsonStatsByStage(0));
for (MultiStageQueryStats.StageStats.Closed stageStats : queryStats) {
if (stageStats != null) { // for example pipeline breaker may not have stats
stageStats.forEach((type, stats) -> type.mergeInto(brokerResponse, stats));
try {
List<DispatchablePlanFragment> stagePlans = dispatchableSubPlan.getQueryStageList();
List<PlanNode> planNodes = new ArrayList<>(stagePlans.size());
for (DispatchablePlanFragment stagePlan : stagePlans) {
planNodes.add(stagePlan.getPlanFragment().getFragmentRoot());
}
MultiStageStatsTreeBuilder treeBuilder = new MultiStageStatsTreeBuilder(planNodes, queryStats);
brokerResponse.setStageStats(treeBuilder.jsonStatsByStage(0));
for (MultiStageQueryStats.StageStats.Closed stageStats : queryStats) {
if (stageStats != null) { // for example pipeline breaker may not have stats
stageStats.forEach((type, stats) -> type.mergeInto(brokerResponse, stats));
}
}
} catch (Exception e) {
LOGGER.warn("Error encountered while collecting multi-stage stats", e);
brokerResponse.setStageStats(JsonNodeFactory.instance.objectNode().put(
"error",
"Error encountered while collecting multi-stage stats - " + e)
);
}
}

Expand Down

0 comments on commit d7f1778

Please sign in to comment.