
Regression "RowConverter column schema mismatch, expected Utf8 got Dictionary(Int32, Utf8)" after upgrade #8738

Closed
alamb opened this issue Jan 3, 2024 · 9 comments · Fixed by #8740
Labels: bug (Something isn't working)

alamb (Contributor) commented Jan 3, 2024

Describe the bug

@appletreeisyellow is updating DataFusion in InfluxDB, and after the upgrade some of our queries hit the following error:

RowConverter column schema mismatch, expected Utf8 got Dictionary(Int32, Utf8)
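
For reference, the message comes from the arrow-rs RowConverter, which checks that the arrays it is handed match the column types it was configured with. Below is a minimal standalone sketch of that check; this is not the DataFusion call site, and exact arrow API details may vary slightly by version:

```rust
use std::sync::Arc;
use arrow::array::{ArrayRef, DictionaryArray};
use arrow::datatypes::{DataType, Int32Type};
use arrow::row::{RowConverter, SortField};

fn main() -> Result<(), arrow::error::ArrowError> {
    // The converter is configured for a plain Utf8 column...
    let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)])?;

    // ...but the batch actually carries a dictionary-encoded column.
    let dict: DictionaryArray<Int32Type> = vec!["a", "b", "a"].into_iter().collect();
    let col: ArrayRef = Arc::new(dict);

    // This fails with a "RowConverter column schema mismatch" error like the one above.
    let err = converter.convert_columns(&[col]).unwrap_err();
    println!("{err}");
    Ok(())
}
```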

To Reproduce

Our current reproducer uses customer data that I can't share; I will create a shareable test case that shows the problem later.

datafusion-cli -f smaller-df.sql
DataFusion CLI v34.0.0
External error: Arrow error: Invalid argument error: RowConverter column schema mismatch, expected Utf8 got Dictionary(Int32, Utf8)

Expected behavior

The queries should run without error.

Additional context

I confirmed that when I reverted eb8aff7 / #8291 the problem stopped (see #8740 for the specific change)

alamb added the bug label on Jan 3, 2024
alamb self-assigned this on Jan 3, 2024
qrilka (Contributor) commented Jan 3, 2024

I'd like to take a look at a repro but will not have time for it before this weekend.

alamb (Contributor, Author) commented Jan 3, 2024

> I'd like to take a look at a repro but will not have time for it before this weekend.

Thanks @qrilka -- I am working on a reproducer and I may try to work on it before this weekend as it is blocking our upgrade in IOx. I'll ping you on any PR I may propose.

alamb added a commit to alamb/datafusion that referenced this issue Jan 4, 2024
alamb (Contributor, Author) commented Jan 4, 2024

Reproducer is in #8750

alamb (Contributor, Author) commented Jan 5, 2024

Ok, I have figured out what is going on here

The original plan is below:

physical_plan
SortPreservingMergeExec: [time@0 ASC NULLS LAST,tag_id@1 ASC NULLS LAST]
  SortExec: expr=[time@0 ASC NULLS LAST,tag_id@1 ASC NULLS LAST]
    ProjectionExec: expr=[timestamp@0 as time, tag_id@1 as tag_id, field@2 as field, value@3 as value]
      AggregateExec: mode=FinalPartitioned, gby=[timestamp@0 as timestamp, tag_id@1 as tag_id, field@2 as field, value@3 as value], aggr=[]
        CoalesceBatchesExec: target_batch_size=8192
          RepartitionExec: partitioning=Hash([timestamp@0, tag_id@1, field@2, value@3], 16), input_partitions=32
            AggregateExec: mode=Partial, gby=[timestamp@0 as timestamp, tag_id@1 as tag_id, field@2 as field, value@3 as value], aggr=[]   <****** This is the aggregate that is hitting the problem
              UnionExec
                ProjectionExec: expr=[timestamp@0 as timestamp, tag_id@1 as tag_id, field@2 as field, CAST(value@3 AS Utf8) as value]
                  AggregateExec: mode=FinalPartitioned, gby=[timestamp@0 as timestamp, tag_id@1 as tag_id, field@2 as field, value@3 as value], aggr=[]
                    CoalesceBatchesExec: target_batch_size=8192
                      RepartitionExec: partitioning=Hash([timestamp@0, tag_id@1, field@2, value@3], 16), input_partitions=32
                        AggregateExec: mode=Partial, gby=[timestamp@0 as timestamp, tag_id@1 as tag_id, field@2 as field, value@3 as value], aggr=[]
                          UnionExec
                            ProjectionExec: expr=[time@2 as timestamp, tag_id@0 as tag_id, active_power as field, f5@1 as value]
                              ProjectionExec: expr=[CAST(column2@1 AS Dictionary(Int32, Utf8)) as tag_id, CAST(column3@2 AS Float64) as f5, CAST(column4@3 AS Timestamp(Nanosecond, None)) as time]
                                RepartitionExec: partitioning=RoundRobinBatch(16), input_partitions=1
                                  CoalesceBatchesExec: target_batch_size=8192
                                    FilterExec: column4@3 >= 1701783995000000000 AND column4@3 < 1704289595000000000 AND CAST(column3@2 AS Float64) IS NOT NULL AND CAST(column1@0 AS Dictionary(Int32, Utf8)) = active AND CAST(column2@1 AS Dictionary(Int32, Utf8)) = 1000
                                      ValuesExec
                            ProjectionExec: expr=[time@2 as timestamp, tag_id@0 as tag_id, f1 as field, f1@1 as value]
                              ProjectionExec: expr=[CAST(column1@0 AS Dictionary(Int32, Utf8)) as tag_id, CAST(column2@1 AS Float64) as f1, CAST(column6@2 AS Timestamp(Nanosecond, None)) as time]
                                RepartitionExec: partitioning=RoundRobinBatch(16), input_partitions=1
                                  CoalesceBatchesExec: target_batch_size=8192
                                    FilterExec: column6@2 >= 1701783995000000000 AND column6@2 < 1704289595000000000 AND CAST(column2@1 AS Float64) IS NOT NULL AND CAST(column1@0 AS Dictionary(Int32, Utf8)) = 1000
                                      ProjectionExec: expr=[column1@0 as column1, column2@1 as column2, column6@5 as column6]
                                        ValuesExec
                ProjectionExec: expr=[time@2 as timestamp, tag_id@0 as tag_id, f2 as field, f2@1 as value]
                  ProjectionExec: expr=[CAST(column1@0 AS Dictionary(Int32, Utf8)) as tag_id, column3@1 as f2, CAST(column6@2 AS Timestamp(Nanosecond, None)) as time]
                    RepartitionExec: partitioning=RoundRobinBatch(16), input_partitions=1
                      CoalesceBatchesExec: target_batch_size=8192
                        FilterExec: column6@2 >= 1701783995000000000 AND column6@2 < 1704289595000000000 AND column3@1 IS NOT NULL AND CAST(column1@0 AS Dictionary(Int32, Utf8)) = 1000
                          ProjectionExec: expr=[column1@0 as column1, column3@2 as column3, column6@5 as column6]
                            ValuesExec

2 rows in set. Query took 0.116 seconds.

The AggregateExec that hits the issue is reading from a UnionExec that looks like this:

UnionExec
  ProjectionExec: expr=[timestamp@0 as timestamp, tag_id@1 as tag_id, field@2 as field, CAST(value@3 AS Utf8) as value]
    AggregateExec: mode=Final, gby=[timestamp@0 as timestamp, tag_id@1 as tag_id, field@2 as field, value@3 as value], aggr=[]
      CoalescePartitionsExec
       ... <snip> ...
  ProjectionExec: expr=[time@2 as timestamp, tag_id@0 as tag_id, f2 as field, f2@1 as value]
    ProjectionExec: expr=[CAST(column1@0 AS Dictionary(Int32, Utf8)) as tag_id, column3@1 as f2, CAST(column6@2 AS Timestamp(Nanosecond, None)) as time]
      CoalesceBatchesExec: target_batch_size=8192
        FilterExec: column6@2 >= 1701783995000000000 AND column6@2 < 1704289595000000000 AND column3@1 IS NOT NULL AND CAST(column1@0 AS Dictionary(Int32, Utf8)) = 1000
          ProjectionExec: expr=[column1@0 as column1, column3@2 as column3, column6@5 as column6]
            ValuesExec

The problem here is that after #8291 the inputs to the UnionExec produce different schemas.

  • The first input has an AggregateExec, which now produces a tag_id column that is Utf8 (the materialized version of the dictionary).
  • The second input has no AggregateExec and passes through the tag_id column, which is still Dictionary(Int32, Utf8) encoded.

So the UnionExec code reports that it will produce tag_id as a Utf8 column, but some of the batches contain Utf8 and some contain Dictionary(Int32, Utf8), which is what causes the runtime error.
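
To make the broken invariant concrete: a RecordBatch built directly against the union's declared schema would be rejected immediately for exactly this kind of type mismatch. The batches here flow through UnionExec untouched, so the mismatch only surfaces later, inside the aggregate's RowConverter. A small illustrative sketch (not DataFusion code, just the arrow-rs invariant):

```rust
use std::sync::Arc;
use arrow::array::{ArrayRef, DictionaryArray};
use arrow::datatypes::{DataType, Field, Int32Type, Schema};
use arrow::record_batch::RecordBatch;

fn main() {
    // The union's declared schema says tag_id is Utf8...
    let schema = Arc::new(Schema::new(vec![Field::new("tag_id", DataType::Utf8, true)]));

    // ...but the second input still produces dictionary-encoded arrays.
    let dict: DictionaryArray<Int32Type> = vec!["1000", "1000"].into_iter().collect();
    let col: ArrayRef = Arc::new(dict);

    // Building a batch against the declared schema rejects the column type,
    // which is the same invariant the downstream operators rely on.
    let err = RecordBatch::try_new(schema, vec![col]).unwrap_err();
    println!("{err}");
}
```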

alamb (Contributor, Author) commented Jan 5, 2024

I am not quite sure what the right fix is yet. Some possibilities:

  1. revert Materialize dictionaries in group keys #8291
  2. Add a cast as part of the inputs to the UnionExec to get the schemas back in sync (see the sketch below)

At the very least, I think the UnionExec should generate a runtime (internal) error when it detects incompatible inputs, so it is easier to track down problems like this in the future.
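
For option 2, the arrow cast kernel already supports unpacking a dictionary into its value type, so conceptually the fix is to wrap the mismatched union input in a cast like the one sketched here (this only shows the kernel, not where DataFusion would insert it):

```rust
use std::sync::Arc;
use arrow::array::{ArrayRef, DictionaryArray};
use arrow::compute::cast;
use arrow::datatypes::{DataType, Int32Type};

fn main() -> Result<(), arrow::error::ArrowError> {
    // A dictionary-encoded tag_id column, as produced by the second union input.
    let dict: DictionaryArray<Int32Type> = vec!["1000", "1000", "2000"].into_iter().collect();
    let col: ArrayRef = Arc::new(dict);

    // Casting unpacks the dictionary so the column matches the declared Utf8 type.
    let unpacked = cast(&col, &DataType::Utf8)?;
    assert_eq!(unpacked.data_type(), &DataType::Utf8);
    Ok(())
}
```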

tustvold (Contributor) commented Jan 5, 2024

I would have perhaps expected the type coercion machinery to handle this, i.e. option 2 makes sense to me

alamb (Contributor, Author) commented Jan 5, 2024

I have spent some more time studying the code.

There is coercion logic that handles Union inputs so they have the correct types (e.g. adding casts), but this happens at the LogicalPlan level, not the ExecutionPlan level. By the time the ExecutionPlan is made, the types are expected to already match.

One potential source of problems is that LogicalPlan::Aggregate no longer accurately reflects the output type that the ExecutionPlan generates (i.e. planning a LogicalPlan::Aggregate can now result in an ExecutionPlan with a different schema).
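
One rough way to see that mismatch is to compare the logical and physical schemas for a grouped dictionary column. This is only a sketch against the DataFusion v34-era DataFrame API (the table t and its column1 are made up for illustration); per the analysis above, the two prints would be expected to disagree on the affected version:

```rust
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    ctx.sql("CREATE TABLE t AS VALUES (arrow_cast('a', 'Dictionary(Int32, Utf8)'))")
        .await?;

    let df = ctx.sql("SELECT column1 FROM t GROUP BY column1").await?;

    // Logical schema for the group key (still reports Dictionary(Int32, Utf8))...
    println!("logical:  {:?}", df.schema().field(0).data_type());

    // ...vs. the schema the physical AggregateExec claims it will produce.
    let physical = df.create_physical_plan().await?;
    println!("physical: {:?}", physical.schema().field(0).data_type());
    Ok(())
}
```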

I started trying locally to make LogicalPlan::Aggregate reflect the fact that group exprs have their dictionary encoding removed, but hit a few potential problems:

  1. Some optimizer passes now error
  2. I need to verify that all potential ExecutionPlan nodes for a LogicalPlan::Aggregate do the same decompression of Dictionaries on grouping columns

alamb (Contributor, Author) commented Jan 5, 2024

So I have confirmed that when I change the LogicalPlans to match what the ExecutionPlan now does, the test passes again. I will try to make a PR to see how it looks.

alamb (Contributor, Author) commented Jan 5, 2024

I thought about this more and I think we should go with backing out this change while we sort out how to handle it better, for the reasons described in #7647 (comment).
