From 07a000c389e02fa1e8141941bbc4162110fc5619 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Wed, 27 Sep 2023 20:24:37 +0800 Subject: [PATCH 1/2] fix: substrait limit when fetch is None Signed-off-by: Ruihang Xia --- datafusion/core/src/physical_plan/limit.rs | 2 +- datafusion/substrait/src/logical_plan/consumer.rs | 8 ++++++-- datafusion/substrait/src/logical_plan/producer.rs | 2 +- .../substrait/tests/cases/roundtrip_logical_plan.rs | 5 +++++ 4 files changed, 13 insertions(+), 4 deletions(-) diff --git a/datafusion/core/src/physical_plan/limit.rs b/datafusion/core/src/physical_plan/limit.rs index 87a07f8d46fe..1feebac55d4c 100644 --- a/datafusion/core/src/physical_plan/limit.rs +++ b/datafusion/core/src/physical_plan/limit.rs @@ -434,7 +434,7 @@ impl LimitStream { match &poll { Poll::Ready(Some(Ok(batch))) => { - if batch.num_rows() > 0 && self.skip == 0 { + if batch.num_rows() > 0 { break poll; } else { // continue to poll input stream diff --git a/datafusion/substrait/src/logical_plan/consumer.rs b/datafusion/substrait/src/logical_plan/consumer.rs index 32b8f8ea547f..af58d931b451 100644 --- a/datafusion/substrait/src/logical_plan/consumer.rs +++ b/datafusion/substrait/src/logical_plan/consumer.rs @@ -227,8 +227,12 @@ pub async fn from_substrait_rel( from_substrait_rel(ctx, input, extensions).await?, ); let offset = fetch.offset as usize; - let count = fetch.count as usize; - input.limit(offset, Some(count))?.build() + let count = if fetch.count as usize == usize::MAX { + None + } else { + Some(fetch.count as usize) + }; + input.limit(offset, count)?.build() } else { not_impl_err!("Fetch without an input is not valid") } diff --git a/datafusion/substrait/src/logical_plan/producer.rs b/datafusion/substrait/src/logical_plan/producer.rs index e17b022f3b53..363b346f368d 100644 --- a/datafusion/substrait/src/logical_plan/producer.rs +++ b/datafusion/substrait/src/logical_plan/producer.rs @@ -193,7 +193,7 @@ pub fn to_substrait_rel( } LogicalPlan::Limit(limit) => { let input = to_substrait_rel(limit.input.as_ref(), ctx, extension_info)?; - let limit_fetch = limit.fetch.unwrap_or(0); + let limit_fetch = limit.fetch.unwrap_or(usize::MAX); Ok(Box::new(Rel { rel_type: Some(RelType::Fetch(Box::new(FetchRel { common: None, diff --git a/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs b/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs index f4d74ae42681..2554d0667e48 100644 --- a/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs +++ b/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs @@ -188,6 +188,11 @@ async fn select_with_limit() -> Result<()> { roundtrip_fill_na("SELECT * FROM data LIMIT 100").await } +#[tokio::test] +async fn select_without_limit() -> Result<()> { + roundtrip_fill_na("SELECT * FROM data OFFSET 10").await +} + #[tokio::test] async fn select_with_limit_offset() -> Result<()> { roundtrip("SELECT * FROM data LIMIT 200 OFFSET 10").await From 849f4668c6e9f0a0b382d738fdd9b12fd85e6dd8 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 28 Sep 2023 13:17:25 -0400 Subject: [PATCH 2/2] Add comments --- datafusion/substrait/src/logical_plan/consumer.rs | 1 + datafusion/substrait/src/logical_plan/producer.rs | 1 + 2 files changed, 2 insertions(+) diff --git a/datafusion/substrait/src/logical_plan/consumer.rs b/datafusion/substrait/src/logical_plan/consumer.rs index af58d931b451..e1dde39427a5 100644 --- a/datafusion/substrait/src/logical_plan/consumer.rs +++ b/datafusion/substrait/src/logical_plan/consumer.rs @@ -227,6 +227,7 @@ pub async fn from_substrait_rel( from_substrait_rel(ctx, input, extensions).await?, ); let offset = fetch.offset as usize; + // Since protobuf can't directly distinguish `None` vs `0` `None` is encoded as `MAX` let count = if fetch.count as usize == usize::MAX { None } else { diff --git a/datafusion/substrait/src/logical_plan/producer.rs b/datafusion/substrait/src/logical_plan/producer.rs index 363b346f368d..1124ea53a557 100644 --- a/datafusion/substrait/src/logical_plan/producer.rs +++ b/datafusion/substrait/src/logical_plan/producer.rs @@ -193,6 +193,7 @@ pub fn to_substrait_rel( } LogicalPlan::Limit(limit) => { let input = to_substrait_rel(limit.input.as_ref(), ctx, extension_info)?; + // Since protobuf can't directly distinguish `None` vs `0` encode `None` as `MAX` let limit_fetch = limit.fetch.unwrap_or(usize::MAX); Ok(Box::new(Rel { rel_type: Some(RelType::Fetch(Box::new(FetchRel {