Multi stage stats #12704
Conversation
- Always send stats through endOfStream blocks
- Aggregate stats on top of a StageStatsHolder
private static byte[] serializeStats(List<ByteBuffer> stats) {
  try (UnsynchronizedByteArrayOutputStream baos = new UnsynchronizedByteArrayOutputStream(1024);
A future PR should change BaseDataBlock to ask for a ByteBuffer instead of a byte[], so we won't need to do an extra heap copy of the content.
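For context, a minimal hedged sketch of the pattern under discussion (not the actual Pinot code; the class and helper here are made up for illustration): everything is serialized into an in-memory stream, and the byte[] contract then forces a final toByteArray() copy that a ByteBuffer-based BaseDataBlock API could avoid.

```java
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;

import org.apache.commons.io.output.UnsynchronizedByteArrayOutputStream;

public final class StatsSerializationSketch {

  // Hypothetical helper, loosely modeled on the snippet above: the stream already
  // holds the serialized bytes, but returning byte[] forces toByteArray() to copy them.
  static byte[] serializeStats(List<ByteBuffer> stats) throws IOException {
    try (UnsynchronizedByteArrayOutputStream baos = new UnsynchronizedByteArrayOutputStream(1024);
        DataOutputStream output = new DataOutputStream(baos)) {
      output.writeInt(stats.size());
      for (ByteBuffer stat : stats) {
        byte[] bytes = new byte[stat.remaining()];
        stat.duplicate().get(bytes);      // copy the buffer content without moving its position
        output.writeInt(bytes.length);
        output.write(bytes);
      }
      output.flush();
      return baos.toByteArray();          // the extra heap copy a ByteBuffer-based API could avoid
    }
  }
}
```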
pinot-common/src/main/java/org/apache/pinot/common/datatable/StatMap.java
…ways include stats with EOS. Production code that used it now calls the new method TransferableBlockUtils.getEndOfStreamTransferableBlock(stageId). Test code that called that method now calls TransferableBlockTestUtils.getEndOfStreamTransferableBlock(), which simply calls TransferableBlockUtils.getEndOfStreamTransferableBlock(0).
if (tableType == TableType.OFFLINE) {
  _offlineThreadCpuTimeNs += threadCpuTimeNs;
  _offlineSystemActivitiesCpuTimeNs += systemActivitiesCpuTimeNs;
  _offlineResponseSerializationCpuTimeNs += responseSerializationCpuTimeNs;
} else {
  _realtimeThreadCpuTimeNs += threadCpuTimeNs;
  _realtimeSystemActivitiesCpuTimeNs += systemActivitiesCpuTimeNs;
  _realtimeResponseSerializationCpuTimeNs += responseSerializationCpuTimeNs;
}
This is not correct because tableName never contains the _OFFLINE or _REALTIME suffix, so these stats are always counted as realtime.
Right now, as far as I can test, these stats are not being filled in V2. @Jackie-Jiang do you think we should just remove this, or always count the times as either offline or realtime as we are doing right now?
Good catch! I guess this is an example where we cannot directly expose V1 stats as V2 stats even for the leaf stage. A V1 server request only touches one table (either OFFLINE or REALTIME), but a V2 leaf stage can touch 1 or 2 tables. To solve this, I think we should read the V1 stats in the leaf stage and convert them into leaf stage stats there.
Oh, I think I misunderstood the issue, and we are already reposting the stats in the leaf stage operator. Now I understand better why we need to make V1 stats also implement StatMap.Key.
I guess we can just leave them as 0. People can look up the V2 stats to get the details.
These are not V1 or V2 stats. They are broker stats, populated in the broker code.
V1 collects threadCpuTimeNs, systemActivitiesCpuTimeNs and responseSerializationCpuTimeNs but doesn't care whether they are realtime or offline. That information reaches the broker and, depending on the table type, it stores the value in the realtime or offline version, which is what is returned to the customer.
In the code we have in master right now these stats are not being populated when the query is executed in V2. Specifically, in ExecutionStatsAggregator, the code:
TableType tableType = null;
String instanceName = null;
if (routingInstance != null) {
  tableType = routingInstance.getTableType();
  instanceName = routingInstance.getShortName();
} else if (tableName != null) {
  tableType = TableNameBuilder.getTableTypeFromTableName(tableName);
  instanceName = tableName;
} else {
  tableType = null;
  instanceName = null;
}
ends up setting tableType to null, so these stats are never calculated.
I guess we can remove these lines and keep the current behavior. We can try to add support for this in the future.
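To make the failure mode concrete, here is a small sketch of the behavior described above (assuming TableNameBuilder.getTableTypeFromTableName returns null for names without a type suffix, as the comments in this thread indicate):

```java
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;

public class TableTypeFromNameSketch {
  public static void main(String[] args) {
    // Raw table names, as used by the multi-stage engine, carry no type suffix,
    // so no table type can be derived from them and the CPU-time stats above are skipped.
    TableType fromRawName = TableNameBuilder.getTableTypeFromTableName("myTable");
    TableType fromSuffixedName = TableNameBuilder.getTableTypeFromTableName("myTable_OFFLINE");

    System.out.println(fromRawName);      // expected: null
    System.out.println(fromSuffixedName); // expected: OFFLINE
  }
}
```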
I found it in dcb197c, whose tests are still in https://github.com/apache/pinot/actions/runs/8659408355/job/23754450579, but previous commits failed in other cases. Basically it looks like there is a concurrency issue if we remove the synchronized map. I have to say that in that version of the code the StatMap was initialized in the super class, so maybe the constructor-with-final-attribute guarantee was not applying by then. Now that it applies, it may be the case that we don't need to actually synchronize, but I wouldn't feel very comfortable relying on that.
Regarding the race condition, it might be related to the early termination. I'll also look more into it, but I remember seeing similar issues for other classes.
I've reviewed most of the high-level flows and it looks very promising.
Is it possible to break this PR into smaller ones? Currently it covers too many topics and it is hard to track all the changes.
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java
  return _type;
}

public void updateV1Metadata(StatMap<DataTable.MetadataKey> oldMetadata, StatMap<StatKey> stats) {
Here we maintain a map from v2 key to v1 key, use the v2 stats to fill the v1 stats, and then use the v1 stats to fill the broker response. IMO this will make future decoupling of v1 and v2 hard.
Does it work if we do not fill the v1 stats, but directly use v2 stats to fill the broker response? I'm not sure if you did this to simplify the broker response V1 handling, but since we already decoupled broker response v1 and v2, should we also decouple this?
The v1 stats key to v2 mapping is done through the switch case in mergeExecutionStats(). If we remove the mapping from v2 to v1, can we make v1 stats not implement StatMap.Key? The overall goal here is to decouple the v2 and v1 handling so that in the future we can remove v1 more easily if necessary.
The stats returned by BrokerResponseNative are based on the stats in MetadataKey and extend them with some extra stats not present there. But instead of using the map from MetadataKey to values we already have, BrokerResponseNative stores all of them in specific attributes, which makes BrokerResponseNative a bit difficult to understand, especially when trying to see which attributes are unique to BrokerResponseNative and which are just a copy of some MetadataKey.
In master, BrokerResponseNativeV2 extends BrokerResponseNative, so it inherits the same pattern. This PR improves the situation by storing a StatMap with the MetadataKey values. Now it is easy to see which values are just MetadataKey and which ones are actually new concepts added in the broker.
If we remove the mapping from v2 to v1, can we make v1 stats not implement StatMap.Key?
This is not the first time you ask about that. My answer is: why is that a target? What is the issue with MetadataKey implementing StatMap.Key? To me it looks like an improvement. Now, for example, MetadataKeys can define how they are merged in a nice way. StatMap doesn't have to be limited to storing the V2 stats; it can store any stat-like feature in an efficient way.
Does it work if we do not fill the v1 stats, but directly use v2 stats to fill the broker response?
There is no V2 stat that can assume this role, because BrokerResponse stats show an aggregated view of the query stats, while V2 stats are per operator. For example, in a query we may have 2 different aggregations. One of them may have reached its num groups limit and the other may not.
In that case, there would be a single value for BrokerResponse.isNumGroupsLimitReached, which will be true. But from the V2 stats point of view there are two instances of a Map<LeafStageTransferableBlockOperator.StatKey> (or Map<AggregateOperator.StatKey>). One of them will say that the limit has been reached and the other will say the opposite. In order to generate the actual BrokerResponse we need to merge both.
Then we would need to create a new StatKey for BrokerResponse, and that new enum would end up being almost exactly equal to MetadataKey.
Could we implement BrokerNativeResponseV2 without using a StatMap? Yes, we could, but then we would need to add a lot of logic. We already have that in ExecutionStatsAggregator.setStats for V1 and we already had that for V2. Now, instead of having at least two almost identical large methods merging the stats, we have that merge logic defined in the stats, so it is more difficult to mess it up. These large methods are problematic because they are mostly adding stats, but sometimes the merge logic is different and we need to be aware of that each time we merge the stats. With StatKey we end up having smaller methods which call StatKey.merge, which is where the actual logic is defined.
The overall goal here is to decouple the v2 and v1 handling so that in the future we can remove v1 more easily if necessary.
This PR doesn't make it more difficult to do so; in fact, it makes it easier. Apart from collecting stats in V1, the main blocker to removing MetadataKey (the V1 stats) is that BrokerResponse and MetadataKey are very coupled.
The idea of mapping V2 stats into V1 is there just to make it easier to return them in BrokerResponse. If in the future we want to remove MetadataKey, we can just create a new class that implements StatKey and map from V2 to that.
Let me try to explain why I don't like having MetadataKey implement StatMap.Key. To me MetadataKey is the stats key for the v1 engine, and some stats don't apply to the v2 engine. Thus I would model StatMap.Key as the stats key for the v2 engine, which is logically separated from the v1 engine stats. Making MetadataKey implement StatMap.Key is essentially declaring v1 engine stats part of v2 stats, which is a coupling of v1 and v2.
From an implementation perspective, based on my code reading (correct me if I missed some important steps), for the multi-stage engine, on the broker side, we only get the stats for the leaf stage operator. Then we convert the v2 stats into v1 stats (MultiStageOperator.Type.LEAF.mergeInto()), then finally call BrokerResponseNativeV2.addServerStats() to add the v1 stats into the v2 response. I don't see other usage of MetadataKey within the v2 engine flow, and I don't really see the benefit of this back-and-forth conversion. If all the v1 stats come from v2 leaf stage stats, we should be able to directly use the leaf stage stats to serve the stats within the v2 response.
Let's take this offline to ensure we are on the same page and I'm not missing an important point.
Thus I would model StatMap.Key as the stats key for the v2 engine, which is logically separated from the v1 engine stats.
I think that is a difference we have in our models. In my model, StatMap.Key is not related to the V2 engine. It is just an efficient map from enums to primitives that we designed to be used in V2, but it could be used in other places.
From an implementation perspective, based on my code reading (correct me if I missed some important steps), for the multi-stage engine, on the broker side, we only get the stats for the leaf stage operator.
I think the correct formulation would be: for the multi-stage engine, on the broker side, we mostly get the stats for the leaf stage operator. There are some metrics, like maxRowsInOperator, maxRowsInJoinReached or numGroupsLimitReached, that are collected from multi-stage operators other than the leaf stage. For example, BrokerNative.numGroupsLimitReached is true if any aggregation in the tree has reached the group limits. In V1 that is the only info we have. In V2 we can iterate over statsMap and get which specific group by is the one that reached the limit.
Anyway, yesterday I modified the code so MetadataKey does not implement StatMap.Key. Instead I created a new enum with broker stats.
case UNKNOWN:
  LOGGER.debug("Skipping unknown execution stat: {}", entry.getKey());
  break;
case TABLE:
Do we need TABLE? It seems we are simply overriding it.
I think we can remove it from the v2 stats.
After removing TABLE and OPERATOR_ID, we might be able to remove the STRING type. String stats don't make much sense.
I think this stat is very useful to indicate the table in the json stats returned to the client, specifically in the json that corresponds to the leaf stage. Right now it is very difficult to map from stages to the original SQL, and this is one of the few hints we have to understand how stages are being generated. See for example the stat map in the description of this PR. Without the table attribute it is very difficult to map from these stats to the original SQL and therefore understand how to improve your query.
That said, we can do that without adding the table as a stat. We can add the table json field in MultiStageBrokerRequestHandler when the json is being created. I've pushed a change that does that.
Nah, it doesn't seem easy to add the table when the stat json is being built. Specifically, in cases like the one where we have a pipeline breaker (semi join), the logical plan is quite different from the physical plan, which makes it very difficult to infer the name of the table from the logical plan.
Even though it is a bit hackish, I'm going to keep the table as a stat.
Can you explain how these 2 stats are used? I don't see them used in the broker response, and it seems the merge rule is overriding, meaning that if we gathered 2 tables/operatorIds, the second one will override the first one.
I've removed OPERATOR_ID because there was no use for it.
Right now the TABLE stat is used in MultiStageBrokerRequestHandler to build the stageStats json object (the one that is shown in the description of this PR). There we use the TABLE stat to introduce the name of the table scanned when the json node of the leaf stage is created. In theory we don't need this here, as we can infer the name of the table from the PlanNode, but there are cases where the PlanNode tree is quite different from the actual physical plan, so it is more difficult to do than expected.
There was another place where it was used, in ResourceBasedQueriesTest.testQueryTestCasesWithMetadata, but I ended up finding a way to not need it. The previous version of the code was honestly simpler, but I guess the new version is better because ideally we would like to remove the TABLE stat.
throws IOException {
  switch (version) {
    case 1: {
      V1MetadataBlock decoded = new V1MetadataBlock(byteBuffer);
I'm still confused. Is this the only place where the v1 metadata block is used? Does it handle errors differently from the v2 metadata block?
Does it work if we ignore the version and always return new MetadataBlock(byteBuffer)?
If the error handling is different, then sending a v2 metadata block to an old server will cause backward incompatibility, right?
Is this the only place where the v1 metadata block is used?
Yes. It is used only here and in the tests.
Does it work if we ignore the version and always return new MetadataBlock(byteBuffer)?
No, see the next comment.
If the error handling is different, then sending a v2 metadata block to an old server will cause backward incompatibility, right?
It is no different.
Remember that the main reason to have this class is to be able to run tests with it. The fact that it is useful here is just an extra.
I planned to move V1MetadataBlock to the test classes, but I found some issues and I think the correct thing is to keep it here.
It looks like we cannot always return new MetadataBlock(byteBuffer) because errors are encoded in a different way. Specifically, the metadata type (whether it is an EOS or an error) was encoded in the variable part in V1 and in the fixed part in V2.
The change is still forward compatible (V1 can read V2) and, with these lines, V2 can also read V1. I've created more tests to verify that.
Hmm, if we are encoding errors differently in v1 and v2, how does v1 understand the errors in v2?
Let's list down the behavior of reading v1 bytes as v2 and also reading v2 bytes as v1, so that users know what to expect when upgrading.
With the current way of keeping both versions, we don't need to read v1 bytes as v2, but we can still read v2 bytes as a v1 block.
Hmm, if we are encoding errors differently in v1 and v2, how does v1 understand the errors in v2?
In V1, whether a block is an error is reported in two different ways:
- By having a map of exceptions, which is serialized in a specific byte buffer section.
- By setting the type in the jsonContents stored in the variable size section.
In V2 we just use the first option. I think that is what the very old V0 (whose source code is in MetadataBlockTest.V0MetadataBlock) was doing. Given V1 can read V0, it works as expected.
Specifically, V1 is able to read bytes where the variable section is empty or Contents.type is not defined, because getType is defined as:
public MetadataBlock.MetadataBlockType getType() {
  String type = _contents.getType();
  // if type is null, then we're reading a legacy block where we didn't encode any
  // data. assume that it is an EOS block if there's no exceptions and an ERROR block
  // otherwise
  return type == null
      ? (getExceptions().isEmpty() ? MetadataBlock.MetadataBlockType.EOS : MetadataBlock.MetadataBlockType.ERROR)
      : MetadataBlock.MetadataBlockType.valueOf(type);
}
Given we don't set Contents.getType, its value is null and we return EOS or ERROR depending on whether the exceptions map is empty or not.
Let's list down the behavior of reading v1 bytes as v2 and also reading v2 bytes as v1, so that users know what to expect when upgrading.
The compatibility matrix is verified by tests. I've just renamed the tests, and I'm adding the test table here to show which test verifies each scenario:
input version | input type | input content | possible? | read in version | output type | output content | tested by
---|---|---|---|---|---|---|---
v1 | eos | empty | yes | v2 | eos | empty | v1EosWithoutStatsIsDecodedAsV2EosWithoutStats
v1 | eos | not empty | yes | v2 | eos | empty | v1EosWithStatsIsDecodedAsV2EosWithoutStats
v1 | error | empty | no | - | - | - | -
v1 | error | not empty | yes | v2 | error | same as input | v1ErrorIsDecodedAsV2ErrorWithSameExceptions
v2 | eos | empty | yes | v1 | eos | empty | v2EosWithoutStatsIsReadInV1AsEosWithoutStats
v2 | eos | not empty | yes | v1 | eos | empty | v2EosWithStatsIsReadInV1AsEosWithoutStats
v2 | error | empty | no | - | - | - | -
v2 | error | not empty | yes | v1 | error | same as input | v2ErrorIsReadInV1AsErrorWithSameExceptions
Given the above chart, do we need to have the special handling here?
This is a non-blocking comment, we may iterate on this
If we remove this case, then v1ErrorWithExceptionsIsDecodedAsV2ErrorWithSameExceptions fails, because we decode it as EOS instead of error.
The reason is that when I modified MetadataBlock I decided to keep the _type attribute. But I guess it doesn't make much sense if we define that an error block must always contain at least one exception (like in V0). I've just changed that, and now all tests are passing and V1MetadataBlock can be moved to the test directory.
Anyway, one of my plans is to clean up this code a lot in order to improve serialization/deserialization times.
I can try, but I think most of the topics are related.
LGTM!
We can iterate on the following 2 things separately:
- Race condition in StatMap
- Whether we need to keep V1 metadata block in production code
At this moment this PR is a draft where I'm trying to modify the way stats are collected and encoded in the multi-stage engine.
The plan is to:
Current state:
Changes

Stats added

Stats that are populated now
- threadCpuTimeNs and systemActivitiesCpuTimeNs, which were set in InstanceResponseOperator but only for single stage. Now StreamingInstanceResponseOperator sets these properties as well.

Stats that are not populated
- nettyConnectionSendResponseLatency: Doesn't make sense, given we use GRPC instead of Netty
- requestDeserialization: It seems difficult to measure by table time right now
- responseSerialization: It seems difficult to measure by table time right now
- schedulerWait: It seems difficult to measure by table right now

Broker stats added
A new stat called maxRowsInOperator has been added. This stat contains the max number of documents emitted by a single logical operator. This is shown in the Pinot Console UI and it can be useful in cases where joins generate a lot of rows.
Multi-stage uses this new API

In multi-stage, stats are always sent in EOS
Before this change, each stage kept the stats in the context and they were only sent in the EOS block on mailbox send operators.
Stats are not serialized/deserialized in the same stage. They are also not serialized when traveling from one stage to another if they are in the same server. A future optimization would be to prioritize local receivers in send mailbox.

In multi-stage, stats are shown in tree format
For example, the query:
Generates the following json with and without tracing (click to see):