From 4cfaf1a9dba1bdc848ba3b82ee7f9ec87889629f Mon Sep 17 00:00:00 2001 From: "Xiaotian (Jackie) Jiang" Date: Wed, 28 Feb 2024 21:57:56 -0800 Subject: [PATCH] [Multi-stage] Reduce the stats transferred --- .../MultiStageBrokerRequestHandler.java | 20 +++++++--- .../common/datablock/DataBlockUtils.java | 2 - .../blocks/TransferableBlockUtils.java | 3 +- .../operator/exchange/BlockExchange.java | 39 +++++++++++++------ .../service/dispatch/QueryDispatcher.java | 25 ++++++------ 5 files changed, 55 insertions(+), 34 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java index 05fb1ddd522c..35aff7efd245 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java @@ -19,8 +19,9 @@ package org.apache.pinot.broker.requesthandler; import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.collect.Maps; import java.util.ArrayList; -import java.util.HashMap; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Set; @@ -61,6 +62,7 @@ import org.apache.pinot.query.QueryEnvironment; import org.apache.pinot.query.catalog.PinotCatalog; import org.apache.pinot.query.mailbox.MailboxService; +import org.apache.pinot.query.planner.physical.DispatchablePlanFragment; import org.apache.pinot.query.planner.physical.DispatchableSubPlan; import org.apache.pinot.query.routing.WorkerManager; import org.apache.pinot.query.service.dispatch.QueryDispatcher; @@ -179,14 +181,20 @@ protected BrokerResponse handleRequest(long requestId, String query, @Nullable S Map queryOptions = sqlNodeAndOptions.getOptions(); boolean traceEnabled = Boolean.parseBoolean(queryOptions.get(CommonConstants.Broker.Request.TRACE)); -
- ResultTable queryResults; - Map stageIdStatsMap = new HashMap<>(); - for (int stageId = 0; stageId < dispatchableSubPlan.getQueryStageList().size(); stageId++) { - stageIdStatsMap.put(stageId, new ExecutionStatsAggregator(traceEnabled)); + Map stageIdStatsMap; + if (!traceEnabled) { + stageIdStatsMap = Collections.singletonMap(0, new ExecutionStatsAggregator(false)); + } else { + List stagePlans = dispatchableSubPlan.getQueryStageList(); + int numStages = stagePlans.size(); + stageIdStatsMap = Maps.newHashMapWithExpectedSize(numStages); + for (int stageId = 0; stageId < numStages; stageId++) { + stageIdStatsMap.put(stageId, new ExecutionStatsAggregator(true)); + } } long executionStartTimeNs = System.nanoTime(); + ResultTable queryResults; try { queryResults = _queryDispatcher.submitAndReduce(requestContext, dispatchableSubPlan, queryTimeoutMs, queryOptions, stageIdStatsMap); diff --git a/pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlockUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlockUtils.java index 5d381687120a..27f114032849 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlockUtils.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlockUtils.java @@ -59,12 +59,10 @@ public static MetadataBlock getErrorDataBlock(Map exceptions) { } public static MetadataBlock getEndOfStreamDataBlock() { - // TODO: add query statistics metadata for the block. return new MetadataBlock(MetadataBlock.MetadataBlockType.EOS); } public static MetadataBlock getEndOfStreamDataBlock(Map stats) { - // TODO: add query statistics metadata for the block. 
return new MetadataBlock(MetadataBlock.MetadataBlockType.EOS, stats); } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlockUtils.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlockUtils.java index 01c5fd7ddd43..355b6fe294da 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlockUtils.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlockUtils.java @@ -30,13 +30,14 @@ public final class TransferableBlockUtils { private static final int MEDIAN_COLUMN_SIZE_BYTES = 8; + private static final TransferableBlock EMPTY_EOS = new TransferableBlock(DataBlockUtils.getEndOfStreamDataBlock()); private TransferableBlockUtils() { // do not instantiate. } public static TransferableBlock getEndOfStreamTransferableBlock() { - return new TransferableBlock(DataBlockUtils.getEndOfStreamDataBlock()); + return EMPTY_EOS; } public static TransferableBlock getEndOfStreamTransferableBlock(Map statsMap) { diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java index 453288ecb31f..f8d49b632848 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java @@ -21,6 +21,7 @@ import com.google.common.base.Preconditions; import java.util.Iterator; import java.util.List; +import java.util.concurrent.ThreadLocalRandom; import javax.annotation.Nullable; import org.apache.calcite.rel.RelDistribution; import org.apache.pinot.common.datablock.DataBlock; @@ -28,6 +29,7 @@ import org.apache.pinot.query.planner.partitioning.KeySelectorFactory; import org.apache.pinot.query.runtime.blocks.BlockSplitter; 
import org.apache.pinot.query.runtime.blocks.TransferableBlock; +import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils; /** @@ -75,24 +77,39 @@ protected BlockExchange(List sendingMailboxes, BlockSplitter spl */ public boolean send(TransferableBlock block) throws Exception { - if (block.isEndOfStreamBlock()) { + if (block.isErrorBlock()) { + // Send error block to all mailboxes to propagate the error for (SendingMailbox sendingMailbox : _sendingMailboxes) { sendBlock(sendingMailbox, block); } return false; - } else { - boolean isEarlyTerminated = true; - for (SendingMailbox sendingMailbox : _sendingMailboxes) { - if (!sendingMailbox.isEarlyTerminated()) { - isEarlyTerminated = false; - break; - } + } + + if (block.isSuccessfulEndOfStreamBlock()) { + // Send metadata to only one randomly picked mailbox, and empty EOS block to other mailboxes + int numMailboxes = _sendingMailboxes.size(); + int mailboxIdToSendMetadata = ThreadLocalRandom.current().nextInt(numMailboxes); + for (int i = 0; i < numMailboxes; i++) { + SendingMailbox sendingMailbox = _sendingMailboxes.get(i); + TransferableBlock blockToSend = + i == mailboxIdToSendMetadata ? 
block : TransferableBlockUtils.getEndOfStreamTransferableBlock(); + sendBlock(sendingMailbox, blockToSend); } - if (!isEarlyTerminated) { - route(_sendingMailboxes, block); + return false; + } + + assert block.isDataBlock(); + boolean isEarlyTerminated = true; + for (SendingMailbox sendingMailbox : _sendingMailboxes) { + if (!sendingMailbox.isEarlyTerminated()) { + isEarlyTerminated = false; + break; } - return isEarlyTerminated; } + if (!isEarlyTerminated) { + route(_sendingMailboxes, block); + } + return isEarlyTerminated; } protected void sendBlock(SendingMailbox sendingMailbox, TransferableBlock block) diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java index f217c9b01ad1..c13d6f6aee9e 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java @@ -38,6 +38,7 @@ import java.util.concurrent.TimeoutException; import javax.annotation.Nullable; import org.apache.calcite.util.Pair; +import org.apache.commons.collections.MapUtils; import org.apache.pinot.common.datablock.DataBlock; import org.apache.pinot.common.proto.Worker; import org.apache.pinot.common.response.broker.ResultTable; @@ -89,7 +90,7 @@ public QueryDispatcher(MailboxService mailboxService) { } public ResultTable submitAndReduce(RequestContext context, DispatchableSubPlan dispatchableSubPlan, long timeoutMs, - Map queryOptions, Map executionStatsAggregator) + Map queryOptions, @Nullable Map executionStatsAggregator) throws Exception { long requestId = context.getRequestId(); try { @@ -278,20 +279,16 @@ public static ResultTable runReducer(long requestId, DispatchableSubPlan dispatc } private static void collectStats(DispatchableSubPlan dispatchableSubPlan, OpChainStats opChainStats, - @Nullable Map 
executionStatsAggregatorMap) { - if (executionStatsAggregatorMap != null) { - LOGGER.info("Extracting broker query execution stats, Runtime: {}ms", opChainStats.getExecutionTime()); - for (Map.Entry entry : opChainStats.getOperatorStatsMap().entrySet()) { - OperatorStats operatorStats = entry.getValue(); - ExecutionStatsAggregator rootStatsAggregator = executionStatsAggregatorMap.get(0); - ExecutionStatsAggregator stageStatsAggregator = executionStatsAggregatorMap.get(operatorStats.getStageId()); - rootStatsAggregator.aggregate(null, entry.getValue().getExecutionStats(), new HashMap<>()); + @Nullable Map statsAggregatorMap) { + if (MapUtils.isNotEmpty(statsAggregatorMap)) { + for (OperatorStats operatorStats : opChainStats.getOperatorStatsMap().values()) { + ExecutionStatsAggregator rootStatsAggregator = statsAggregatorMap.get(0); + rootStatsAggregator.aggregate(null, operatorStats.getExecutionStats(), new HashMap<>()); + ExecutionStatsAggregator stageStatsAggregator = statsAggregatorMap.get(operatorStats.getStageId()); if (stageStatsAggregator != null) { - if (dispatchableSubPlan != null) { - OperatorUtils.recordTableName(operatorStats, - dispatchableSubPlan.getQueryStageList().get(operatorStats.getStageId())); - } - stageStatsAggregator.aggregate(null, entry.getValue().getExecutionStats(), new HashMap<>()); + OperatorUtils.recordTableName(operatorStats, + dispatchableSubPlan.getQueryStageList().get(operatorStats.getStageId())); + stageStatsAggregator.aggregate(null, operatorStats.getExecutionStats(), new HashMap<>()); } } }