From 30ec7d4da4614f77cabb73fdeece1bec9e201101 Mon Sep 17 00:00:00 2001 From: Andrew Gazelka Date: Tue, 19 Nov 2024 22:34:21 -0800 Subject: [PATCH] [FEAT] connect: add drop support --- .../src/translation/logical_plan.rs | 6 ++- .../src/translation/logical_plan/drop.rs | 39 +++++++++++++++++++ tests/connect/test_drop.py | 17 ++++++++ 3 files changed, 61 insertions(+), 1 deletion(-) create mode 100644 src/daft-connect/src/translation/logical_plan/drop.rs create mode 100644 tests/connect/test_drop.py diff --git a/src/daft-connect/src/translation/logical_plan.rs b/src/daft-connect/src/translation/logical_plan.rs index 93c9e9bd4a..938bfa5714 100644 --- a/src/daft-connect/src/translation/logical_plan.rs +++ b/src/daft-connect/src/translation/logical_plan.rs @@ -3,9 +3,12 @@ use eyre::{bail, Context}; use spark_connect::{relation::RelType, Limit, Relation}; use tracing::warn; -use crate::translation::logical_plan::{aggregate::aggregate, project::project, range::range}; +use crate::translation::logical_plan::{ + aggregate::aggregate, drop::drop, project::project, range::range, +}; mod aggregate; +mod drop; mod project; mod range; @@ -25,6 +28,7 @@ pub fn to_logical_plan(relation: Relation) -> eyre::Result { RelType::Aggregate(a) => { aggregate(*a).wrap_err("Failed to apply aggregate to logical plan") } + RelType::Drop(d) => drop(*d).wrap_err("Failed to apply drop to logical plan"), plan => bail!("Unsupported relation type: {plan:?}"), } } diff --git a/src/daft-connect/src/translation/logical_plan/drop.rs b/src/daft-connect/src/translation/logical_plan/drop.rs new file mode 100644 index 0000000000..c0609b5460 --- /dev/null +++ b/src/daft-connect/src/translation/logical_plan/drop.rs @@ -0,0 +1,39 @@ +use eyre::bail; + +use crate::translation::to_logical_plan; + +pub fn drop(drop: spark_connect::Drop) -> eyre::Result { + let spark_connect::Drop { + input, + columns, + column_names, + } = drop; + + let Some(input) = input else { + bail!("input is required"); + }; + + if !columns.is_empty() { + bail!("columns is not supported; use column_names instead"); + } + + let plan = to_logical_plan(*input)?; + + // Get all column names from the schema + let all_columns = plan.schema().names(); + + // Create a set of columns to drop for efficient lookup + let columns_to_drop: std::collections::HashSet<_> = column_names.iter().collect(); + + // Create expressions for all columns except the ones being dropped + let to_select = all_columns + .iter() + .filter(|col_name| !columns_to_drop.contains(*col_name)) + .map(|col_name| daft_dsl::col(col_name.clone())) + .collect(); + + // Use select to keep only the columns we want + let plan = plan.select(to_select)?; + + Ok(plan) +} diff --git a/tests/connect/test_drop.py b/tests/connect/test_drop.py new file mode 100644 index 0000000000..635f79c1d2 --- /dev/null +++ b/tests/connect/test_drop.py @@ -0,0 +1,17 @@ +from __future__ import annotations + + +def test_drop(spark_session): + # Create DataFrame from range(10) + df = spark_session.range(10) + + # Drop the 'id' column + df_dropped = df.drop("id") + + # Verify the drop was successful + assert "id" not in df_dropped.columns, "Column 'id' should be dropped" + assert len(df_dropped.columns) == len(df.columns) - 1, "Should have one less column after drop" + + # Verify the DataFrame has no columns after dropping all columns" + assert len(df_dropped.toPandas().columns) == 0, "DataFrame should have no columns after dropping 'id'" + assert df_dropped.count() == df.count(), "Row count should be unchanged after drop"