Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multi stage stats #12704

Merged
merged 97 commits into from
May 3, 2024
Merged

Multi stage stats #12704

merged 97 commits into from
May 3, 2024

Conversation

gortiz
Copy link
Contributor

@gortiz gortiz commented Mar 22, 2024

At this moment this PR is a draft where I'm trying to modify the way stats are collected and encoded in multi-stage engine.

The plan is to:

  1. Use enums as stat keys
  2. Collect stats per stage.
    • Each OpChain will collect a partial view for its stage and the stages upstream.
    • OpChains that receive data from more than one OpChain of the same stage will merge them.
    • The root stage (on the broker), where there is only one OpChain, will collect the full view.
  3. Change the way stats are encoded. Instead of JSON we will use a binary format equal or more effective than the one used in DataTable.
  4. Create new stats for specific multi-stage operators where it makes sense
  5. Export these stats to broker clients

Current state:

  • Define the StatMap that will map from enums to values.
  • Define the encoding
  • Define the StageStats
  • Substitute OpChainStats with StageStats in OpChain
  • Merge StageStats in receiving mailboxes.
  • Return the StageStats to broker clients
  • Create new stats for specific multi-stage operators where it makes sense
  • Pass all tests
  • Create new tests
  • Document new stats

Changes

Stats added

  • All stages:
    • executionTimeMs (long): clock time (in ms) spent executing this operation, including their upstreams.
    • emittedRows (long): number of rows emitted by this operation.
  • AGGREGATE:
    • numGroupsLimitReached (bool): if true, we have reached the max number of groups.
  • MAILBOX_RECEIVE:
    • fanIn (int): How many send mailboxes are being read by this receive operator.
    • inMemoryMessages (int): How many messages have been received in heap format by this mailbox.
    • rawMessages (int): How many messages have been received in raw format and therefore deserialized by this mailbox.
    • deserializedBytes (long): How many bytes have been deserialized by this mailbox.
    • deserializationTimeMs (long): How long (in CPU time) it took to deserialize the raw messages received by this mailbox.
    • downstreamWaitMs (long): How long (in CPU time) it took to offer the messages to downstream operator.
    • upstreamWaitMs (long): How long (in CPU time) it took to wait for the messages to be offered to downstream operator.
  • JOIN:
    • maxRowsInJoinReached (bool): if true, we have reached the max number of rows in join.
    • timeBuildingHashTableMs (long): How long (CPU time) has been spent on building the hash table.
  • MAILBOX_SEND:
    • stage (int): The id of the stage whose root is this operator.
    • fanOut (int): How many receive mailboxes are being written by this send operator.
    • inMemoryMessages (int): How many messages have been sent in heap format by this mailbox.
    • rawMessages (int): How many messages have been sent in raw format and therefore serialized by this mailbox.
    • serializedBytes (long): How many bytes have been serialized by this mailbox.
    • serializationTime (long): How long (in CPU time) it took to serialize the raw messages sent by this mailbox.

Stats that are populated now

  • threadCpuTimeNs and systemActivitiesCpuTimeNs, which were set in InstanceResponseOperator but only for single stage. Now StreamingInstanceResponseOperator set these properties as well.

Stats that are not populated

  • nettyConnectionSendResponseLatency: Doesn't make sense, given we use GRPC instead of Netty
  • requestDeserialization: It seems difficult to measure by table time right now
  • responseSerialization: It seems difficult to measure by table time right now
  • schedulerWait: It seems difficult to measure by table right now

Broker stats added

A new stat called maxRowsInOperator has been added.
This stat contains the max number of documents emitted by a single logical operator.
This is shown in the Pinot Console UI and it can be useful in cases where joins generate a lot of rows

image

Multi-stage uses this new API

In multi-stage, stats are always send in EOS

Before this change, each stage kept the stats in the context and they were only sent in the EOS block on mailbox send operators.

Stats are not serialized/deserialized in the same stage. They are also not serialized when traveling from one stage to another if they are in the same server. A future optimization would be to prioritize local receivers in send mailbox.

In multi-stage, stats are shown in tree format

For example, the query:

SELECT 
	/*+  joinOptions(max_rows_in_join='10000000') */ 
	1
FROM baseballStats_OFFLINE as a2
  JOIN (
	select 
	* from
      baseballStats_OFFLINE AS a1
      JOIN dimBaseballTeams_OFFLINE AS b 
	  --on a1.teamID = b.teamID
	  on 1 = 1
  ) as c on a2.teamID = c.playerID
Generates the following json with and without tracing (click to see):
{
   "resultTable": {...},
    "rows": [...]
  },
  "requestId": "...",
  "stageStats": {
    "type": "MAILBOX_RECEIVE",
    "executionTimeMs": 342,
    "fanIn": 3,
    "rawMessages": 3,
    "deserializedBytes": 1668,
    "upstreamWaitMs": 1029,
    "children": [
      {
        "type": "MAILBOX_SEND",
        "executionTimeMs": 1022,
        "stage": 1,
        "parallelism": 3,
        "fanOut": 1,
        "rawMessages": 3,
        "children": [
          {
            "type": "TRANSFORM",
            "executionTimeMs": 1022,
            "children": [
              {
                "type": "HASH_JOIN",
                "executionTimeMs": 1022,
                "timeBuildingHashTableMs": 1002,
                "children": [
                  {
                    "type": "MAILBOX_RECEIVE",
                    "executionTimeMs": 21,
                    "emittedRows": 97889,
                    "fanIn": 1,
                    "inMemoryMessages": 11,
                    "rawMessages": 22,
                    "deserializedBytes": 287151,
                    "downstreamWaitMs": 960,
                    "upstreamWaitMs": 1030,
                    "children": [
                      {
                        "type": "MAILBOX_SEND",
                        "executionTimeMs": 883,
                        "emittedRows": 4992339,
                        "stage": 3,
                        "parallelism": 3,
                        "fanOut": 3,
                        "inMemoryMessages": 13,
                        "rawMessages": 25,
                        "serializedBytes": 13541990,
                        "serializationTimeMs": 167,
                        "children": [
                          {
                            "type": "TRANSFORM",
                            "executionTimeMs": 619,
                            "emittedRows": 4992339,
                            "children": [
                              {
                                "type": "HASH_JOIN",
                                "executionTimeMs": 469,
                                "emittedRows": 4992339,
                                "timeBuildingHashTableMs": 6,
                                "children": [
                                  {
                                    "type": "MAILBOX_RECEIVE",
                                    "executionTimeMs": 10,
                                    "emittedRows": 97889,
                                    "fanIn": 1,
                                    "inMemoryMessages": 4,
                                    "rawMessages": 9,
                                    "deserializedBytes": 448569,
                                    "deserializationTimeMs": 1,
                                    "upstreamWaitMs": 41,
                                    "children": [
                                      {
                                        "type": "MAILBOX_SEND",
                                        "emittedRows": 51,
                                        "stage": 5,
                                        "parallelism": 1,
                                        "fanOut": 3,
                                        "inMemoryMessages": 2,
                                        "rawMessages": 4,
                                        "serializedBytes": 657,
                                        "children": [
                                          {
                                            "type": "LEAF",
                                            "table": "dimBaseballTeams_OFFLINE",
                                            "executionTimeMs": 1,
                                            "emittedRows": 51,
                                            "numDocsScanned": 51,
                                            "numSegmentsQueried": 1,
                                            "numSegmentsProcessed": 1,
                                            "numSegmentsMatched": 1,
                                            "totalDocs": 51,
                                            "threadCpuTimeNs": 111660
                                          }
                                        ]
                                      }
                                    ]
                                  },
                                  {
                                    "type": "MAILBOX_RECEIVE",
                                    "executionTimeMs": 5,
                                    "emittedRows": 153,
                                    "fanIn": 1,
                                    "inMemoryMessages": 2,
                                    "rawMessages": 4,
                                    "deserializedBytes": 870,
                                    "upstreamWaitMs": 6,
                                    "children": [
                                      {
                                        "type": "MAILBOX_SEND",
                                        "executionTimeMs": 8,
                                        "emittedRows": 97889,
                                        "stage": 4,
                                        "parallelism": 1,
                                        "fanOut": 3,
                                        "inMemoryMessages": 3,
                                        "rawMessages": 9,
                                        "serializedBytes": 448333,
                                        "serializationTimeMs": 8,
                                        "children": [
                                          {
                                            "type": "LEAF",
                                            "table": "baseballStats_OFFLINE",
                                            "executionTimeMs": 14,
                                            "emittedRows": 97889,
                                            "numDocsScanned": 97889,
                                            "numEntriesScannedPostFilter": 97889,
                                            "numSegmentsQueried": 1,
                                            "numSegmentsProcessed": 1,
                                            "numSegmentsMatched": 1,
                                            "totalDocs": 97889,
                                            "threadCpuTimeNs": 11355810
                                          }
                                        ]
                                      }
                                    ]
                                  }
                                ]
                              }
                            ]
                          }
                        ]
                      }
                    ]
                  },
                  {
                    "type": "MAILBOX_RECEIVE",
                    "executionTimeMs": 611,
                    "emittedRows": 4992339,
                    "fanIn": 3,
                    "inMemoryMessages": 13,
                    "rawMessages": 26,
                    "deserializedBytes": 13542792,
                    "deserializationTimeMs": 2,
                    "upstreamWaitMs": 2667,
                    "children": [
                      {
                        "type": "MAILBOX_SEND",
                        "executionTimeMs": 336,
                        "emittedRows": 97889,
                        "stage": 2,
                        "parallelism": 1,
                        "fanOut": 3,
                        "inMemoryMessages": 10,
                        "rawMessages": 21,
                        "serializedBytes": 286843,
                        "serializationTimeMs": 6,
                        "children": [
                          {
                            "type": "LEAF",
                            "table": "baseballStats_OFFLINE",
                            "executionTimeMs": 78,
                            "emittedRows": 97889,
                            "numDocsScanned": 97889,
                            "numEntriesScannedPostFilter": 97889,
                            "numSegmentsQueried": 1,
                            "numSegmentsProcessed": 1,
                            "numSegmentsMatched": 1,
                            "totalDocs": 97889,
                            "threadCpuTimeNs": 24490,
                            "systemActivitiesCpuTimeNs": 740
                          }
                        ]
                      }
                    ]
                  }
                ]
              }
            ]
          }
        ]
      }
    ]
  }
}

@gortiz gortiz added the multi-stage Related to the multi-stage query engine label Mar 22, 2024
- always send stats through endOfStream blocks
- Aggregate stats on top of a StageStatsHolder
Comment on lines 53 to 54
private static byte[] serializeStats(List<ByteBuffer> stats) {
try (UnsynchronizedByteArrayOutputStream baos = new UnsynchronizedByteArrayOutputStream(1024);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A future PR should change BaseDataBlock to do not ask for byte[] but ByteBuffer so we won't need to do an extra heap copy of the content.

@gortiz gortiz added the backward-incompat Referenced by PRs that introduce or fix backward compat issues label Apr 2, 2024
Comment on lines 389 to 397
if (tableType == TableType.OFFLINE) {
_offlineThreadCpuTimeNs += threadCpuTimeNs;
_offlineSystemActivitiesCpuTimeNs += systemActivitiesCpuTimeNs;
_offlineResponseSerializationCpuTimeNs += responseSerializationCpuTimeNs;
} else {
_realtimeThreadCpuTimeNs += threadCpuTimeNs;
_realtimeSystemActivitiesCpuTimeNs += systemActivitiesCpuTimeNs;
_realtimeResponseSerializationCpuTimeNs += responseSerializationCpuTimeNs;
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not correct because tableName never contains the _OFFLINE or _REALTIME suffix, so these stats are always counted as realtime.

Right now as far as I can test, these stats are not being filled in V2. @Jackie-Jiang do you think we should just remove this or always count the times in either offline or realtime as we are doing right now?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch! I guess this is an example where we cannot directly expose V1 stats as V2 stats even for leaf stage. V1 server request only touch one table (either OFFLINE or REALTIME), but V2 leaf stage can touch 1 or 2 tables. To solve this, I think we should read the V1 stats in leaf stage, and convert them into leaf stage stats there.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I think I misunderstood the issue, and we are already reposting the stats in leaf stage operator. And now I understand better why we need to make V1 stats also implementing StatMap.Key.
I guess we can just leave them as 0. People can lookup the V2 stats to get the details

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are not V1 or V2 stats. They broker stats. They are populated in the broker code.

V1 collects threadCpuTimeNs, systemActivitiesCpuTimeNs and responseSerializationCpuTimeNs but doesn't care whether they are realtime or offline. That information reaches the broker and depending on the table type, it stores the value in the realtime or offline version and that is what is returned to the customer.

In the code we have in master right now these stats are not being populated when the query is executed in V2. Specifically in ExecutionStatsAggregator, the code:

    TableType tableType = null;
    String instanceName = null;
    if (routingInstance != null) {
      tableType = routingInstance.getTableType();
      instanceName = routingInstance.getShortName();;
    } else if (tableName != null) {
      tableType = TableNameBuilder.getTableTypeFromTableName(tableName);
      instanceName = tableName;
    } else {
      tableType = null;
      instanceName = null;
    }

Ends up setting tableType to null, these stats are never calculated.

I guess we can remove these lines and keep the current behavior. We can try to add support for this in the future.

@gortiz
Copy link
Contributor Author

gortiz commented Apr 24, 2024

Can you share the race condition you find during debugging? I want to learn the context

If found it in dcb197c, whose tests are still in https://github.com/apache/pinot/actions/runs/8659408355/job/23754450579. But previous commits failed in other cases.

Basically it looks like there is a concurrency issue if we remove the Synchronized map in StatMap. Specifically, I was able to reproduce it by running one of the test that failed 1000 times in a loop. After a couple of minutes it failed. In that case, it always failed in the stats of the leaf operator. In that operator I modify the StatMap in the constructor.

I have to say that in that version of the code the StatMap was initialized in the super class, so maybe the constructor with final attribute was not applying by then. Now that it applies, it may be the case that we don't need to actually synchronize, but I wouldn't feel very comfortable relaying on that.

@Jackie-Jiang
Copy link
Contributor

Regarding the race condition, it might be related to the early termination. I'll also look more into it, but I remember seeing similar issues for other classes

Copy link
Contributor

@Jackie-Jiang Jackie-Jiang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've reviewed most of the high level flows and it looks very promising.
Is it possible to break this PR into smaller ones? Currently it covers too many topics and is hard to track all the changes

return _type;
}

public void updateV1Metadata(StatMap<DataTable.MetadataKey> oldMetadata, StatMap<StatKey> stats) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here we maintained a map from v2 key to v1 key, and use v2 stats to fill the v1 stats, then use v1 stats to fill the broker response. IMO this will make future decoupling of v1 and v2 hard.
Does it work if we do not fill the v1 stats, but directly use v2 stats to fill the broker response? I'm not sure if you did this to simplify the broker response V1 handling, but since we already decoupled broker response v1 and v2, should we also decouple this?

The v1 stats key to v2 mapping is done through the switch case in mergeExecutionStats(). If we remove the mapping from v2 to v1, can we make v1 stats not implementing StatMap.Key? The overall goal here is to decouple the v2 and v1 handling so that in the future we can remove v1 easier if necessary.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The stats returned by BrokerResponseNative are based on the stats in MetadataKey and extend them with some extra stats not present in there. But instead of using the map from MetadataKey to values we already have, BrokerResponseNative stores all of them in specific attributes, which makes BrokerResponseNative a bit difficult to understand, specially to understand which attributes are unique to BrokerResponseNative and which are just a copy of some MetadataKey.

In master, BrokerResponseNativeV2 extends BrokerResponseNative, so it inherits the same pattern. This PR is improving the situation by storing a StatMap with the MetadataKey values. Now it is easy to see which values are just MetadataKey and which ones are actual new concepts added in the broker.

If we remove the mapping from v2 to v1, can we make v1 stats not implementing StatMap.Key?

This is not the first time you ask about that. My answer is: Why is that a target? What is the issue of MetadataKey implementing StatMap.Key? To me it looks like an improvement. Now, for example, MetadataKeys can define how are they merged in a nice way. StatMap doesn't have to be limited to store the V2 stats but any stat like feature in a efficient way.

Does it work if we do not fill the v1 stats, but directly use v2 stats to fill the broker response?

There is no V2 stat that can assume this role. Because BrokerResponse stats are showing an aggregated view of the query stats, while V2 stats are operation base. For example, in a query we may have 2 different aggregations. One of them may have reached its num groups limit and the other may not.

In that case, there would be a single value for BrokerResponse.isNumGroupsLimitReached, which will be true. But from the V2 stats point of view there are two instances of a Map<LeafStageTransferableBlockOperator.StatKey> (or Map<AggregateOperator.StatKey>). One of them will say that the limit has being reached and the other will say the opposite. In order to generate the actual BrokerResponse we need to merge both.

Then we would need to create a new StatKey for BrokerResponse and that new enum would end up being almost exactly equal to MetadataKey.

Could we implement BrokerNativeResponseV2 without using a StatMap? Yes, we could, but then we would need to add a lot of logic. We already have that in ExecutionStatsAggregator.setStats for V1 and we already had that for V2. Now instead of having at least two almost identical large method merging the stats, we have that merge logic defined in the stats, so it is more difficult to mess it up. These large methods are problematic because they are are mostly adding stats, but sometimes the merge logic is different and we need to be aware of that each time we merge the stats. With StatKey we end up having smaller methods which call StatKey.merge which is where the actual logic is defined.

The overall goal here is to decouple the v2 and v1 handling so that in the future we can remove v1 easier if necessary.

This PR doesn't make more difficult to do so, in fact it makes it easier. Apart from collecting stats in V1, the main blocker to remove MetadataKey (the V1 stats) is that BrokerResponse and MetadataKey are very coupled.

The idea of mapping V2 stats into V1 is there just to make it easier to return them in BrokerResponse. If in the future we want to remove MetadataKey, we can just create a new class that implements StatKey and map from V2 to that.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me try to explain why I don't like having MetadataKey implementing StatMap.Key. To me MetadataKey is the stats key for v1 engine, and some stats don't apply to v2 engine. Thus I would model StatMap.Key as the stats key for v2 engine, which is logically separated from v1 engine stats. Making MetadataKey implementing StatMap.Key is essentially declaring v1 engine stats part of v2 stats, which is a coupling of v1 and v2.
From implementation perspective, based on my code reading (correct me if I missed some important steps), for multi-stage engine, on the broker side, we only get the stats for leaf stage operator. Then we convert the v2 stats into v1 stats (MultiStageOperator.Type.LEAF.mergeInto()), then finally call BrokerResponseNativeV2.addServerStats() to add v1 stats into v2 response. I don't see other usage of MetadataKey within v2 engine flow, and I don't really see the benefits of this back and forth conversion. If all the v1 stats are from v2 leaf stage stats, we should be able to directly use leaf stage stats to serve the stats within v2 response.
Let's bring this offline to ensure we are on the same page and I'm not missing important point.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thus I would model StatMap.Key as the stats key for v2 engine, which is logically separated from v1 engine stats.

I think that is a difference we have in our models. In my model StatMap.Key is not related to V2 engine. It is just an efficient map from enums to primitives that we designed to be used in V2, but could be used in other places.

From implementation perspective, based on my code reading (correct me if I missed some important steps), for multi-stage engine, on the broker side, we only get the stats for leaf stage operator.

I think the correct formulation would be for multi-stage engine, on the broker side, we mostly get the stats for leaf stage operator. There are some metrics, like maxRowsInOperator, maxRowsInJoinReached or numGroupsLimitReached that are collected from other multi-stage operators different than leaf stage. For example BrokerNative.numGroupsLimitReached is true if any aggregation in the tree has reached the group limits. In V1 that is the only info we have. In V2 we can iterate over statsMap and get which specific group by is the one that reached the limit.

Anyway, yesterday I modified the code so MetadataKey does not implement StatMap.Key. Instead I created a new enum with broker stats.

case UNKNOWN:
LOGGER.debug("Skipping unknown execution stat: {}", entry.getKey());
break;
case TABLE:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need table? Seems we are simply overriding it?
I think we can remove it from v2 stats

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After removing TABLE and OPERATOR_ID, we might be able to remove STRING type. string stats doesn't make much sense

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this stat is very useful to indicate the table in the json stats returned to the client. Specifically, in the json that correspond to the leaf stage. Right now it is very difficult to map from stages to the original SQL and this is one of the few hints we have to understand how stages are being generated. See for example the stat map in the description of this PR. Without the table attribute it is very difficult to map from these stats to the original SQL and therefore understand how to improve your query.

Said that, we can do that without adding table as an stat. We can add add the table json field on MultiStageBrokerRequestHandler when the json is being created. I've pushed a change that does that.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nah, it doesn't seem easy to add the table when the stat json is being build. Specifically in cases like the one where we have a pipeline breaker (semi join) the logical plan is quite different to the physical plan which makes it very difficult to infer the name of the table from the logical plan.

Even it is a bit hackish, I'm going to keep it the table as a stat.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you explain how these 2 stats are used? I don't see them used in the broker response, and seems the merge rule is overriding, meaning if we gathered 2 tables/operatorIds, the second one will override the first one.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've removed OPERATOR_ID because there was no use for it.

Right now the TABLE stat is used in MultiStageBrokerRequestHandler to build the stageStats json object (the one that is shown in the description of this PR). There we use the Table stat to introduce the name of the table scanned when the json node of the leaf stage is created. In theory we don't need this here, we can infer the name of the table from the PlanNode, but there are cases where the PlanNode tree is quite different to the actual physical plan, so it is more difficult to do than expected.

There was another place where it was used, in ResourceBasedQueriesTest.testQueryTestCasesWithMetadata, but I ended up finding a way to do not need it. The previous version of the code was honestly simpler, but I guess the new version is better because ideally we would like to remove the TABLE stat.

throws IOException {
switch (version) {
case 1: {
V1MetadataBlock decoded = new V1MetadataBlock(byteBuffer);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm still confused. Is this the only place where v1 metadata block is used? Does it handle error differently from v2 metadata block?
Does it work if we ignore the version and always return new MetadataBlock(byteBuffer);?
If the error handling is different, then sending v2 metadata block to old server will cause backward incompatibility right?

Copy link
Contributor Author

@gortiz gortiz Apr 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this the only place where v1 metadata block is used?

Yes. It is used only here and in the tests.

Does it work if we ignore the version and always return new MetadataBlock(byteBuffer);?

Yes. No, see the next comemnt

If the error handling is different, then sending v2 metadata block to old server will cause backward incompatibility right?

It is no different.

Remember that the main reason to have this class is to be able to run tests with it. The fact that it is useful here is just an extra.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I planned to move V1MetadataBlock to the test classes, but I found some issues and I think the correct thing is to keep them here.

It looks like we cannot always return new Metadata(byteBuffer) because errors are encoded in a different way. Specifically, the metadata type (whether it is a eos or an error) was encoded in the variable part in V1 and in the fixed part in V2.

The change is still forward compatible (V1 can read V2) and with these lines, V2 can also read V1. I've created more test to verify that.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, if we are encoding errors differently in v1 and v2, how does v1 understand the errors in v2?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's list down the behavior on reading v1 bytes as v2 and also reading v2 bytes as v1 so that user knows what to expect when upgrading.
With current way of keeping both versions, we don't need to read v1 bytes as v2, but we can still read v2 bytes as v1 block

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, if we are encoding errors differently in v1 and v2, how does v1 understand the errors in v2?

In V1 the quality of being an error is reported in two different ways:

  1. By having a map of exceptions, which is serialized in a specific byte buffer section.
  2. By setting the type in the json Contents stored in the variable size section.

In V2 we just use the first option. I that is what the very old V0 (whose source code is in MetadataBlockTest.V0MetadataBlock) in version was doing. Given V1 can read V0, it works as expected.

Specifically, V1 is able to read bytes where the variable section is empty or Contents.type is not defined because getType is defined as:

  public MetadataBlock.MetadataBlockType getType() {
    String type = _contents.getType();

    // if type is null, then we're reading a legacy block where we didn't encode any
    // data. assume that it is an EOS block if there's no exceptions and an ERROR block
    // otherwise
    return type == null
        ? (getExceptions().isEmpty() ? MetadataBlock.MetadataBlockType.EOS : MetadataBlock.MetadataBlockType.ERROR)
        : MetadataBlock.MetadataBlockType.valueOf(type);
  }

Given we don't set Contents.getType, its value is null and we return EOS or ERROR depending on whether exceptions is empty or not.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's list down the behavior on reading v1 bytes as v2 and also reading v2 bytes as v1 so that user knows what to expect when upgrading.

The compatibility matrix is verified by tests. I've just renamed the name of the tests and I'm adding here the test table to show which tests verifies each scenario:

input version input type input content possible? read in version output type output content tested by
v1 eos empty yes v2 eos empty v1EosWithStatsIsDecodedAsV2EosWithoutStats
v1 eos not empty yes v2 eos empty v1EosWithoutStatsIsDecodedAsV2EosWithoutStats
v1 error empty no - - - -
v1 error not empty yes v2 error same as input v1ErrorIsDecodedAsV2ErrorWithSameExceptions
v2 eos empty yes v1 eos empty v2EosWithoutStatsIsReadInV1AsEosWithoutStats
v2 eos not empty yes v1 eos empty v2EosWithStatsIsReadInV1AsEosWithoutStats
v2 error empty no - - - -
v2 error not empty yes v1 error same as input v2ErrorIsReadInV1AsErrorWithSameExceptions

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given the above chart, do we need to have the special handling here?
This is a non-blocking comment, we may iterate on this

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we remove this case, then v1ErrorWithExceptionsIsDecodedAsV2ErrorWithSameExceptions fails because we decode as EOS instead of error.

The reason for that is because when I modified MetadataBlock I decided to keep the _type attribute. But I guess it doesn't make much sense if we define that an error block must always contain at least one exception (like in V0). I've just changed that and now all tests are passing and V1MetadataBlock can be moved to the test directory.

Anyway, one of my plans is to clean this code a lot in order to improve serialization/deserialization times.

@gortiz
Copy link
Contributor Author

gortiz commented Apr 29, 2024

Is it possible to break this PR into smaller ones? Currently it covers too many topics and is hard to track all the changes

I can try, but I think most of the topics are related.

@gortiz gortiz mentioned this pull request Apr 30, 2024
Copy link
Contributor

@Jackie-Jiang Jackie-Jiang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM!
We can iterate on the following 2 things separately:

  1. Race condition in StatsMap
  2. Whether we need to keep V1 metadata block in production code

throws IOException {
switch (version) {
case 1: {
V1MetadataBlock decoded = new V1MetadataBlock(byteBuffer);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given the above chart, do we need to have the special handling here?
This is a non-blocking comment, we may iterate on this

@gortiz gortiz merged commit fb9e6c5 into apache:master May 3, 2024
20 checks passed
@gortiz gortiz deleted the multi-stage-stats branch May 3, 2024 13:38
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
feature multi-stage Related to the multi-stage query engine observability
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants