Skip to content

Commit

Permalink
datafusion: Create table provider for a snapshot. (#707)
Browse files Browse the repository at this point in the history
* datafusion: Create table provider for a snapshot.

The Iceberg table provider allows querying an Iceberg table via
datafusion. The initial implementation only allowed querying the latest
snapshot of the table. It sometimes useful to query a specific snapshot
(time travel). This commit adds this capability.  It adds a new method
(`try_new_from_table_snapshot`) that creates a provider for a specific
table snapshot.

All existing APIs should work as before.

Signed-off-by: Leonid Ryzhyk <[email protected]>

* datafusion: use Snapshot::schema, not schema_id().

Apply @liurenjie1024's suggestion: use `Snapshot::schema` instead of retrieving
the schema directly by id (which can be missing in the snapshot).

Signed-off-by: Leonid Ryzhyk <[email protected]>

---------

Signed-off-by: Leonid Ryzhyk <[email protected]>
Co-authored-by: Leonid Ryzhyk <[email protected]>
  • Loading branch information
ryzhyk and Leonid Ryzhyk authored Nov 20, 2024
1 parent b2fb803 commit 697a200
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 6 deletions.
15 changes: 13 additions & 2 deletions crates/integrations/datafusion/src/physical_plan/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ use crate::to_datafusion_error;
pub(crate) struct IcebergTableScan {
/// A table in the catalog.
table: Table,
/// Snapshot of the table to scan.
snapshot_id: Option<i64>,
/// A reference-counted arrow `Schema`.
schema: ArrowSchemaRef,
/// Stores certain, often expensive to compute,
Expand All @@ -58,6 +60,7 @@ impl IcebergTableScan {
/// Creates a new [`IcebergTableScan`] object.
pub(crate) fn new(
table: Table,
snapshot_id: Option<i64>,
schema: ArrowSchemaRef,
projection: Option<&Vec<usize>>,
filters: &[Expr],
Expand All @@ -68,6 +71,7 @@ impl IcebergTableScan {

Self {
table,
snapshot_id,
schema,
plan_properties,
projection,
Expand Down Expand Up @@ -119,6 +123,7 @@ impl ExecutionPlan for IcebergTableScan {
) -> DFResult<SendableRecordBatchStream> {
let fut = get_batch_stream(
self.table.clone(),
self.snapshot_id,
self.projection.clone(),
self.predicates.clone(),
);
Expand Down Expand Up @@ -157,12 +162,18 @@ impl DisplayAs for IcebergTableScan {
/// and then converts it into a stream of Arrow [`RecordBatch`]es.
async fn get_batch_stream(
table: Table,
snapshot_id: Option<i64>,
column_names: Option<Vec<String>>,
predicates: Option<Predicate>,
) -> DFResult<Pin<Box<dyn Stream<Item = DFResult<RecordBatch>> + Send>>> {
let scan_builder = match snapshot_id {
Some(snapshot_id) => table.scan().snapshot_id(snapshot_id),
None => table.scan(),
};

let mut scan_builder = match column_names {
Some(column_names) => table.scan().select(column_names),
None => table.scan().select_all(),
Some(column_names) => scan_builder.select(column_names),
None => scan_builder.select_all(),
};
if let Some(pred) = predicates {
scan_builder = scan_builder.with_filter(pred);
Expand Down
73 changes: 69 additions & 4 deletions crates/integrations/datafusion/src/table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use datafusion::logical_expr::{Expr, TableProviderFilterPushDown};
use datafusion::physical_plan::ExecutionPlan;
use iceberg::arrow::schema_to_arrow_schema;
use iceberg::table::Table;
use iceberg::{Catalog, NamespaceIdent, Result, TableIdent};
use iceberg::{Catalog, Error, ErrorKind, NamespaceIdent, Result, TableIdent};

use crate::physical_plan::scan::IcebergTableScan;

Expand All @@ -39,13 +39,19 @@ use crate::physical_plan::scan::IcebergTableScan;
pub struct IcebergTableProvider {
/// A table in the catalog.
table: Table,
/// Table snapshot id that will be queried via this provider.
snapshot_id: Option<i64>,
/// A reference-counted arrow `Schema`.
schema: ArrowSchemaRef,
}

impl IcebergTableProvider {
pub(crate) fn new(table: Table, schema: ArrowSchemaRef) -> Self {
IcebergTableProvider { table, schema }
IcebergTableProvider {
table,
snapshot_id: None,
schema,
}
}
/// Asynchronously tries to construct a new [`IcebergTableProvider`]
/// using the given client and table name to fetch an actual [`Table`]
Expand All @@ -60,14 +66,46 @@ impl IcebergTableProvider {

let schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);

Ok(IcebergTableProvider { table, schema })
Ok(IcebergTableProvider {
table,
snapshot_id: None,
schema,
})
}

/// Asynchronously tries to construct a new [`IcebergTableProvider`]
/// using the given table. Can be used to create a table provider from an existing table regardless of the catalog implementation.
pub async fn try_new_from_table(table: Table) -> Result<Self> {
let schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);
Ok(IcebergTableProvider { table, schema })
Ok(IcebergTableProvider {
table,
snapshot_id: None,
schema,
})
}

/// Asynchronously tries to construct a new [`IcebergTableProvider`]
/// using a specific snapshot of the given table. Can be used to create a table provider from an existing table regardless of the catalog implementation.
pub async fn try_new_from_table_snapshot(table: Table, snapshot_id: i64) -> Result<Self> {
let snapshot = table
.metadata()
.snapshot_by_id(snapshot_id)
.ok_or_else(|| {
Error::new(
ErrorKind::Unexpected,
format!(
"snapshot id {snapshot_id} not found in table {}",
table.identifier().name()
),
)
})?;
let schema = snapshot.schema(table.metadata())?;
let schema = Arc::new(schema_to_arrow_schema(&schema)?);
Ok(IcebergTableProvider {
table,
snapshot_id: Some(snapshot_id),
schema,
})
}
}

Expand All @@ -94,6 +132,7 @@ impl TableProvider for IcebergTableProvider {
) -> DFResult<Arc<dyn ExecutionPlan>> {
Ok(Arc::new(IcebergTableScan::new(
self.table.clone(),
self.snapshot_id,
self.schema.clone(),
projection,
filters,
Expand Down Expand Up @@ -162,4 +201,30 @@ mod tests {
let has_column = df_schema.has_column(&Column::from_name("z"));
assert!(has_column);
}

#[tokio::test]
async fn test_try_new_from_table_snapshot() {
let table = get_test_table_from_metadata_file().await;
let snapshot_id = table.metadata().snapshots().next().unwrap().snapshot_id();
let table_provider =
IcebergTableProvider::try_new_from_table_snapshot(table.clone(), snapshot_id)
.await
.unwrap();
let ctx = SessionContext::new();
ctx.register_table("mytable", Arc::new(table_provider))
.unwrap();
let df = ctx.sql("SELECT * FROM mytable").await.unwrap();
let df_schema = df.schema();
let df_columns = df_schema.fields();
assert_eq!(df_columns.len(), 3);
let x_column = df_columns.first().unwrap();
let column_data = format!(
"{:?}:{:?}",
x_column.name(),
x_column.data_type().to_string()
);
assert_eq!(column_data, "\"x\":\"Int64\"");
let has_column = df_schema.has_column(&Column::from_name("z"));
assert!(has_column);
}
}

0 comments on commit 697a200

Please sign in to comment.