From 697a20060f2247da87f73073e8bf5ab407bd40ea Mon Sep 17 00:00:00 2001
From: Leonid Ryzhyk
Date: Wed, 20 Nov 2024 01:58:50 -0800
Subject: [PATCH] datafusion: Create table provider for a snapshot. (#707)

* datafusion: Create table provider for a snapshot.

The Iceberg table provider allows querying an Iceberg table via
datafusion. The initial implementation only allowed querying the latest
snapshot of the table. It is sometimes useful to query a specific
snapshot (time travel). This commit adds this capability.

It adds a new method (`try_new_from_table_snapshot`) that creates a
provider for a specific table snapshot. All existing APIs should work
as before.

Signed-off-by: Leonid Ryzhyk

* datafusion: use Snapshot::schema, not schema_id().

Apply @liurenjie1024's suggestion: use `Snapshot::schema` instead of
retrieving the schema directly by id (which can be missing in the
snapshot).

Signed-off-by: Leonid Ryzhyk

---------

Signed-off-by: Leonid Ryzhyk
Co-authored-by: Leonid Ryzhyk
---
 .../datafusion/src/physical_plan/scan.rs     | 15 +++-
 .../integrations/datafusion/src/table/mod.rs | 73 ++++++++++++++++++-
 2 files changed, 82 insertions(+), 6 deletions(-)

diff --git a/crates/integrations/datafusion/src/physical_plan/scan.rs b/crates/integrations/datafusion/src/physical_plan/scan.rs
index 59cf09976..e7a63bc56 100644
--- a/crates/integrations/datafusion/src/physical_plan/scan.rs
+++ b/crates/integrations/datafusion/src/physical_plan/scan.rs
@@ -43,6 +43,8 @@ use crate::to_datafusion_error;
 pub(crate) struct IcebergTableScan {
     /// A table in the catalog.
     table: Table,
+    /// Snapshot of the table to scan.
+    snapshot_id: Option<i64>,
     /// A reference-counted arrow `Schema`.
     schema: ArrowSchemaRef,
     /// Stores certain, often expensive to compute,
@@ -58,6 +60,7 @@ impl IcebergTableScan {
     /// Creates a new [`IcebergTableScan`] object.
     pub(crate) fn new(
         table: Table,
+        snapshot_id: Option<i64>,
         schema: ArrowSchemaRef,
         projection: Option<&Vec<usize>>,
         filters: &[Expr],
@@ -68,6 +71,7 @@ impl IcebergTableScan {
 
         Self {
             table,
+            snapshot_id,
             schema,
             plan_properties,
             projection,
@@ -119,6 +123,7 @@ impl ExecutionPlan for IcebergTableScan {
     ) -> DFResult<SendableRecordBatchStream> {
         let fut = get_batch_stream(
             self.table.clone(),
+            self.snapshot_id,
             self.projection.clone(),
             self.predicates.clone(),
         );
@@ -157,12 +162,18 @@ impl DisplayAs for IcebergTableScan {
 /// and then converts it into a stream of Arrow [`RecordBatch`]es.
 async fn get_batch_stream(
     table: Table,
+    snapshot_id: Option<i64>,
     column_names: Option<Vec<String>>,
     predicates: Option<Predicate>,
 ) -> DFResult<Pin<Box<dyn Stream<Item = DFResult<RecordBatch>> + Send>>> {
+    let scan_builder = match snapshot_id {
+        Some(snapshot_id) => table.scan().snapshot_id(snapshot_id),
+        None => table.scan(),
+    };
+
     let mut scan_builder = match column_names {
-        Some(column_names) => table.scan().select(column_names),
-        None => table.scan().select_all(),
+        Some(column_names) => scan_builder.select(column_names),
+        None => scan_builder.select_all(),
     };
     if let Some(pred) = predicates {
         scan_builder = scan_builder.with_filter(pred);
diff --git a/crates/integrations/datafusion/src/table/mod.rs b/crates/integrations/datafusion/src/table/mod.rs
index f0f6514c6..a78e3728c 100644
--- a/crates/integrations/datafusion/src/table/mod.rs
+++ b/crates/integrations/datafusion/src/table/mod.rs
@@ -29,7 +29,7 @@ use datafusion::logical_expr::{Expr, TableProviderFilterPushDown};
 use datafusion::physical_plan::ExecutionPlan;
 use iceberg::arrow::schema_to_arrow_schema;
 use iceberg::table::Table;
-use iceberg::{Catalog, NamespaceIdent, Result, TableIdent};
+use iceberg::{Catalog, Error, ErrorKind, NamespaceIdent, Result, TableIdent};
 
 use crate::physical_plan::scan::IcebergTableScan;
 
@@ -39,13 +39,19 @@ use crate::physical_plan::scan::IcebergTableScan;
 pub struct IcebergTableProvider {
     /// A table in the catalog.
     table: Table,
+    /// Table snapshot id that will be queried via this provider.
+    snapshot_id: Option<i64>,
     /// A reference-counted arrow `Schema`.
     schema: ArrowSchemaRef,
 }
 
 impl IcebergTableProvider {
     pub(crate) fn new(table: Table, schema: ArrowSchemaRef) -> Self {
-        IcebergTableProvider { table, schema }
+        IcebergTableProvider {
+            table,
+            snapshot_id: None,
+            schema,
+        }
     }
     /// Asynchronously tries to construct a new [`IcebergTableProvider`]
     /// using the given client and table name to fetch an actual [`Table`]
@@ -60,14 +66,46 @@ impl IcebergTableProvider {
 
         let schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);
 
-        Ok(IcebergTableProvider { table, schema })
+        Ok(IcebergTableProvider {
+            table,
+            snapshot_id: None,
+            schema,
+        })
     }
 
     /// Asynchronously tries to construct a new [`IcebergTableProvider`]
     /// using the given table. Can be used to create a table provider from an existing table regardless of the catalog implementation.
     pub async fn try_new_from_table(table: Table) -> Result<Self> {
         let schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);
-        Ok(IcebergTableProvider { table, schema })
+        Ok(IcebergTableProvider {
+            table,
+            snapshot_id: None,
+            schema,
+        })
+    }
+
+    /// Asynchronously tries to construct a new [`IcebergTableProvider`]
+    /// using a specific snapshot of the given table. Can be used to create a table provider from an existing table regardless of the catalog implementation.
+    pub async fn try_new_from_table_snapshot(table: Table, snapshot_id: i64) -> Result<Self> {
+        let snapshot = table
+            .metadata()
+            .snapshot_by_id(snapshot_id)
+            .ok_or_else(|| {
+                Error::new(
+                    ErrorKind::Unexpected,
+                    format!(
+                        "snapshot id {snapshot_id} not found in table {}",
+                        table.identifier().name()
+                    ),
+                )
+            })?;
+        let schema = snapshot.schema(table.metadata())?;
+        let schema = Arc::new(schema_to_arrow_schema(&schema)?);
+        Ok(IcebergTableProvider {
+            table,
+            snapshot_id: Some(snapshot_id),
+            schema,
+        })
     }
 }
 
@@ -94,6 +132,7 @@ impl TableProvider for IcebergTableProvider {
     ) -> DFResult<Arc<dyn ExecutionPlan>> {
         Ok(Arc::new(IcebergTableScan::new(
             self.table.clone(),
+            self.snapshot_id,
             self.schema.clone(),
             projection,
             filters,
@@ -162,4 +201,30 @@ mod tests {
         let has_column = df_schema.has_column(&Column::from_name("z"));
         assert!(has_column);
     }
+
+    #[tokio::test]
+    async fn test_try_new_from_table_snapshot() {
+        let table = get_test_table_from_metadata_file().await;
+        let snapshot_id = table.metadata().snapshots().next().unwrap().snapshot_id();
+        let table_provider =
+            IcebergTableProvider::try_new_from_table_snapshot(table.clone(), snapshot_id)
+                .await
+                .unwrap();
+        let ctx = SessionContext::new();
+        ctx.register_table("mytable", Arc::new(table_provider))
+            .unwrap();
+        let df = ctx.sql("SELECT * FROM mytable").await.unwrap();
+        let df_schema = df.schema();
+        let df_columns = df_schema.fields();
+        assert_eq!(df_columns.len(), 3);
+        let x_column = df_columns.first().unwrap();
+        let column_data = format!(
+            "{:?}:{:?}",
+            x_column.name(),
+            x_column.data_type().to_string()
+        );
+        assert_eq!(column_data, "\"x\":\"Int64\"");
+        let has_column = df_schema.has_column(&Column::from_name("z"));
+        assert!(has_column);
+    }
 }
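
Usage note (not part of the patch): a minimal sketch of how the new time-travel
entry point composes with DataFusion, mirroring the unit test above. The
`query_one_snapshot` helper, the use of `anyhow` for error plumbing, and the
assumption that the table is already loaded from a catalog are illustrative;
the `IcebergTableProvider` import assumes the crate's root-level re-export.

use std::sync::Arc;

use datafusion::prelude::SessionContext;
use iceberg::table::Table;
use iceberg_datafusion::IcebergTableProvider;

// Hypothetical helper: given an already-loaded `Table` (catalog setup
// elided), scan one recorded snapshot instead of the latest one.
async fn query_one_snapshot(table: Table) -> anyhow::Result<()> {
    // Pick a snapshot id out of the table metadata (the first one the
    // iterator yields), exactly as the test above does.
    let snapshot_id = table
        .metadata()
        .snapshots()
        .next()
        .expect("table has no snapshots")
        .snapshot_id();

    // The constructor added by this patch: the resulting provider scans
    // exactly this snapshot and uses that snapshot's schema.
    let provider =
        IcebergTableProvider::try_new_from_table_snapshot(table, snapshot_id).await?;

    // Register with DataFusion and query as usual.
    let ctx = SessionContext::new();
    ctx.register_table("t", Arc::new(provider))?;
    ctx.sql("SELECT * FROM t").await?.show().await?;
    Ok(())
}

Since `snapshot_id` is threaded through `IcebergTableScan` into the scan
builder, every query against this provider reads the chosen snapshot; a
provider built with the existing constructors keeps the old behavior of
scanning the latest snapshot.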