Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Multi-stage] Reduce the stats transferred #12517

Merged
merged 1 commit into from
Feb 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@
package org.apache.pinot.broker.requesthandler;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.Maps;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -61,6 +62,7 @@
import org.apache.pinot.query.QueryEnvironment;
import org.apache.pinot.query.catalog.PinotCatalog;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.planner.physical.DispatchablePlanFragment;
import org.apache.pinot.query.planner.physical.DispatchableSubPlan;
import org.apache.pinot.query.routing.WorkerManager;
import org.apache.pinot.query.service.dispatch.QueryDispatcher;
Expand Down Expand Up @@ -179,14 +181,20 @@ protected BrokerResponse handleRequest(long requestId, String query, @Nullable S

Map<String, String> queryOptions = sqlNodeAndOptions.getOptions();
boolean traceEnabled = Boolean.parseBoolean(queryOptions.get(CommonConstants.Broker.Request.TRACE));

ResultTable queryResults;
Map<Integer, ExecutionStatsAggregator> stageIdStatsMap = new HashMap<>();
for (int stageId = 0; stageId < dispatchableSubPlan.getQueryStageList().size(); stageId++) {
stageIdStatsMap.put(stageId, new ExecutionStatsAggregator(traceEnabled));
Map<Integer, ExecutionStatsAggregator> stageIdStatsMap;
if (!traceEnabled) {
stageIdStatsMap = Collections.singletonMap(0, new ExecutionStatsAggregator(false));
} else {
List<DispatchablePlanFragment> stagePlans = dispatchableSubPlan.getQueryStageList();
int numStages = stagePlans.size();
stageIdStatsMap = Maps.newHashMapWithExpectedSize(numStages);
for (int stageId = 0; stageId < numStages; stageId++) {
stageIdStatsMap.put(stageId, new ExecutionStatsAggregator(true));
}
}

long executionStartTimeNs = System.nanoTime();
ResultTable queryResults;
try {
queryResults = _queryDispatcher.submitAndReduce(requestContext, dispatchableSubPlan, queryTimeoutMs, queryOptions,
stageIdStatsMap);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,10 @@ public static MetadataBlock getErrorDataBlock(Map<Integer, String> exceptions) {
}

public static MetadataBlock getEndOfStreamDataBlock() {
// TODO: add query statistics metadata for the block.
return new MetadataBlock(MetadataBlock.MetadataBlockType.EOS);
}

public static MetadataBlock getEndOfStreamDataBlock(Map<String, String> stats) {
// TODO: add query statistics metadata for the block.
return new MetadataBlock(MetadataBlock.MetadataBlockType.EOS, stats);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,14 @@

public final class TransferableBlockUtils {
private static final int MEDIAN_COLUMN_SIZE_BYTES = 8;
private static final TransferableBlock EMPTY_EOS = new TransferableBlock(DataBlockUtils.getEndOfStreamDataBlock());

private TransferableBlockUtils() {
// do not instantiate.
}

public static TransferableBlock getEndOfStreamTransferableBlock() {
return new TransferableBlock(DataBlockUtils.getEndOfStreamDataBlock());
return EMPTY_EOS;
}

public static TransferableBlock getEndOfStreamTransferableBlock(Map<String, String> statsMap) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@
import com.google.common.base.Preconditions;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import javax.annotation.Nullable;
import org.apache.calcite.rel.RelDistribution;
import org.apache.pinot.common.datablock.DataBlock;
import org.apache.pinot.query.mailbox.SendingMailbox;
import org.apache.pinot.query.planner.partitioning.KeySelectorFactory;
import org.apache.pinot.query.runtime.blocks.BlockSplitter;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;


/**
Expand Down Expand Up @@ -75,24 +77,39 @@ protected BlockExchange(List<SendingMailbox> sendingMailboxes, BlockSplitter spl
*/
public boolean send(TransferableBlock block)
throws Exception {
if (block.isEndOfStreamBlock()) {
if (block.isErrorBlock()) {
// Send error block to all mailboxes to propagate the error
for (SendingMailbox sendingMailbox : _sendingMailboxes) {
sendBlock(sendingMailbox, block);
}
return false;
} else {
boolean isEarlyTerminated = true;
for (SendingMailbox sendingMailbox : _sendingMailboxes) {
if (!sendingMailbox.isEarlyTerminated()) {
isEarlyTerminated = false;
break;
}
}

if (block.isSuccessfulEndOfStreamBlock()) {
// Send metadata to only one randomly picked mailbox, and empty EOS block to other mailboxes
int numMailboxes = _sendingMailboxes.size();
int mailboxIdToSendMetadata = ThreadLocalRandom.current().nextInt(numMailboxes);
for (int i = 0; i < numMailboxes; i++) {
SendingMailbox sendingMailbox = _sendingMailboxes.get(i);
TransferableBlock blockToSend =
i == mailboxIdToSendMetadata ? block : TransferableBlockUtils.getEndOfStreamTransferableBlock();
sendBlock(sendingMailbox, blockToSend);
}
if (!isEarlyTerminated) {
route(_sendingMailboxes, block);
return false;
}

assert block.isDataBlock();
boolean isEarlyTerminated = true;
for (SendingMailbox sendingMailbox : _sendingMailboxes) {
if (!sendingMailbox.isEarlyTerminated()) {
isEarlyTerminated = false;
break;
}
return isEarlyTerminated;
}
if (!isEarlyTerminated) {
route(_sendingMailboxes, block);
}
return isEarlyTerminated;
}

protected void sendBlock(SendingMailbox sendingMailbox, TransferableBlock block)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.concurrent.TimeoutException;
import javax.annotation.Nullable;
import org.apache.calcite.util.Pair;
import org.apache.commons.collections.MapUtils;
import org.apache.pinot.common.datablock.DataBlock;
import org.apache.pinot.common.proto.Worker;
import org.apache.pinot.common.response.broker.ResultTable;
Expand Down Expand Up @@ -89,7 +90,7 @@ public QueryDispatcher(MailboxService mailboxService) {
}

public ResultTable submitAndReduce(RequestContext context, DispatchableSubPlan dispatchableSubPlan, long timeoutMs,
Map<String, String> queryOptions, Map<Integer, ExecutionStatsAggregator> executionStatsAggregator)
Map<String, String> queryOptions, @Nullable Map<Integer, ExecutionStatsAggregator> executionStatsAggregator)
throws Exception {
long requestId = context.getRequestId();
try {
Expand Down Expand Up @@ -278,20 +279,16 @@ public static ResultTable runReducer(long requestId, DispatchableSubPlan dispatc
}

private static void collectStats(DispatchableSubPlan dispatchableSubPlan, OpChainStats opChainStats,
@Nullable Map<Integer, ExecutionStatsAggregator> executionStatsAggregatorMap) {
if (executionStatsAggregatorMap != null) {
LOGGER.info("Extracting broker query execution stats, Runtime: {}ms", opChainStats.getExecutionTime());
for (Map.Entry<String, OperatorStats> entry : opChainStats.getOperatorStatsMap().entrySet()) {
OperatorStats operatorStats = entry.getValue();
ExecutionStatsAggregator rootStatsAggregator = executionStatsAggregatorMap.get(0);
ExecutionStatsAggregator stageStatsAggregator = executionStatsAggregatorMap.get(operatorStats.getStageId());
rootStatsAggregator.aggregate(null, entry.getValue().getExecutionStats(), new HashMap<>());
@Nullable Map<Integer, ExecutionStatsAggregator> statsAggregatorMap) {
if (MapUtils.isNotEmpty(statsAggregatorMap)) {
for (OperatorStats operatorStats : opChainStats.getOperatorStatsMap().values()) {
ExecutionStatsAggregator rootStatsAggregator = statsAggregatorMap.get(0);
rootStatsAggregator.aggregate(null, operatorStats.getExecutionStats(), new HashMap<>());
ExecutionStatsAggregator stageStatsAggregator = statsAggregatorMap.get(operatorStats.getStageId());
if (stageStatsAggregator != null) {
if (dispatchableSubPlan != null) {
OperatorUtils.recordTableName(operatorStats,
dispatchableSubPlan.getQueryStageList().get(operatorStats.getStageId()));
}
stageStatsAggregator.aggregate(null, entry.getValue().getExecutionStats(), new HashMap<>());
OperatorUtils.recordTableName(operatorStats,
dispatchableSubPlan.getQueryStageList().get(operatorStats.getStageId()));
stageStatsAggregator.aggregate(null, operatorStats.getExecutionStats(), new HashMap<>());
}
}
}
Expand Down
Loading