Multi stage stats #12704

Merged · 97 commits · May 3, 2024
Commits
c1ed6bb
Make V2 operators use their own logger
gortiz Feb 23, 2024
c907f04
Merge branch 'master' into multi-stage-stats
gortiz Mar 19, 2024
10e7be4
Merge remote-tracking branch 'origin/master' into multi-stage-stats
gortiz Mar 21, 2024
d39ef00
Create StatMaps and use it in V2
gortiz Mar 22, 2024
bb72e72
Merge remote-tracking branch 'origin/master' into multi-stage-stats
gortiz Mar 22, 2024
5db3bb8
Deep modification on multi-stage stats to:
gortiz Mar 26, 2024
724d337
Several changes, still not tested
gortiz Apr 2, 2024
12dcada
rename a variable/attribute
gortiz Apr 3, 2024
e96ac58
Fix checkstyle
gortiz Apr 3, 2024
355732f
Modify MultiStageQueryStats to ignore errors while merging stats
gortiz Apr 3, 2024
298d6f7
Rollback changes in SortOperator
gortiz Apr 3, 2024
a09f6a3
Remove TransferableBlockUtils.getEndOfStreamTransferableBlock() to al…
gortiz Apr 3, 2024
983bfb0
Merge branch 'master' into multi-stage-stats
gortiz Apr 3, 2024
3a36837
Fix some tests
gortiz Apr 3, 2024
bd3727c
Rollback to EOS without stats
gortiz Apr 3, 2024
204398c
fix metadata backward compatibility issue
gortiz Apr 3, 2024
0ab2f02
Add and fix some tests
gortiz Apr 3, 2024
dc7dda8
Keep NUM_GROUPS_LIMIT_REACHED in V1
gortiz Apr 3, 2024
adf22b1
StatMap without boxing
gortiz Apr 4, 2024
1236b1c
Serialize StatMaps as older stats
gortiz Apr 4, 2024
84e8aae
Simplify StatMap by storing data in an EnumMap
gortiz Apr 4, 2024
ce46a3e
Create a stat enum for leaf operator
gortiz Apr 4, 2024
d729651
Do not keep default values in StatMap
gortiz Apr 4, 2024
d21f97f
Fix QueryLoggerTest
gortiz Apr 4, 2024
ccf8294
Fix issue in StatMap.merge(K key, int value)
gortiz Apr 4, 2024
6c979c2
Simplify MultiStageOperator.Type.deserializeStats
gortiz Apr 5, 2024
14ec9c0
Serialize stage stats in the same way StatMap is serialized
gortiz Apr 5, 2024
85dc213
Move custom updateEosBlock from MailboxReceiveOperator to BaseMailbox…
gortiz Apr 5, 2024
e83c197
Improve error detection and report format
gortiz Apr 5, 2024
dec1e22
Fix test
gortiz Apr 5, 2024
14bc3af
Simplify stat relation between MultiStageOperator and BaseMailboxRece…
gortiz Apr 5, 2024
e172555
Fix SortedMailboxReceiveOperator not filling EOS when no row was found
gortiz Apr 5, 2024
3e17e70
Add one stat class per multi-stage operator
gortiz Apr 5, 2024
9c89a23
Fix some errors when multi stage stats were converted into trees
gortiz Apr 5, 2024
8b09a7b
Remove MultiStageOperator.BaseStatKeys
gortiz Apr 5, 2024
09fb937
Merge branch 'master' into multi-stage-stats
gortiz Apr 8, 2024
d6d344c
Fix MIN_CONSUMING_FRESHNESS_TIME_MS merge function
gortiz Apr 8, 2024
5083ec5
Create functions for generic key merges
gortiz Apr 8, 2024
821237f
Fix stats on union
gortiz Apr 8, 2024
115c9d6
Fix QueryLoggerTest
gortiz Apr 8, 2024
c4b9988
concat stats from the current stage in set operations
gortiz Apr 8, 2024
ddb32cc
Merge remote-tracking branch 'master' into multi-stage-stats
gortiz Apr 9, 2024
f34112d
Remove unused import
gortiz Apr 9, 2024
d8ab892
Remove unused line
gortiz Apr 9, 2024
ec11bae
Report THREAD_CPU_TIME_NS and SYSTEM_ACTIVITIES_CPU_TIME_NS in V2
gortiz Apr 9, 2024
0e38d4d
Add new receive, send and join stats
gortiz Apr 10, 2024
4bc476e
Make some stats visible by default in JSON format
gortiz Apr 10, 2024
4a2500a
Change metadata to be stored in metadata section
gortiz Apr 10, 2024
69d6844
Merge remote-tracking branch 'origin/master' into multi-stage-stats
gortiz Apr 10, 2024
974fae2
fix syntactic error.
gortiz Apr 10, 2024
974fccb
fix a test compile issue
gortiz Apr 10, 2024
f166ef2
Fix issue in metadata deserialization
gortiz Apr 11, 2024
2a61f5c
FIx MetadataBlockTest.emptyMetadataBlock
gortiz Apr 11, 2024
8ed9b03
Keep buffer in the expected position when deserializing
gortiz Apr 12, 2024
a089b2b
Add v1 deserialization tests
gortiz Apr 12, 2024
bf2b554
add license headers
gortiz Apr 12, 2024
dcb197c
Fix checkstyle
gortiz Apr 12, 2024
4755cba
Fix tests mocking new required methods
gortiz Apr 12, 2024
ba3a1ce
Fix concurrency issue
gortiz Apr 12, 2024
9c54f4d
remove unused DataBlock.toDataOnlyDataTable
gortiz Apr 15, 2024
caaaf59
Increase protocol version of ColumnarDataBlock and RowDataBlock
gortiz Apr 15, 2024
2b5b827
Add some javadoc
gortiz Apr 15, 2024
131b956
Improve code style
gortiz Apr 15, 2024
09f9a21
Small changes in code style
gortiz Apr 15, 2024
d719a3f
Rename `addStageStat` to `addStageStats`
gortiz Apr 15, 2024
9329a8a
Remove TODO
gortiz Apr 15, 2024
4349d3b
Add javadoc
gortiz Apr 15, 2024
a582c8a
Remove OperatorStats
gortiz Apr 15, 2024
f91e263
Remove generic from MultiStageOperator
gortiz Apr 15, 2024
8cde598
Rename method
gortiz Apr 15, 2024
4131f10
Remove blank line
gortiz Apr 15, 2024
550329b
Remove test
gortiz Apr 15, 2024
da15044
Update testQueryTestCasesWithMetadata test
gortiz Apr 15, 2024
ebc24f3
Merge remote-tracking branch 'origin/master' into multi-stage-stats
gortiz Apr 15, 2024
8afef13
Add comments in recursive case
gortiz Apr 23, 2024
600a547
Modify recursiveCase to do not throw even if there is a mismatch in t…
gortiz Apr 23, 2024
3bdcedb
Remove nullable annotation in TransferableBlock constructor
gortiz Apr 23, 2024
66bd9ba
Remove assertion trick in MultiStageQueryStats
gortiz Apr 23, 2024
2d55d24
Make BrokerResponseNativeV2 do not extends BrokerResponseNative
gortiz Apr 24, 2024
47ae138
Merge remote-tracking branch 'master' into multi-stage-stats
gortiz Apr 24, 2024
8327156
add comment
gortiz Apr 24, 2024
a3d44d2
Properly serialize exceptions in BrokerResponseNativeV2
gortiz Apr 25, 2024
bc58f63
Fix some json issues in BrokerResponseNative and v2
gortiz Apr 25, 2024
bd7afc1
Merge remote-tracking branch 'master' into multi-stage-stats
gortiz Apr 26, 2024
b1bef34
Change stageStats to be a tree instead of an array
gortiz Apr 26, 2024
dc4db11
Remove `shouldCollectStats` on V2 operators
gortiz Apr 26, 2024
39ac9eb
Show `maxRowsInOperator` in Pinot UI
gortiz Apr 26, 2024
0193a0d
Make `maxRowsInOperator` read only in Jackson
gortiz Apr 26, 2024
2d96491
remove isPartialResult
gortiz Apr 29, 2024
c229fbf
Remove StatMap.Type.STRING and make MetadataKey do not implement Stat…
gortiz Apr 29, 2024
12059aa
Add StatMap.Type.STRING again
gortiz Apr 29, 2024
746fe2d
Fix bug that reported hash stats in the inverse order
gortiz Apr 29, 2024
c8468e1
Fix children array in json stats being listed in inverse order
gortiz Apr 29, 2024
02ef0b2
Remove getMaxRowsInOperator in BrokerResponse and Pinot UI
gortiz Apr 30, 2024
8bebbfc
Merge remote-tracking branch master into multi-stage-stats
gortiz Apr 30, 2024
b8334a9
Add more backward compatibility tests
gortiz May 1, 2024
fae4984
do not include metadata type in the serialization.
gortiz May 3, 2024
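
Several of the commits above ("StatMap without boxing", "Simplify StatMap by storing data in an EnumMap", "Do not keep default values in StatMap") describe the central data structure of this PR: a per-operator stat map keyed by an enum, merged stage by stage as stats flow back to the broker. The sketch below is only an illustration of that idea, not the actual Pinot StatMap class; the enum keys are hypothetical, and it boxes values into Long for brevity where the real class avoids boxing.

```java
import java.util.EnumMap;
import java.util.Map;

// Minimal sketch of an enum-keyed stat map: defaults are never stored, and merging adds values.
public class StatMapSketch<K extends Enum<K>> {
  private final EnumMap<K, Long> _stats; // the real implementation avoids boxing with typed accessors

  public StatMapSketch(Class<K> keyClass) {
    _stats = new EnumMap<>(keyClass);
  }

  /** Adds {@code value} to the current value of {@code key}; zero (the default) is never stored. */
  public void merge(K key, long value) {
    if (value == 0) {
      return; // do not keep default values
    }
    _stats.merge(key, value, Long::sum);
  }

  public long getLong(K key) {
    Long value = _stats.get(key);
    return value == null ? 0 : value;
  }

  /** Merges another map into this one, the way a downstream stage folds in upstream stats. */
  public void merge(StatMapSketch<K> other) {
    for (Map.Entry<K, Long> entry : other._stats.entrySet()) {
      merge(entry.getKey(), entry.getValue());
    }
  }

  // Hypothetical keys, for illustration only.
  public enum ExampleStat { EMITTED_ROWS, EXECUTION_TIME_MS }

  public static void main(String[] args) {
    StatMapSketch<ExampleStat> stageA = new StatMapSketch<>(ExampleStat.class);
    stageA.merge(ExampleStat.EMITTED_ROWS, 128);
    StatMapSketch<ExampleStat> stageB = new StatMapSketch<>(ExampleStat.class);
    stageB.merge(ExampleStat.EMITTED_ROWS, 72);
    stageA.merge(stageB);
    System.out.println(stageA.getLong(ExampleStat.EMITTED_ROWS)); // 200
  }
}
```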
Original file line number Diff line number Diff line change
@@ -761,7 +761,7 @@ protected BrokerResponse handleRequest(long requestId, String query, @Nullable S
realtimeRoutingTable = null;
}
}
BrokerResponseNative brokerResponse;
BrokerResponse brokerResponse;
if (_queriesById != null) {
// Start to track the running query for cancellation just before sending it out to servers to avoid any
// potential failures that could happen before sending it out, like failures to calculate the routing table etc.
@@ -798,7 +798,6 @@ protected BrokerResponse handleRequest(long requestId, String query, @Nullable S
_brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.BROKER_RESPONSES_WITH_NUM_GROUPS_LIMIT_REACHED,
1);
}
brokerResponse.setPartialResult(isPartialResult(brokerResponse));

// Set total query processing time
long totalTimeMs = TimeUnit.NANOSECONDS.toMillis(executionEndTimeNs - compilationStartTimeNs);
@@ -1870,19 +1869,14 @@ private static void attachTimeBoundary(PinotQuery pinotQuery, TimeBoundaryInfo t
* Processes the optimized broker requests for both OFFLINE and REALTIME table.
* TODO: Directly take PinotQuery
*/
protected abstract BrokerResponseNative processBrokerRequest(long requestId, BrokerRequest originalBrokerRequest,
protected abstract BrokerResponse processBrokerRequest(long requestId, BrokerRequest originalBrokerRequest,
BrokerRequest serverBrokerRequest, @Nullable BrokerRequest offlineBrokerRequest,
@Nullable Map<ServerInstance, Pair<List<String>, List<String>>> offlineRoutingTable,
@Nullable BrokerRequest realtimeBrokerRequest,
@Nullable Map<ServerInstance, Pair<List<String>, List<String>>> realtimeRoutingTable, long timeoutMs,
ServerStats serverStats, RequestContext requestContext)
throws Exception;

protected static boolean isPartialResult(BrokerResponse brokerResponse) {
return brokerResponse.isNumGroupsLimitReached() || brokerResponse.isMaxRowsInJoinReached()
|| brokerResponse.getExceptionsSize() > 0;
}

protected static void augmentStatistics(RequestContext statistics, BrokerResponse response) {
statistics.setTotalDocs(response.getTotalDocs());
statistics.setNumDocsScanned(response.getNumDocsScanned());
Original file line number Diff line number Diff line change
@@ -19,14 +19,13 @@
package org.apache.pinot.broker.requesthandler;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.Maps;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.HttpHeaders;
@@ -48,7 +47,6 @@
import org.apache.pinot.common.response.BrokerResponse;
import org.apache.pinot.common.response.broker.BrokerResponseNative;
import org.apache.pinot.common.response.broker.BrokerResponseNativeV2;
import org.apache.pinot.common.response.broker.BrokerResponseStats;
import org.apache.pinot.common.response.broker.QueryProcessingException;
import org.apache.pinot.common.response.broker.ResultTable;
import org.apache.pinot.common.utils.DataSchema;
@@ -58,14 +56,17 @@
import org.apache.pinot.common.utils.request.RequestUtils;
import org.apache.pinot.core.auth.Actions;
import org.apache.pinot.core.auth.TargetType;
import org.apache.pinot.core.query.reduce.ExecutionStatsAggregator;
import org.apache.pinot.core.transport.ServerInstance;
import org.apache.pinot.query.QueryEnvironment;
import org.apache.pinot.query.catalog.PinotCatalog;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.planner.PlanFragment;
import org.apache.pinot.query.planner.physical.DispatchablePlanFragment;
import org.apache.pinot.query.planner.physical.DispatchableSubPlan;
import org.apache.pinot.query.planner.plannode.PlanNode;
import org.apache.pinot.query.routing.WorkerManager;
import org.apache.pinot.query.runtime.MultiStageStatsTreeBuilder;
import org.apache.pinot.query.runtime.plan.MultiStageQueryStats;
import org.apache.pinot.query.service.dispatch.QueryDispatcher;
import org.apache.pinot.query.type.TypeFactory;
import org.apache.pinot.query.type.TypeSystem;
@@ -195,24 +196,12 @@ protected BrokerResponse handleRequest(long requestId, String query, @Nullable S
}

Map<String, String> queryOptions = sqlNodeAndOptions.getOptions();
boolean traceEnabled = Boolean.parseBoolean(queryOptions.get(CommonConstants.Broker.Request.TRACE));
Map<Integer, ExecutionStatsAggregator> stageIdStatsMap;
if (!traceEnabled) {
stageIdStatsMap = Collections.singletonMap(0, new ExecutionStatsAggregator(false));
} else {
List<DispatchablePlanFragment> stagePlans = dispatchableSubPlan.getQueryStageList();
int numStages = stagePlans.size();
stageIdStatsMap = Maps.newHashMapWithExpectedSize(numStages);
for (int stageId = 0; stageId < numStages; stageId++) {
stageIdStatsMap.put(stageId, new ExecutionStatsAggregator(true));
}
}

long executionStartTimeNs = System.nanoTime();
ResultTable queryResults;
QueryDispatcher.QueryResult queryResults;
try {
queryResults = _queryDispatcher.submitAndReduce(requestContext, dispatchableSubPlan, queryTimeoutMs, queryOptions,
stageIdStatsMap);
queryResults =
_queryDispatcher.submitAndReduce(requestContext, dispatchableSubPlan, queryTimeoutMs, queryOptions);
} catch (TimeoutException e) {
for (String table : tableNames) {
_brokerMetrics.addMeteredTableValue(table, BrokerMeter.BROKER_RESPONSES_WITH_TIMEOUTS, 1);
@@ -231,7 +220,7 @@ protected BrokerResponse handleRequest(long requestId, String query, @Nullable S
updatePhaseTimingForTables(tableNames, BrokerQueryPhase.QUERY_EXECUTION, executionEndTimeNs - executionStartTimeNs);

BrokerResponseNativeV2 brokerResponse = new BrokerResponseNativeV2();
brokerResponse.setResultTable(queryResults);
brokerResponse.setResultTable(queryResults.getResultTable());

// Attach unavailable segments
int numUnavailableSegments = 0;
@@ -243,35 +232,14 @@ protected BrokerResponse handleRequest(long requestId, String query, @Nullable S
String.format("Find unavailable segments: %s for table: %s", unavailableSegments, tableName)));
}

for (Map.Entry<Integer, ExecutionStatsAggregator> entry : stageIdStatsMap.entrySet()) {
if (entry.getKey() == 0) {
// Root stats are aggregated and added separately to broker response for backward compatibility
entry.getValue().setStats(brokerResponse);
continue;
}

BrokerResponseStats brokerResponseStats = new BrokerResponseStats();
if (!tableNames.isEmpty()) {
//TODO: Only using first table to assign broker metrics
// find a way to split metrics in case of multiple table
String rawTableName = TableNameBuilder.extractRawTableName(tableNames.iterator().next());
entry.getValue().setStageLevelStats(rawTableName, brokerResponseStats, _brokerMetrics);
} else {
entry.getValue().setStageLevelStats(null, brokerResponseStats, null);
}
brokerResponse.addStageStat(entry.getKey(), brokerResponseStats);
}

// Set partial result flag
brokerResponse.setPartialResult(isPartialResult(brokerResponse));
fillOldBrokerResponseStats(brokerResponse, queryResults.getQueryStats(), dispatchableSubPlan);

// Set total query processing time
// TODO: Currently we don't emit metric for QUERY_TOTAL_TIME_MS
long totalTimeMs = TimeUnit.NANOSECONDS.toMillis(
sqlNodeAndOptions.getParseTimeNs() + (executionEndTimeNs - compilationStartTimeNs));
brokerResponse.setTimeUsedMs(totalTimeMs);
requestContext.setQueryProcessingTime(totalTimeMs);
requestContext.setTraceInfo(brokerResponse.getTraceInfo());
augmentStatistics(requestContext, brokerResponse);

// Log query and stats
@@ -282,6 +250,22 @@ protected BrokerResponse handleRequest(long requestId, String query, @Nullable S
return brokerResponse;
}

private void fillOldBrokerResponseStats(BrokerResponseNativeV2 brokerResponse,
List<MultiStageQueryStats.StageStats.Closed> queryStats, DispatchableSubPlan dispatchableSubPlan) {
List<PlanNode> planNodes = dispatchableSubPlan.getQueryStageList().stream()
.map(DispatchablePlanFragment::getPlanFragment)
.map(PlanFragment::getFragmentRoot)
.collect(Collectors.toList());
MultiStageStatsTreeBuilder treeBuilder = new MultiStageStatsTreeBuilder(planNodes, queryStats);
brokerResponse.setStageStats(treeBuilder.jsonStatsByStage(0));

for (MultiStageQueryStats.StageStats.Closed stageStats : queryStats) {
if (stageStats != null) { // for example pipeline breaker may not have stats
stageStats.forEach((type, stats) -> type.mergeInto(brokerResponse, stats));
}
}
}

/**
* Validates whether the requester has access to all the tables.
*/
@@ -325,7 +309,7 @@ private void updatePhaseTimingForTables(Set<String> tableNames, BrokerQueryPhase
}
}

private BrokerResponseNative constructMultistageExplainPlan(String sql, String plan) {
private BrokerResponse constructMultistageExplainPlan(String sql, String plan) {
BrokerResponseNative brokerResponse = BrokerResponseNative.empty();
List<Object[]> rows = new ArrayList<>();
rows.add(new Object[]{sql, plan});
@@ -336,7 +320,7 @@ private BrokerResponseNative constructMultistageExplainPlan(String sql, String p
}

@Override
protected BrokerResponseNative processBrokerRequest(long requestId, BrokerRequest originalBrokerRequest,
protected BrokerResponse processBrokerRequest(long requestId, BrokerRequest originalBrokerRequest,
BrokerRequest serverBrokerRequest, @Nullable BrokerRequest offlineBrokerRequest,
@Nullable Map<ServerInstance, Pair<List<String>, List<String>>> offlineRoutingTable,
@Nullable BrokerRequest realtimeBrokerRequest,
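
The new fillOldBrokerResponseStats above does two things at once: it exposes the full per-stage stats tree through MultiStageStatsTreeBuilder.jsonStatsByStage(0), and it folds each stage's stats back into the flat, pre-existing broker response fields via type.mergeInto(brokerResponse, stats). The sketch below illustrates only that merge-into pattern with simplified placeholder types; it is not the actual Pinot API.

```java
import java.util.Arrays;
import java.util.List;

// Placeholder for the flat, backward-compatible response fields.
final class LegacyResponseStats {
  long numDocsScanned;
}

// Each operator stat type knows how to fold itself into the legacy flat response.
interface OperatorStats {
  void mergeInto(LegacyResponseStats response);
}

final class LeafOperatorStats implements OperatorStats {
  private final long _numDocsScanned;

  LeafOperatorStats(long numDocsScanned) {
    _numDocsScanned = numDocsScanned;
  }

  @Override
  public void mergeInto(LegacyResponseStats response) {
    response.numDocsScanned += _numDocsScanned;
  }
}

final class MergeIntoDemo {
  public static void main(String[] args) {
    // One entry per stage; a stage without stats (e.g. a pipeline breaker) may be null.
    List<OperatorStats> statsByStage = Arrays.asList(new LeafOperatorStats(100), null, new LeafOperatorStats(50));
    LegacyResponseStats response = new LegacyResponseStats();
    for (OperatorStats stageStats : statsByStage) {
      if (stageStats != null) {
        stageStats.mergeInto(response);
      }
    }
    System.out.println(response.numDocsScanned); // 150
  }
}
```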
Original file line number Diff line number Diff line change
@@ -40,6 +40,7 @@
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.metrics.BrokerQueryPhase;
import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.common.response.BrokerResponse;
import org.apache.pinot.common.response.broker.BrokerResponseNative;
import org.apache.pinot.common.response.broker.QueryProcessingException;
import org.apache.pinot.common.utils.HashUtil;
@@ -100,7 +101,7 @@ public synchronized void shutDown() {
}

@Override
protected BrokerResponseNative processBrokerRequest(long requestId, BrokerRequest originalBrokerRequest,
protected BrokerResponse processBrokerRequest(long requestId, BrokerRequest originalBrokerRequest,
BrokerRequest serverBrokerRequest, @Nullable BrokerRequest offlineBrokerRequest,
@Nullable Map<ServerInstance, Pair<List<String>, List<String>>> offlineRoutingTable,
@Nullable BrokerRequest realtimeBrokerRequest,
Original file line number Diff line number Diff line change
@@ -93,7 +93,6 @@ public abstract class BaseDataBlock implements DataBlock {
protected ByteBuffer _fixedSizeData;
protected byte[] _variableSizeDataBytes;
protected ByteBuffer _variableSizeData;
protected Map<String, String> _metadata;

/**
* construct a base data block.
@@ -114,7 +113,6 @@ public BaseDataBlock(int numRows, @Nullable DataSchema dataSchema, String[] stri
_fixedSizeData = ByteBuffer.wrap(fixedSizeDataBytes);
_variableSizeDataBytes = variableSizeDataBytes;
_variableSizeData = ByteBuffer.wrap(variableSizeDataBytes);
_metadata = new HashMap<>();
_errCodeToExceptionMap = new HashMap<>();
}

@@ -131,7 +129,6 @@ public BaseDataBlock() {
_fixedSizeData = null;
_variableSizeDataBytes = null;
_variableSizeData = null;
_metadata = new HashMap<>();
_errCodeToExceptionMap = new HashMap<>();
}

@@ -195,10 +192,7 @@ public BaseDataBlock(ByteBuffer byteBuffer)
_variableSizeData = ByteBuffer.wrap(_variableSizeDataBytes);

// Read metadata.
int metadataLength = byteBuffer.getInt();
if (metadataLength != 0) {
_metadata = deserializeMetadata(byteBuffer);
}
deserializeMetadata(byteBuffer);
}

@Override
@@ -232,7 +226,7 @@ public int getVersion() {

@Override
public Map<String, String> getMetadata() {
return _metadata;
return Collections.emptyMap();
}

@Override
@@ -432,6 +426,11 @@ public Map<Integer, String> getExceptions() {
return _errCodeToExceptionMap;
}

/**
* Serialize this data block to a byte array.
* <p>
* In order to deserialize it, {@link DataBlockUtils#getDataBlock(ByteBuffer)} should be used.
*/
@Override
public byte[] toBytes()
throws IOException {
@@ -444,9 +443,7 @@ public byte[] toBytes()
// Write metadata: length followed by actual metadata bytes.
// NOTE: We ignore metadata serialization time in "responseSerializationCpuTimeNs" as it's negligible while
// considering it will bring a lot code complexity.
byte[] metadataBytes = serializeMetadata();
dataOutputStream.writeInt(metadataBytes.length);
dataOutputStream.write(metadataBytes);
serializeMetadata(dataOutputStream);

return byteArrayOutputStream.toByteArray();
}
@@ -525,14 +522,30 @@ private void writeLeadingSections(DataOutputStream dataOutputStream)
}
}

private byte[] serializeMetadata()
/**
* Writes the metadata section to the given data output stream.
*/
protected void serializeMetadata(DataOutputStream dataOutputStream)
throws IOException {
return new byte[0];
dataOutputStream.writeInt(0);
}

private Map<String, String> deserializeMetadata(ByteBuffer buffer)
/**
* Deserializes the metadata section from the given byte buffer.
* <p>
* This is the counterpart of {@link #serializeMetadata(DataOutputStream)} and it is guaranteed that the buffer will
* be positioned at the start of the metadata section when this method is called.
* <p>
* <strong>Important:</strong> Implementations must leave the cursor at the end of the metadata section, in exactly
* the position where the bytes written by {@link #serializeMetadata(DataOutputStream)} end.
* <p>
* <strong>Important:</strong> This method is called at the end of the BaseDataBlock constructor to read the metadata
* section. That means it runs <strong>before</strong> the subclass constructor, so it must not rely on any fields
* initialized by the subclass.
*/
protected void deserializeMetadata(ByteBuffer buffer)
throws IOException {
return Collections.emptyMap();
buffer.getInt();
}

private byte[] serializeExceptions()
@@ -572,14 +585,9 @@ private Map<Integer, String> deserializeExceptions(ByteBuffer buffer)
@Override
public String toString() {
if (_dataSchema == null) {
return _metadata.toString();
return "{}";
gortiz (author) commented: We don't have the ability to decode what is in metadata now.

} else {
StringBuilder stringBuilder = new StringBuilder();
stringBuilder.append("resultSchema:").append('\n');
stringBuilder.append(_dataSchema).append('\n');
stringBuilder.append("numRows: ").append(_numRows).append('\n');
stringBuilder.append("metadata: ").append(_metadata.toString()).append('\n');
return stringBuilder.toString();
return "resultSchema:" + '\n' + _dataSchema + '\n' + "numRows: " + _numRows + '\n';
}
}
}
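
The BaseDataBlock changes above replace the in-memory _metadata map with a pair of protected hooks, serializeMetadata(DataOutputStream) and deserializeMetadata(ByteBuffer), whose contract is spelled out in the javadoc: the two must be symmetric, the reader must leave the buffer exactly at the end of the metadata section, and the reader cannot rely on subclass-constructor state because it runs from the base constructor. The following is a hypothetical subclass illustrating that contract, not the metadata-carrying block actually added in this PR; a block serialized with toBytes() would then round-trip through DataBlockUtils.getDataBlock(ByteBuffer).

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical subclass of BaseDataBlock (the extends clause is omitted so the sketch stands alone).
public class ExampleMetadataBlock {
  // Assigned by deserializeMetadata(); it must not depend on values set in this class's constructor,
  // because the base constructor invokes deserializeMetadata() before the subclass constructor runs.
  protected byte[] _statsBytes = new byte[0];

  protected void serializeMetadata(DataOutputStream dataOutputStream)
      throws IOException {
    // Length-prefixed payload, mirroring exactly what deserializeMetadata() expects to read.
    dataOutputStream.writeInt(_statsBytes.length);
    dataOutputStream.write(_statsBytes);
  }

  protected void deserializeMetadata(ByteBuffer buffer) {
    int length = buffer.getInt();
    _statsBytes = new byte[length];
    buffer.get(_statsBytes); // leaves the cursor exactly at the end of the metadata section
  }

  public static void main(String[] args)
      throws IOException {
    ExampleMetadataBlock block = new ExampleMetadataBlock();
    block._statsBytes = "stage stats".getBytes(StandardCharsets.UTF_8);

    ByteArrayOutputStream out = new ByteArrayOutputStream();
    block.serializeMetadata(new DataOutputStream(out));

    ExampleMetadataBlock copy = new ExampleMetadataBlock();
    copy.deserializeMetadata(ByteBuffer.wrap(out.toByteArray()));
    System.out.println(new String(copy._statsBytes, StandardCharsets.UTF_8)); // stage stats
  }
}
```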
Original file line number Diff line number Diff line change
@@ -20,14 +20,16 @@

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Objects;
import org.apache.pinot.common.utils.DataSchema;


/**
* Column-wise data table. It stores data in columnar-major format.
*/
public class ColumnarDataBlock extends BaseDataBlock {
private static final int VERSION = 1;
private static final int VERSION = 2;
protected int[] _cumulativeColumnOffsetSizeInBytes;
protected int[] _columnSizeInBytes;

@@ -80,17 +82,21 @@ protected int positionOffsetInVariableBufferAndGetLength(int rowId, int colId) {
}

@Override
public ColumnarDataBlock toMetadataOnlyDataTable() {
ColumnarDataBlock metadataOnlyDataTable = new ColumnarDataBlock();
metadataOnlyDataTable._metadata.putAll(_metadata);
metadataOnlyDataTable._errCodeToExceptionMap.putAll(_errCodeToExceptionMap);
return metadataOnlyDataTable;
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof ColumnarDataBlock)) {
return false;
}
ColumnarDataBlock that = (ColumnarDataBlock) o;
return Objects.deepEquals(_cumulativeColumnOffsetSizeInBytes, that._cumulativeColumnOffsetSizeInBytes)
&& Objects.deepEquals(_columnSizeInBytes, that._columnSizeInBytes);
}

@Override
public ColumnarDataBlock toDataOnlyDataTable() {
return new ColumnarDataBlock(_numRows, _dataSchema, _stringDictionary, _fixedSizeDataBytes, _variableSizeDataBytes);
public int hashCode() {
return Objects.hash(Arrays.hashCode(_cumulativeColumnOffsetSizeInBytes), Arrays.hashCode(_columnSizeInBytes));
}

// TODO: add whole-column access methods.
}
Original file line number Diff line number Diff line change
@@ -80,10 +80,6 @@ byte[] toBytes()
@Nullable
RoaringBitmap getNullRowIds(int colId);

DataBlock toMetadataOnlyDataTable();

DataBlock toDataOnlyDataTable();

enum Type {
ROW(0),
COLUMNAR(1),