test(substrait): update TPCH tests #12462
Substrait plans can contain relations within expressions, for example in subqueries. As such, in order to fully collect schemas we must traverse both relations and expressions.
When subseconds is not set, allow for missing precision_mode
"tests/testdata/tpch/lineitem.csv",
)])
.await?;
let path = "tests/testdata/tpch_substrait_plans/query_1.json";
I noticed a lot of duplication, so I encapsulated the test setup code in tpch_plan_to_string.
let proto = serde_json::from_reader::<_, Plan>(BufReader::new(
    File::open(path).expect("file not found"),
))
.expect("failed to parse json");

let ctx = add_plan_schemas_to_ctx(SessionContext::new(), &proto)?;
Instead of reading the CSV input files to generate schemas, we can generate them directly from the Substrait plans.
if existing_table.schema() != schema {
    return substrait_err!(
        "Substrait plan contained the same table {} with different schemas.\nSchema 1: {}\nSchema 2: {}",
        table_reference, existing_table.schema(), schema);
I'm checking for the case of the same table having different schemas because I've been bitten by it before due to schema pruning shenanigans, and it took a while to figure out what was happening.
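A minimal, std-only sketch of the kind of check described above. The `Schema` alias and the `register_schema` helper are hypothetical stand-ins for illustration; the PR itself compares DataFusion table schemas and reports the conflict through `substrait_err!`.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for a table schema: ordered (column name, type name) pairs.
type Schema = Vec<(String, String)>;

/// Records `schema` for `table`. Returns an error if the same table was
/// already recorded with a different schema.
fn register_schema(
    seen: &mut HashMap<String, Schema>,
    table: &str,
    schema: Schema,
) -> Result<(), String> {
    if let Some(existing) = seen.get(table) {
        if *existing != schema {
            return Err(format!(
                "Substrait plan contained the same table {} with different schemas.\nSchema 1: {:?}\nSchema 2: {:?}",
                table, existing, schema
            ));
        }
        // Same table, same schema: nothing new to record.
        return Ok(());
    }
    seen.insert(table.to_string(), schema);
    Ok(())
}
```

Failing loudly at collection time surfaces the mismatch next to the offending table name instead of as a confusing error later in planning.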
fn collect_schemas(plan: &Plan) -> Vec<(TableReference, Arc<dyn TableProvider>)> {
fn collect_schemas(
    plan: &Plan,
) -> Result<Vec<(TableReference, Arc<dyn TableProvider>)>> {
I switched this to be Result-based because eventually I would like this to be part of the core library. Collecting schemas like this is super useful.
RelType::Fetch(f) => self.apply(f.input.as_ref().map(|b| b.as_ref())),
RelType::Aggregate(a) => self.apply(a.input.as_ref().map(|b| b.as_ref())),
RelType::Sort(s) => self.apply(s.input.as_ref().map(|b| b.as_ref())),
fn collect_schemas_from_rel(&mut self, rel: &Rel) -> Result<()> {
meta: writing traversals like this is quite painful. It would be good to get some visitation machinery added to substrait-rust.
I wonder if https://docs.rs/datafusion/latest/datafusion/common/tree_node/trait.TreeNode.html could serve as inspiration. @peter-toth put a lot of effort into improving that API and I think it is pretty neat now.
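To make the idea of "visitation machinery" concrete, here is a rough sketch of a closure-driven pre-order traversal. It is only loosely modelled on the idea behind `TreeNode` and does not reproduce that trait's signatures; `ToyRel`, `children`, `apply`, and `table_names` are all hypothetical names over a toy tree, not substrait-rust types.

```rust
/// Hypothetical, simplified relation tree; not the substrait-rust `Rel` type.
enum ToyRel {
    Read { table: String },
    Filter { input: Box<ToyRel> },
    Join { left: Box<ToyRel>, right: Box<ToyRel> },
}

impl ToyRel {
    /// Direct children of this node. This is the only per-variant code needed.
    fn children(&self) -> Vec<&ToyRel> {
        match self {
            ToyRel::Read { .. } => vec![],
            ToyRel::Filter { input } => vec![input.as_ref()],
            ToyRel::Join { left, right } => vec![left.as_ref(), right.as_ref()],
        }
    }

    /// Pre-order traversal: calls `f` on this node, then recurses into each
    /// child. Stops at the first error returned by `f`.
    fn apply<E>(&self, f: &mut dyn FnMut(&ToyRel) -> Result<(), E>) -> Result<(), E> {
        f(self)?;
        for child in self.children() {
            child.apply(f)?;
        }
        Ok(())
    }
}

/// Example use: collect table names without writing another per-variant traversal.
fn table_names(root: &ToyRel) -> Vec<String> {
    let mut names = Vec::new();
    root.apply::<()>(&mut |rel: &ToyRel| {
        if let ToyRel::Read { table } = rel {
            names.push(table.clone());
        }
        Ok(())
    })
    .expect("closure never fails");
    names
}
```

With something like this, a schema collector would only supply the closure; the knowledge of which variants have which inputs lives in one place.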
.subquery_type
.as_ref()
.ok_or(substrait_datafusion_err!("subquery_type must be set"))?;
match subquery_type {
A number of the TPCH plans include Subquery expressions. Without this traversal, the schemas generated from the plans were missing tables.
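The effect can be illustrated with a toy model. The `Expr` and `Rel` enums below are hypothetical simplifications (the real Substrait messages have many more variants), and `collect_tables` takes a flag only to contrast the two behaviours.

```rust
use std::collections::BTreeSet;

/// Hypothetical, simplified expression type.
enum Expr {
    Literal(i64),
    /// A scalar subquery embeds a whole relation inside an expression.
    ScalarSubquery(Box<Rel>),
}

/// Hypothetical, simplified relation type.
enum Rel {
    Read { table: String },
    Filter { input: Box<Rel>, condition: Expr },
}

/// Collects table names reachable from `rel`. When `visit_exprs` is false,
/// relations nested inside expressions are skipped.
fn collect_tables(rel: &Rel, visit_exprs: bool, out: &mut BTreeSet<String>) {
    match rel {
        Rel::Read { table } => {
            out.insert(table.clone());
        }
        Rel::Filter { input, condition } => {
            collect_tables(input, visit_exprs, out);
            if visit_exprs {
                collect_tables_from_expr(condition, out);
            }
        }
    }
}

/// Descends into an expression, following any relation it contains.
fn collect_tables_from_expr(expr: &Expr, out: &mut BTreeSet<String>) {
    match expr {
        Expr::Literal(_) => {}
        Expr::ScalarSubquery(rel) => collect_tables(rel, true, out),
    }
}
```

For a filter over one table whose condition is a subquery reading a second table, the relation-only walk finds one table while the full walk finds both.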
\n Aggregate: groupBy=[[]], aggr=[[min(FILENAME_PLACEHOLDER_5.ps_supplycost)]]\
\n Projection: FILENAME_PLACEHOLDER_5.ps_supplycost\
\n Filter: FILENAME_PLACEHOLDER_5.ps_partkey = FILENAME_PLACEHOLDER_5.ps_partkey AND FILENAME_PLACEHOLDER_6.s_suppkey = FILENAME_PLACEHOLDER_5.ps_suppkey AND FILENAME_PLACEHOLDER_6.s_nationkey = FILENAME_PLACEHOLDER_7.N_NATIONKEY AND FILENAME_PLACEHOLDER_7.N_REGIONKEY = FILENAME_PLACEHOLDER_8.R_REGIONKEY AND FILENAME_PLACEHOLDER_8.R_NAME = CAST(Utf8(\"EUROPE\") AS Utf8)\
\n Inner Join: Filter: Boolean(true)\
Note that the Inner Join with an always true filter has been replaced by an explicit CrossJoin, which is equivalent.
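The equivalence is easy to check on plain slices. This is a nested-loop sketch with hypothetical helper names (`inner_join`, `cross_join`), not how DataFusion executes joins.

```rust
/// Inner join over two row sets with an arbitrary predicate (nested-loop sketch).
fn inner_join<L: Clone, R: Clone>(
    left: &[L],
    right: &[R],
    pred: impl Fn(&L, &R) -> bool,
) -> Vec<(L, R)> {
    let mut out = Vec::new();
    for l in left {
        for r in right {
            if pred(l, r) {
                out.push((l.clone(), r.clone()));
            }
        }
    }
    out
}

/// Cross join: every pairing of a left row with a right row.
fn cross_join<L: Clone, R: Clone>(left: &[L], right: &[R]) -> Vec<(L, R)> {
    left.iter()
        .flat_map(|l| right.iter().map(move |r| (l.clone(), r.clone())))
        .collect()
}
```

When the predicate always returns true, no pairing is filtered out, so `inner_join` produces exactly the rows of `cross_join`, in the same order.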
"input": { | ||
"project": { | ||
"common": { | ||
|
The newly copied plans set the remap field correctly. Its absence in the old plans didn't matter because DataFusion ignored it, but once I started adding remap support these plans broke because they were incorrect.
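For readers unfamiliar with remaps: in Substrait, a relation's common section can carry an output mapping that selects and reorders the relation's default output columns by index. The sketch below illustrates that idea on column names only; `apply_remap` is a hypothetical helper, not DataFusion or substrait-rust API, and it assumes the mapping is a list of zero-based indices.

```rust
/// Applies a Substrait-style output mapping to a relation's default output
/// columns. `None` models the direct case (no remap). Returns an error if
/// an index is out of range.
fn apply_remap(
    columns: &[&str],
    output_mapping: Option<&[usize]>,
) -> Result<Vec<String>, String> {
    match output_mapping {
        // No remap: the default output passes through unchanged.
        None => Ok(columns.iter().map(|c| c.to_string()).collect()),
        // Remap: pick columns by index, in the order given.
        Some(mapping) => mapping
            .iter()
            .map(|&i| {
                columns.get(i).map(|c| c.to_string()).ok_or_else(|| {
                    format!("remap index {} out of range for {} columns", i, columns.len())
                })
            })
            .collect(),
    }
}
```

A consumer that ignores the mapping behaves like the `None` branch for every plan, which would explain why plans with a missing or wrong remap can appear to work until remap support is added.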
Thank you very much @vbarua -- this PR looks like a significant improvement of the substrait functionality to me. Also thank you for your attention to detail
cc @Blizzara @Lordworms and @waynexia who may also be interested in this PR
I suggest we file another ticket and list the TPCH queries that don't currently run to make the current state of substrait more visible
\n Projection: FILENAME_PLACEHOLDER_0.l_returnflag, FILENAME_PLACEHOLDER_0.l_linestatus, FILENAME_PLACEHOLDER_0.l_quantity, FILENAME_PLACEHOLDER_0.l_extendedprice, FILENAME_PLACEHOLDER_0.l_extendedprice * (CAST(Int32(1) AS Decimal128(19, 0)) - FILENAME_PLACEHOLDER_0.l_discount), FILENAME_PLACEHOLDER_0.l_extendedprice * (CAST(Int32(1) AS Decimal128(19, 0)) - FILENAME_PLACEHOLDER_0.l_discount) * (CAST(Int32(1) AS Decimal128(19, 0)) + FILENAME_PLACEHOLDER_0.l_tax), FILENAME_PLACEHOLDER_0.l_discount\
\n Filter: FILENAME_PLACEHOLDER_0.l_shipdate <= Date32(\"1998-12-01\") - IntervalDayTime(\"IntervalDayTime { days: 120, milliseconds: 0 }\")\
\n TableScan: FILENAME_PLACEHOLDER_0 projection=[l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment]"
"Projection: LINEITEM.L_RETURNFLAG, LINEITEM.L_LINESTATUS, sum(LINEITEM.L_QUANTITY) AS SUM_QTY, sum(LINEITEM.L_EXTENDEDPRICE) AS SUM_BASE_PRICE, sum(LINEITEM.L_EXTENDEDPRICE * Int32(1) - LINEITEM.L_DISCOUNT) AS SUM_DISC_PRICE, sum(LINEITEM.L_EXTENDEDPRICE * Int32(1) - LINEITEM.L_DISCOUNT * Int32(1) + LINEITEM.L_TAX) AS SUM_CHARGE, avg(LINEITEM.L_QUANTITY) AS AVG_QTY, avg(LINEITEM.L_EXTENDEDPRICE) AS AVG_PRICE, avg(LINEITEM.L_DISCOUNT) AS AVG_DISC, count(Int64(1)) AS COUNT_ORDER\
those certainly look nicer
\n TableScan: NATION projection=[N_NATIONKEY, N_NAME, N_REGIONKEY, N_COMMENT]\
\n TableScan: REGION projection=[R_REGIONKEY, R_NAME, R_COMMENT]");
#[ignore]
#[tokio::test]
for other reviewers, these plans weren't included in the initial coverage either
\n TableScan: FILENAME_PLACEHOLDER_0 projection=[l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment]\
\n TableScan: FILENAME_PLACEHOLDER_1 projection=[p_partkey, p_name, p_mfgr, p_brand, p_type, p_size, p_container, p_retailprice, p_comment]");
#[ignore]
#[tokio::test]
I wonder if we should be tracking in a ticket somewhere which TPCH plans aren't currently handled by the substrait implementation 🤔
LGTM, thanks!
I've created the following to track the issues with the TPCH queries:
Which issue does this PR close?
Part of fixing/implementing #12347
Rationale for this change
The work for #12347 caused Substrait TPCH test failures. Investigation showed that the vendored plans being used for them were quite old, and in fact wrong in some cases. As a result, the tests have been updated upstream in substrait-io/consumer-testing#105
I've chosen to update these tests in their own PR to make it easier to review the follow-up change for #12347.
What changes are included in this PR?
Functional Changes
precision_mode is not set.
Test Changes
Are these changes tested?
The changes are primarily to tests. Changes were made to allow the tests to continue running.
Are there any user-facing changes?