diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 3eb74eee1f..d2e2d3fdf2 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -421,6 +421,7 @@ def overwrite( self, df: pa.Table, overwrite_filter: Union[BooleanExpression, str] = ALWAYS_TRUE, + case_sensitive: bool = True, snapshot_properties: Dict[str, str] = EMPTY_DICT, ) -> None: """ @@ -436,6 +437,7 @@ def overwrite( df: The Arrow dataframe that will be used to overwrite the table overwrite_filter: ALWAYS_TRUE when you overwrite all the data, or a boolean expression in case of a partial overwrite + case_sensitive: A bool that determines whether the provided `overwrite_filter` is case-sensitive snapshot_properties: Custom properties to be added to the snapshot summary """ try: @@ -459,7 +461,7 @@ def overwrite( self.table_metadata.schema(), provided_schema=df.schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us ) - self.delete(delete_filter=overwrite_filter, snapshot_properties=snapshot_properties) + self.delete(delete_filter=overwrite_filter, case_sensitive=case_sensitive, snapshot_properties=snapshot_properties) with self.update_snapshot(snapshot_properties=snapshot_properties).fast_append() as update_snapshot: # skip writing data files if the dataframe is empty @@ -470,17 +472,23 @@ def overwrite( for data_file in data_files: update_snapshot.append_data_file(data_file) - def delete(self, delete_filter: Union[str, BooleanExpression], snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None: + def delete( + self, + delete_filter: Union[str, BooleanExpression], + case_sensitive: bool = True, + snapshot_properties: Dict[str, str] = EMPTY_DICT, + ) -> None: """ Shorthand for deleting record from a table. - An deletee may produce zero or more snapshots based on the operation: + A delete may produce zero or more snapshots based on the operation: - DELETE: In case existing Parquet files can be dropped completely.
- REPLACE: In case existing Parquet files need to be rewritten Args: delete_filter: A boolean expression to delete rows from a table + case_sensitive: A bool that determines whether the provided `delete_filter` is case-sensitive snapshot_properties: Custom properties to be added to the snapshot summary """ from pyiceberg.io.pyarrow import ( @@ -503,7 +511,7 @@ def delete(self, delete_filter: Union[str, BooleanExpression], snapshot_properti # Check if there are any files that require an actual rewrite of a data file if delete_snapshot.rewrites_needed is True: - bound_delete_filter = bind(self.table_metadata.schema(), delete_filter, case_sensitive=True) + bound_delete_filter = bind(self.table_metadata.schema(), delete_filter, case_sensitive) preserve_row_filter = _expression_to_complementary_pyarrow(bound_delete_filter) files = self._scan(row_filter=delete_filter).plan_files() @@ -1008,17 +1016,21 @@ def overwrite( tx.overwrite(df=df, overwrite_filter=overwrite_filter, snapshot_properties=snapshot_properties) def delete( - self, delete_filter: Union[BooleanExpression, str] = ALWAYS_TRUE, snapshot_properties: Dict[str, str] = EMPTY_DICT + self, + delete_filter: Union[BooleanExpression, str] = ALWAYS_TRUE, + case_sensitive: bool = True, + snapshot_properties: Dict[str, str] = EMPTY_DICT, ) -> None: """ Shorthand for deleting rows from the table.
Args: delete_filter: The predicate that used to remove rows + case_sensitive: A bool that determines whether the provided `delete_filter` is case-sensitive snapshot_properties: Custom properties to be added to the snapshot summary """ with self.transaction() as tx: - tx.delete(delete_filter=delete_filter, snapshot_properties=snapshot_properties) + tx.delete(delete_filter=delete_filter, case_sensitive=case_sensitive, snapshot_properties=snapshot_properties) def add_files( self, file_paths: List[str], snapshot_properties: Dict[str, str] = EMPTY_DICT, check_duplicate_files: bool = True @@ -1311,7 +1323,7 @@ def _match_deletes_to_data_file(data_entry: ManifestEntry, positional_delete_ent class DataScan(TableScan): def _build_partition_projection(self, spec_id: int) -> BooleanExpression: - project = inclusive_projection(self.table_metadata.schema(), self.table_metadata.specs()[spec_id]) + project = inclusive_projection(self.table_metadata.schema(), self.table_metadata.specs()[spec_id], self.case_sensitive) return project(self.row_filter) @cached_property diff --git a/pyiceberg/table/update/snapshot.py b/pyiceberg/table/update/snapshot.py index 47e5fc55e3..e4d4386ad3 100644 --- a/pyiceberg/table/update/snapshot.py +++ b/pyiceberg/table/update/snapshot.py @@ -318,6 +318,7 @@ class _DeleteFiles(_SnapshotProducer["_DeleteFiles"]): """ _predicate: BooleanExpression + _case_sensitive: bool def __init__( self, @@ -329,6 +330,7 @@ def __init__( ): super().__init__(operation, transaction, io, commit_uuid, snapshot_properties) self._predicate = AlwaysFalse() + self._case_sensitive = True def _commit(self) -> UpdatesAndRequirements: # Only produce a commit when there is something to delete @@ -340,7 +342,7 @@ def _commit(self) -> UpdatesAndRequirements: def _build_partition_projection(self, spec_id: int) -> BooleanExpression: schema = self._transaction.table_metadata.schema() spec = self._transaction.table_metadata.specs()[spec_id] - project = inclusive_projection(schema, spec) + project =
inclusive_projection(schema, spec, self._case_sensitive) return project(self._predicate) @cached_property @@ -352,8 +354,9 @@ def _build_manifest_evaluator(self, spec_id: int) -> Callable[[ManifestFile], bo spec = self._transaction.table_metadata.specs()[spec_id] return manifest_evaluator(spec, schema, self.partition_filters[spec_id], case_sensitive=True) - def delete_by_predicate(self, predicate: BooleanExpression) -> None: + def delete_by_predicate(self, predicate: BooleanExpression, case_sensitive: bool = True) -> None: self._predicate = Or(self._predicate, predicate) + self._case_sensitive = case_sensitive @cached_property def _compute_deletes(self) -> Tuple[List[ManifestFile], List[ManifestEntry], bool]: @@ -376,8 +379,10 @@ def _copy_with_new_status(entry: ManifestEntry, status: ManifestEntryStatus) -> ) manifest_evaluators: Dict[int, Callable[[ManifestFile], bool]] = KeyDefaultDict(self._build_manifest_evaluator) - strict_metrics_evaluator = _StrictMetricsEvaluator(schema, self._predicate, case_sensitive=True).eval - inclusive_metrics_evaluator = _InclusiveMetricsEvaluator(schema, self._predicate, case_sensitive=True).eval + strict_metrics_evaluator = _StrictMetricsEvaluator(schema, self._predicate, case_sensitive=self._case_sensitive).eval + inclusive_metrics_evaluator = _InclusiveMetricsEvaluator( + schema, self._predicate, case_sensitive=self._case_sensitive + ).eval existing_manifests = [] total_deleted_entries = [] diff --git a/tests/table/test_init.py b/tests/table/test_init.py index 040c67034b..0cfd77e400 100644 --- a/tests/table/test_init.py +++ b/tests/table/test_init.py @@ -310,6 +310,19 @@ def test_table_scan_row_filter(table_v2: Table) -> None: assert scan.filter(EqualTo("x", 10)).filter(In("y", (10, 11))).row_filter == And(EqualTo("x", 10), In("y", (10, 11))) +def test_table_scan_partition_filters_case_sensitive(table_v2: Table) -> None: + scan = table_v2.scan(row_filter=EqualTo("X", 10), case_sensitive=True) + with pytest.raises(ValueError): + 
for i in range(len(table_v2.metadata.specs())): + _ = scan.partition_filters[i] + + +def test_table_scan_partition_filters_case_insensitive(table_v2: Table) -> None: + scan = table_v2.scan(row_filter=EqualTo("X", 10), case_sensitive=False) + for i in range(len(table_v2.metadata.specs())): + _ = scan.partition_filters[i] + + def test_table_scan_ref(table_v2: Table) -> None: scan = table_v2.scan() assert scan.use_ref("test").snapshot_id == 3051729675574597004