Skip to content

Commit

Permalink
[substrait] Add support for ExtensionTable
Browse files Browse the repository at this point in the history
  • Loading branch information
ccciudatu committed Jan 8, 2025
1 parent 81b50c4 commit 20ba857
Show file tree
Hide file tree
Showing 4 changed files with 233 additions and 45 deletions.
24 changes: 2 additions & 22 deletions datafusion/core/src/execution/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ use datafusion_expr::{
expr_rewriter::FunctionRewrite,
logical_plan::{DdlStatement, Statement},
planner::ExprPlanner,
Expr, UserDefinedLogicalNode, WindowUDF,
Expr, WindowUDF,
};

// backwards compatibility
Expand Down Expand Up @@ -1682,27 +1682,7 @@ pub enum RegisterFunction {
#[derive(Debug)]
pub struct EmptySerializerRegistry;

impl SerializerRegistry for EmptySerializerRegistry {
fn serialize_logical_plan(
&self,
node: &dyn UserDefinedLogicalNode,
) -> Result<Vec<u8>> {
not_impl_err!(
"Serializing user defined logical plan node `{}` is not supported",
node.name()
)
}

fn deserialize_logical_plan(
&self,
name: &str,
_bytes: &[u8],
) -> Result<Arc<dyn UserDefinedLogicalNode>> {
not_impl_err!(
"Deserializing user defined logical plan node `{name}` is not supported"
)
}
}
impl SerializerRegistry for EmptySerializerRegistry {}

/// Describes which SQL statements can be run.
///
Expand Down
40 changes: 35 additions & 5 deletions datafusion/expr/src/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
use crate::expr_rewriter::FunctionRewrite;
use crate::planner::ExprPlanner;
use crate::{AggregateUDF, ScalarUDF, UserDefinedLogicalNode, WindowUDF};
use crate::{AggregateUDF, ScalarUDF, TableSource, UserDefinedLogicalNode, WindowUDF};
use datafusion_common::{not_impl_err, plan_datafusion_err, HashMap, Result};
use std::collections::HashSet;
use std::fmt::Debug;
Expand Down Expand Up @@ -123,22 +123,52 @@ pub trait FunctionRegistry {
}
}

/// Serializer and deserializer registry for extensions like [UserDefinedLogicalNode].
/// Serializer and deserializer registry for extensions like [UserDefinedLogicalNode]
/// and custom table providers for which the name alone is meaningless in the target
/// execution context, e.g. UDTFs, manually registered tables etc.
pub trait SerializerRegistry: Debug + Send + Sync {
/// Serialize this node to a byte array. This serialization should not include
/// input plans.
fn serialize_logical_plan(
&self,
node: &dyn UserDefinedLogicalNode,
) -> Result<Vec<u8>>;
) -> Result<Vec<u8>> {
not_impl_err!(
"Serializing user defined logical plan node `{}` is not supported",
node.name()
)
}

/// Deserialize user defined logical plan node ([UserDefinedLogicalNode]) from
/// bytes.
fn deserialize_logical_plan(
&self,
name: &str,
bytes: &[u8],
) -> Result<Arc<dyn UserDefinedLogicalNode>>;
_bytes: &[u8],
) -> Result<Arc<dyn UserDefinedLogicalNode>> {
not_impl_err!(
"Deserializing user defined logical plan node `{name}` is not supported"
)
}

/// Serialized table definition for UDTFs or manually registered table providers that can't be
/// marshaled by reference. Should return some benign error for regular tables that can be
/// found/restored by name in the destination execution context.
fn serialize_custom_table(&self, _table: &dyn TableSource) -> Result<Vec<u8>> {
not_impl_err!("No custom table support")
}

/// Deserialize the custom table with the given name.
/// Note: more often than not, the name can't be used as a discriminator if multiple different
/// `TableSource` and/or `TableProvider` implementations are expected (this is particularly true
/// for UDTFs in DataFusion, which are always registered under the same name: `tmp_table`).
fn deserialize_custom_table(
&self,
name: &str,
_bytes: &[u8],
) -> Result<Arc<dyn TableSource>> {
not_impl_err!("Deserializing custom table `{name}` is not supported")
}
}

/// A [`FunctionRegistry`] that uses in memory [`HashMap`]s
Expand Down
52 changes: 49 additions & 3 deletions datafusion/substrait/src/logical_plan/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use datafusion::logical_expr::expr::{Exists, InSubquery, Sort};

use datafusion::logical_expr::{
Aggregate, BinaryExpr, Case, Cast, EmptyRelation, Expr, ExprSchemable, Extension,
LogicalPlan, Operator, Projection, SortExpr, Subquery, TryCast, Values,
LogicalPlan, Operator, Projection, SortExpr, Subquery, TableScan, TryCast, Values,
};
use substrait::proto::aggregate_rel::Grouping;
use substrait::proto::expression as substrait_expression;
Expand Down Expand Up @@ -86,6 +86,7 @@ use substrait::proto::expression::{
SingularOrList, SwitchExpression, WindowFunction,
};
use substrait::proto::read_rel::local_files::file_or_files::PathType::UriFile;
use substrait::proto::read_rel::ExtensionTable;
use substrait::proto::rel_common::{Emit, EmitKind};
use substrait::proto::set_rel::SetOp;
use substrait::proto::{
Expand Down Expand Up @@ -457,6 +458,22 @@ pub trait SubstraitConsumer: Send + Sync + Sized {
user_defined_literal.type_reference
)
}

fn consume_extension_table(
&self,
extension_table: &ExtensionTable,
_schema: &DFSchema,
_projection: &Option<MaskExpression>,
) -> Result<LogicalPlan> {
if let Some(ext_detail) = extension_table.detail.as_ref() {
substrait_err!(
"Missing handler for extension table: {}",
&ext_detail.type_url
)
} else {
substrait_err!("Unexpected empty detail in ExtensionTable")
}
}
}

/// Convert Substrait Rel to DataFusion DataFrame
Expand Down Expand Up @@ -578,6 +595,32 @@ impl SubstraitConsumer for DefaultSubstraitConsumer<'_> {
let plan = plan.with_exprs_and_inputs(plan.expressions(), inputs)?;
Ok(LogicalPlan::Extension(Extension { node: plan }))
}

fn consume_extension_table(
&self,
extension_table: &ExtensionTable,
schema: &DFSchema,
projection: &Option<MaskExpression>,
) -> Result<LogicalPlan> {
if let Some(ext_detail) = &extension_table.detail {
let source = self
.state
.serializer_registry()
.deserialize_custom_table(&ext_detail.type_url, &ext_detail.value)?;
let table_name = ext_detail
.type_url
.rsplit_once('/')
.map(|(_, name)| name)
.unwrap_or(&ext_detail.type_url);
let table_scan = TableScan::try_new(table_name, source, None, vec![], None)?;
let plan = LogicalPlan::TableScan(table_scan);
ensure_schema_compatibility(plan.schema(), schema.clone())?;
let schema = apply_masking(schema.clone(), projection)?;
apply_projection(plan, schema)
} else {
substrait_err!("Unexpected empty detail in ExtensionTable")
}
}
}

// Substrait PrecisionTimestampTz indicates that the timestamp is relative to UTC, which
Expand Down Expand Up @@ -1467,8 +1510,11 @@ pub async fn from_read_rel(
)
.await
}
_ => {
not_impl_err!("Unsupported ReadType: {:?}", read.read_type)
Some(ReadType::ExtensionTable(ext)) => {
consumer.consume_extension_table(ext, &substrait_schema, &read.projection)
}
None => {
substrait_err!("Unexpected empty read_type")
}
}
}
Expand Down
Loading

0 comments on commit 20ba857

Please sign in to comment.