Merge pull request #6 from mythrocks/db14.3-xfail-delta-lake-failing-tests

XFail Delta Lake tests failing on Databricks 14.3
razajafri authored Jan 21, 2025
2 parents c34d4e1 + 5c08870 commit 45c83d3
Showing 7 changed files with 75 additions and 8 deletions.
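The new marker is gated on is_databricks_version_or_later(14, 3), a helper imported from spark_session in each file below; its implementation is not part of this diff. The following is a minimal sketch of how such a version check could work; the DATABRICKS_RUNTIME_VERSION environment variable and the parsing shown are assumptions for illustration, not the repository's actual code:

import os

def is_databricks_version_or_later(major, minor):
    # Sketch only: the real helper in spark_session.py may detect the runtime
    # version differently. Databricks clusters commonly expose a string such as
    # "14.3" in DATABRICKS_RUNTIME_VERSION (assumed here).
    version = os.environ.get("DATABRICKS_RUNTIME_VERSION", "")
    parts = version.split(".")
    try:
        db_major, db_minor = int(parts[0]), int(parts[1])
    except (IndexError, ValueError):
        return False  # not a Databricks runtime, or an unparseable version string
    return (db_major, db_minor) >= (major, minor)

On a 14.3 (or later) cluster such a check returns True, activating the xfail condition; everywhere else it returns False and the tests behave as before.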
8 changes: 6 additions & 2 deletions integration_tests/src/main/python/delta_lake_delete_test.py
@@ -18,8 +18,8 @@
from data_gen import *
from delta_lake_utils import *
from marks import *
from spark_session import is_before_spark_320, is_databricks_runtime, supports_delta_lake_deletion_vectors, \
with_cpu_session, with_gpu_session
from spark_session import is_before_spark_320, is_databricks_runtime, is_databricks_version_or_later, \
supports_delta_lake_deletion_vectors, with_cpu_session, with_gpu_session

delta_delete_enabled_conf = copy_and_update(delta_writes_enabled_conf,
{"spark.rapids.sql.command.DeleteCommand": "true",
@@ -156,6 +156,8 @@ def generate_dest_data(spark):
@pytest.mark.parametrize("use_cdf", [True, False], ids=idfn)
@pytest.mark.parametrize("partition_columns", [None, ["a"]], ids=idfn)
@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x")
@pytest.mark.xfail(condition=is_databricks_version_or_later(14,3),
reason="Will be triaged as part of https://github.com/NVIDIA/spark-rapids/issues/11541")
@datagen_overrides(seed=0, permanent=True, reason='https://github.com/NVIDIA/spark-rapids/issues/9884')
def test_delta_delete_rows(spark_tmp_path, use_cdf, partition_columns):
# Databricks changes the number of files being written, so we cannot compare logs unless there's only one slice
@@ -175,6 +177,8 @@ def generate_dest_data(spark):
@pytest.mark.parametrize("use_cdf", [True, False], ids=idfn)
@pytest.mark.parametrize("partition_columns", [None, ["a"]], ids=idfn)
@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x")
@pytest.mark.xfail(condition=is_databricks_version_or_later(14,3),
reason="Will be triaged as part of https://github.com/NVIDIA/spark-rapids/issues/11541")
@datagen_overrides(seed=0, permanent=True, reason='https://github.com/NVIDIA/spark-rapids/issues/9884')
def test_delta_delete_dataframe_api(spark_tmp_path, use_cdf, partition_columns):
from delta.tables import DeltaTable
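The change repeated throughout these files stacks a conditional xfail marker on top of the existing skipif and parametrize decorators. The snippet below is a standalone illustration of that pattern rather than code from the repository; the version helper is stubbed out so the example runs anywhere:

import pytest

def is_databricks_version_or_later(major, minor):
    # Stub standing in for the helper that the real tests import from spark_session.
    return False

@pytest.mark.skipif(False, reason="stand-in for the is_before_spark_320() guard used above")
@pytest.mark.xfail(condition=is_databricks_version_or_later(14, 3),
                   reason="Will be triaged as part of https://github.com/NVIDIA/spark-rapids/issues/11541")
def test_example():
    assert 1 + 1 == 2

With the condition true, a failing test is reported as XFAIL instead of failing the run, and since strict=True is not passed, an unexpected pass shows up as XPASS rather than an error (unless xfail_strict is enabled in the project's pytest configuration). During triage, running pytest with --runxfail tells it to ignore xfail markers so the underlying failures surface normally.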
integration_tests/src/main/python/delta_lake_low_shuffle_merge_test.py
@@ -18,7 +18,7 @@
from delta_lake_merge_common import *
from marks import *
from pyspark.sql.types import *
from spark_session import is_databricks133_or_later, spark_version
from spark_session import is_databricks133_or_later, is_databricks_version_or_later, spark_version

delta_merge_enabled_conf = copy_and_update(delta_writes_enabled_conf,
{"spark.rapids.sql.command.MergeIntoCommand": "true",
@@ -33,6 +33,8 @@
(not is_databricks_runtime() and spark_version().startswith("3.4"))),
reason="Delta Lake Low Shuffle Merge only supports Databricks 13.3 or OSS "
"delta 2.4")
@pytest.mark.xfail(condition=is_databricks_version_or_later(14,3),
reason="Will be triaged as part of https://github.com/NVIDIA/spark-rapids/issues/11541")
@pytest.mark.parametrize("use_cdf", [True, False], ids=idfn)
@pytest.mark.parametrize("num_slices", num_slices_to_test, ids=idfn)
def test_delta_low_shuffle_merge_when_gpu_file_scan_override_failed(spark_tmp_path,
@@ -63,6 +65,8 @@ def test_delta_low_shuffle_merge_when_gpu_file_scan_override_failed(spark_tmp_pa
(not is_databricks_runtime() and spark_version().startswith("3.4"))),
reason="Delta Lake Low Shuffle Merge only supports Databricks 13.3 or OSS "
"delta 2.4")
@pytest.mark.xfail(condition=is_databricks_version_or_later(14,3),
reason="Will be triaged as part of https://github.com/NVIDIA/spark-rapids/issues/11541")
@pytest.mark.parametrize("table_ranges", [(range(20), range(10)), # partial insert of source
(range(5), range(5)), # no-op insert
(range(10), range(20, 30)) # full insert of source
@@ -83,6 +87,8 @@ def test_delta_merge_not_match_insert_only(spark_tmp_path, spark_tmp_table_facto
(not is_databricks_runtime() and spark_version().startswith("3.4"))),
reason="Delta Lake Low Shuffle Merge only supports Databricks 13.3 or OSS "
"delta 2.4")
@pytest.mark.xfail(condition=is_databricks_version_or_later(14,3),
reason="Will be triaged as part of https://github.com/NVIDIA/spark-rapids/issues/11541")
@pytest.mark.parametrize("table_ranges", [(range(10), range(20)), # partial delete of target
(range(5), range(5)), # full delete of target
(range(10), range(20, 30)) # no-op delete
@@ -103,6 +109,8 @@ def test_delta_merge_match_delete_only(spark_tmp_path, spark_tmp_table_factory,
(not is_databricks_runtime() and spark_version().startswith("3.4"))),
reason="Delta Lake Low Shuffle Merge only supports Databricks 13.3 or OSS "
"delta 2.4")
@pytest.mark.xfail(condition=is_databricks_version_or_later(14,3),
reason="Will be triaged as part of https://github.com/NVIDIA/spark-rapids/issues/11541")
@pytest.mark.parametrize("use_cdf", [True, False], ids=idfn)
@pytest.mark.parametrize("num_slices", num_slices_to_test, ids=idfn)
def test_delta_merge_standard_upsert(spark_tmp_path, spark_tmp_table_factory, use_cdf, num_slices):
@@ -116,6 +124,8 @@ def test_delta_merge_standard_upsert(spark_tmp_path, spark_tmp_table_factory, us
(not is_databricks_runtime() and spark_version().startswith("3.4"))),
reason="Delta Lake Low Shuffle Merge only supports Databricks 13.3 or OSS "
"delta 2.4")
@pytest.mark.xfail(condition=is_databricks_version_or_later(14,3),
reason="Will be triaged as part of https://github.com/NVIDIA/spark-rapids/issues/11541")
@pytest.mark.parametrize("use_cdf", [True, False], ids=idfn)
@pytest.mark.parametrize("merge_sql", [
"MERGE INTO {dest_table} d USING {src_table} s ON d.a == s.a" \
@@ -141,6 +151,8 @@ def test_delta_merge_upsert_with_condition(spark_tmp_path, spark_tmp_table_facto
(not is_databricks_runtime() and spark_version().startswith("3.4"))),
reason="Delta Lake Low Shuffle Merge only supports Databricks 13.3 or OSS "
"delta 2.4")
@pytest.mark.xfail(condition=is_databricks_version_or_later(14,3),
reason="Will be triaged as part of https://github.com/NVIDIA/spark-rapids/issues/11541")
@pytest.mark.parametrize("use_cdf", [True, False], ids=idfn)
@pytest.mark.parametrize("num_slices", num_slices_to_test, ids=idfn)
def test_delta_merge_upsert_with_unmatchable_match_condition(spark_tmp_path, spark_tmp_table_factory, use_cdf, num_slices):
@@ -158,6 +170,8 @@ def test_delta_merge_upsert_with_unmatchable_match_condition(spark_tmp_path, spa
(not is_databricks_runtime() and spark_version().startswith("3.4"))),
reason="Delta Lake Low Shuffle Merge only supports Databricks 13.3 or OSS "
"delta 2.4")
@pytest.mark.xfail(condition=is_databricks_version_or_later(14,3),
reason="Will be triaged as part of https://github.com/NVIDIA/spark-rapids/issues/11541")
@pytest.mark.parametrize("use_cdf", [True, False], ids=idfn)
def test_delta_merge_update_with_aggregation(spark_tmp_path, spark_tmp_table_factory, use_cdf):
do_test_delta_merge_update_with_aggregation(spark_tmp_path, spark_tmp_table_factory, use_cdf,
18 changes: 17 additions & 1 deletion integration_tests/src/main/python/delta_lake_merge_test.py
@@ -18,7 +18,7 @@
from delta_lake_merge_common import *
from marks import *
from pyspark.sql.types import *
from spark_session import is_before_spark_320, is_databricks_runtime, spark_version
from spark_session import is_before_spark_320, is_databricks_runtime, is_databricks_version_or_later, spark_version


delta_merge_enabled_conf = copy_and_update(delta_writes_enabled_conf,
@@ -37,6 +37,8 @@
delta_writes_enabled_conf # Test disabled by default
], ids=idfn)
@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x")
@pytest.mark.xfail(condition=is_databricks_version_or_later(14,3),
reason="Will be triaged as part of https://github.com/NVIDIA/spark-rapids/issues/11541")
def test_delta_merge_disabled_fallback(spark_tmp_path, spark_tmp_table_factory, disable_conf):
def checker(data_path, do_merge):
assert_gpu_fallback_write(do_merge, read_delta_path, data_path,
@@ -78,6 +80,8 @@ def checker(data_path, do_merge):
@delta_lake
@ignore_order
@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x")
@pytest.mark.xfail(condition=is_databricks_version_or_later(14,3),
reason="Will be triaged as part of https://github.com/NVIDIA/spark-rapids/issues/11541")
@pytest.mark.parametrize("use_cdf", [True, False], ids=idfn)
@pytest.mark.parametrize("partition_columns", [None, ["a"], ["b"], ["a", "b"]], ids=idfn)
@pytest.mark.parametrize("num_slices", num_slices_to_test, ids=idfn)
@@ -104,6 +108,8 @@ def test_delta_merge_partial_fallback_via_conf(spark_tmp_path, spark_tmp_table_f
@delta_lake
@ignore_order
@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x")
@pytest.mark.xfail(condition=is_databricks_version_or_later(14,3),
reason="Will be triaged as part of https://github.com/NVIDIA/spark-rapids/issues/11541")
@pytest.mark.parametrize("table_ranges", [(range(20), range(10)), # partial insert of source
(range(5), range(5)), # no-op insert
(range(10), range(20, 30)) # full insert of source
@@ -121,6 +127,8 @@ def test_delta_merge_not_match_insert_only(spark_tmp_path, spark_tmp_table_facto
@delta_lake
@ignore_order
@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x")
@pytest.mark.xfail(condition=is_databricks_version_or_later(14,3),
reason="Will be triaged as part of https://github.com/NVIDIA/spark-rapids/issues/11541")
@pytest.mark.parametrize("table_ranges", [(range(10), range(20)), # partial delete of target
(range(5), range(5)), # full delete of target
(range(10), range(20, 30)) # no-op delete
@@ -138,6 +146,8 @@ def test_delta_merge_match_delete_only(spark_tmp_path, spark_tmp_table_factory,
@delta_lake
@ignore_order
@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x")
@pytest.mark.xfail(condition=is_databricks_version_or_later(14,3),
reason="Will be triaged as part of https://github.com/NVIDIA/spark-rapids/issues/11541")
@pytest.mark.parametrize("use_cdf", [True, False], ids=idfn)
@pytest.mark.parametrize("num_slices", num_slices_to_test, ids=idfn)
def test_delta_merge_standard_upsert(spark_tmp_path, spark_tmp_table_factory, use_cdf, num_slices):
@@ -172,6 +182,8 @@ def test_delta_merge_upsert_with_condition(spark_tmp_path, spark_tmp_table_facto
@delta_lake
@ignore_order
@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x")
@pytest.mark.xfail(condition=is_databricks_version_or_later(14,3),
reason="Will be triaged as part of https://github.com/NVIDIA/spark-rapids/issues/11541")
@pytest.mark.parametrize("use_cdf", [True, False], ids=idfn)
@pytest.mark.parametrize("num_slices", num_slices_to_test, ids=idfn)
def test_delta_merge_upsert_with_unmatchable_match_condition(spark_tmp_path, spark_tmp_table_factory, use_cdf, num_slices):
@@ -184,6 +196,8 @@ def test_delta_merge_upsert_with_unmatchable_match_condition(spark_tmp_path, spa
@delta_lake
@ignore_order
@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x")
@pytest.mark.xfail(condition=is_databricks_version_or_later(14,3),
reason="Will be triaged as part of https://github.com/NVIDIA/spark-rapids/issues/11541")
@pytest.mark.parametrize("use_cdf", [True, False], ids=idfn)
def test_delta_merge_update_with_aggregation(spark_tmp_path, spark_tmp_table_factory, use_cdf):
do_test_delta_merge_update_with_aggregation(spark_tmp_path, spark_tmp_table_factory, use_cdf,
@@ -194,6 +208,8 @@ def test_delta_merge_update_with_aggregation(spark_tmp_path, spark_tmp_table_fac
@ignore_order
@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x")
@pytest.mark.xfail(not is_databricks_runtime(), reason="https://github.com/NVIDIA/spark-rapids/issues/7573")
@pytest.mark.xfail(condition=is_databricks_version_or_later(14,3),
reason="Will be triaged as part of https://github.com/NVIDIA/spark-rapids/issues/11541")
@pytest.mark.parametrize("use_cdf", [True, False], ids=idfn)
@pytest.mark.parametrize("num_slices", num_slices_to_test, ids=idfn)
def test_delta_merge_dataframe_api(spark_tmp_path, use_cdf, num_slices):
7 changes: 6 additions & 1 deletion integration_tests/src/main/python/delta_lake_test.py
@@ -20,7 +20,8 @@
from marks import allow_non_gpu, delta_lake, ignore_order
from parquet_test import reader_opt_confs_no_native
from spark_session import with_cpu_session, with_gpu_session, is_databricks_runtime, \
is_spark_320_or_later, is_spark_340_or_later, supports_delta_lake_deletion_vectors, is_databricks143_or_later
is_databricks_version_or_later, is_spark_320_or_later, is_spark_340_or_later, \
supports_delta_lake_deletion_vectors, is_databricks143_or_later

_conf = {'spark.rapids.sql.explain': 'ALL'}

@@ -100,6 +101,8 @@ def setup_tables(spark):
@allow_non_gpu(*delta_meta_allow)
@delta_lake
@ignore_order(local=True)
@pytest.mark.xfail(condition=is_databricks_version_or_later(14,3),
reason="Will be triaged as part of https://github.com/NVIDIA/spark-rapids/issues/11541")
@pytest.mark.parametrize("reader_confs", reader_opt_confs_no_native, ids=idfn)
@pytest.mark.parametrize("mapping", column_mappings, ids=idfn)
def test_delta_read_column_mapping(spark_tmp_path, reader_confs, mapping):
@@ -133,6 +136,8 @@ def test_delta_read_column_mapping(spark_tmp_path, reader_confs, mapping):
@ignore_order(local=True)
@pytest.mark.skipif(not (is_databricks_runtime() or is_spark_340_or_later()), \
reason="ParquetToSparkSchemaConverter changes not compatible with Delta Lake")
@pytest.mark.xfail(condition=is_databricks_version_or_later(14,3),
reason="Will be triaged as part of https://github.com/NVIDIA/spark-rapids/issues/11541")
def test_delta_name_column_mapping_no_field_ids(spark_tmp_path):
data_path = spark_tmp_path + "/DELTA_DATA"
def setup_parquet_table(spark):
10 changes: 9 additions & 1 deletion integration_tests/src/main/python/delta_lake_update_test.py
@@ -18,7 +18,7 @@
from data_gen import *
from delta_lake_utils import *
from marks import *
from spark_session import is_before_spark_320, is_databricks_runtime, \
from spark_session import is_before_spark_320, is_databricks_runtime, is_databricks_version_or_later, \
supports_delta_lake_deletion_vectors, with_cpu_session, with_gpu_session

delta_update_enabled_conf = copy_and_update(delta_writes_enabled_conf,
@@ -72,6 +72,8 @@ def checker(data_path, do_update):
delta_writes_enabled_conf # Test disabled by default
], ids=idfn)
@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x")
@pytest.mark.xfail(condition=is_databricks_version_or_later(14,3),
reason="Will be triaged as part of https://github.com/NVIDIA/spark-rapids/issues/11541")
def test_delta_update_disabled_fallback(spark_tmp_path, disable_conf):
data_path = spark_tmp_path + "/DELTA_DATA"
def setup_tables(spark):
@@ -91,6 +93,8 @@ def write_func(spark, path):
@pytest.mark.parametrize("use_cdf", [True, False], ids=idfn)
@pytest.mark.parametrize("partition_columns", [None, ["a"]], ids=idfn)
@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x")
@pytest.mark.xfail(condition=is_databricks_version_or_later(14,3),
reason="Will be triaged as part of https://github.com/NVIDIA/spark-rapids/issues/11541")
def test_delta_update_entire_table(spark_tmp_path, use_cdf, partition_columns):
def generate_dest_data(spark):
return three_col_df(spark,
@@ -107,6 +111,8 @@ def generate_dest_data(spark):
@pytest.mark.parametrize("use_cdf", [True, False], ids=idfn)
@pytest.mark.parametrize("partition_columns", [["a"], ["a", "b"]], ids=idfn)
@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x")
@pytest.mark.xfail(condition=is_databricks_version_or_later(14,3),
reason="Will be triaged as part of https://github.com/NVIDIA/spark-rapids/issues/11541")
def test_delta_update_partitions(spark_tmp_path, use_cdf, partition_columns):
def generate_dest_data(spark):
return three_col_df(spark,
@@ -123,6 +129,8 @@ def generate_dest_data(spark):
@pytest.mark.parametrize("use_cdf", [True, False], ids=idfn)
@pytest.mark.parametrize("partition_columns", [None, ["a"]], ids=idfn)
@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x")
@pytest.mark.xfail(condition=is_databricks_version_or_later(14,3),
reason="Will be triaged as part of https://github.com/NVIDIA/spark-rapids/issues/11541")
@datagen_overrides(seed=0, permanent=True, reason='https://github.com/NVIDIA/spark-rapids/issues/9884')
def test_delta_update_rows(spark_tmp_path, use_cdf, partition_columns):
# Databricks changes the number of files being written, so we cannot compare logs unless there's only one slice
(Diffs for the remaining two changed files were not loaded and are not shown.)
