Skipped failing tests for Databricks 14.3 to pass CI
razajafri committed Jan 18, 2025
1 parent c34d4e1 commit e6fde7b
Showing 9 changed files with 78 additions and 9 deletions.
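
The change applies one pattern throughout: each affected test file imports a Databricks 14.3 runtime check from spark_session and gates the failing tests with a pytest.mark.skipif marker that points at issue #11541. The sketch below illustrates the pattern; the body of is_databricks143_or_later is an assumption for illustration only (the real helper in spark_session.py may detect the runtime version differently), while the decorator usage mirrors the diff.

    import os
    import pytest

    def is_databricks143_or_later():
        # Illustrative only: the real helper lives in spark_session.py and may read
        # the runtime version from a Spark conf rather than this environment variable.
        version = os.environ.get("DATABRICKS_RUNTIME_VERSION", "")
        parts = version.split(".")
        try:
            return (int(parts[0]), int(parts[1])) >= (14, 3)
        except (IndexError, ValueError):
            return False

    # Same shape as the markers added in this commit: skip on Databricks 14.3+ and
    # record the tracking issue as the reason so the skips are easy to find later.
    @pytest.mark.skipif(is_databricks143_or_later(),
                        reason="https://github.com/NVIDIA/spark-rapids/issues/11541")
    def test_example_gated_on_databricks_143():
        assert True  # placeholder body; the real tests exercise Delta Lake writes
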
@@ -18,7 +18,7 @@
from delta_lake_utils import delta_meta_allow
from marks import allow_non_gpu, delta_lake
from pyspark.sql.functions import *
from spark_session import is_databricks104_or_later
from spark_session import is_databricks104_or_later, is_databricks143_or_later

_conf = {'spark.rapids.sql.explain': 'ALL',
'spark.databricks.delta.autoCompact.minNumFiles': 3} # Num files before compaction.
@@ -180,6 +180,7 @@ def verify_table_history(spark):
@pytest.mark.skipif(not is_databricks104_or_later(),
reason="Auto compaction of Delta Lake tables is only supported "
"on Databricks 10.4+")
@pytest.mark.skipif(is_databricks143_or_later(), reason="https://github.com/NVIDIA/spark-rapids/issues/11541")
def test_auto_compact_min_num_files(spark_tmp_path):
"""
This test verifies that auto-compaction honours the minNumFiles setting.
7 changes: 5 additions & 2 deletions integration_tests/src/main/python/delta_lake_delete_test.py
@@ -19,7 +19,7 @@
from delta_lake_utils import *
from marks import *
from spark_session import is_before_spark_320, is_databricks_runtime, supports_delta_lake_deletion_vectors, \
with_cpu_session, with_gpu_session
with_cpu_session, with_gpu_session, is_databricks143_or_later

delta_delete_enabled_conf = copy_and_update(delta_writes_enabled_conf,
{"spark.rapids.sql.command.DeleteCommand": "true",
@@ -157,6 +157,7 @@ def generate_dest_data(spark):
@pytest.mark.parametrize("partition_columns", [None, ["a"]], ids=idfn)
@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x")
@datagen_overrides(seed=0, permanent=True, reason='https://github.com/NVIDIA/spark-rapids/issues/9884')
@pytest.mark.skipif(is_databricks143_or_later(), reason="https://github.com/NVIDIA/spark-rapids/issues/11541")
def test_delta_delete_rows(spark_tmp_path, use_cdf, partition_columns):
# Databricks changes the number of files being written, so we cannot compare logs unless there's only one slice
num_slices_to_test = 1 if is_databricks_runtime() else 10
@@ -176,8 +177,10 @@ def generate_dest_data(spark):
@pytest.mark.parametrize("partition_columns", [None, ["a"]], ids=idfn)
@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x")
@datagen_overrides(seed=0, permanent=True, reason='https://github.com/NVIDIA/spark-rapids/issues/9884')
@pytest.mark.skipif(is_databricks143_or_later(), reason="https://github.com/NVIDIA/spark-rapids/issues/11541")
def test_delta_delete_dataframe_api(spark_tmp_path, use_cdf, partition_columns):
from delta.tables import DeltaTable
confs = copy_and_update(delta_delete_enabled_conf, non_gpu_conf)
data_path = spark_tmp_path + "/DELTA_DATA"
# Databricks changes the number of files being written, so we cannot compare logs unless there's only one slice
num_slices_to_test = 1 if is_databricks_runtime() else 10
@@ -192,5 +195,5 @@ def do_delete(spark, path):
dest_table.delete("b > 'c'")
read_func = read_delta_path_with_cdf if use_cdf else read_delta_path
assert_gpu_and_cpu_writes_are_equal_collect(do_delete, read_func, data_path,
conf=delta_delete_enabled_conf)
conf=confs)
with_cpu_session(lambda spark: assert_gpu_and_cpu_delta_logs_equivalent(spark, data_path))
@@ -18,7 +18,7 @@
from delta_lake_merge_common import *
from marks import *
from pyspark.sql.types import *
from spark_session import is_databricks133_or_later, spark_version
from spark_session import is_databricks133_or_later, spark_version, is_databricks143_or_later

delta_merge_enabled_conf = copy_and_update(delta_writes_enabled_conf,
{"spark.rapids.sql.command.MergeIntoCommand": "true",
@@ -35,6 +35,7 @@
"delta 2.4")
@pytest.mark.parametrize("use_cdf", [True, False], ids=idfn)
@pytest.mark.parametrize("num_slices", num_slices_to_test, ids=idfn)
@pytest.mark.skipif(is_databricks143_or_later(), reason="https://github.com/NVIDIA/spark-rapids/issues/11541")
def test_delta_low_shuffle_merge_when_gpu_file_scan_override_failed(spark_tmp_path,
spark_tmp_table_factory,
use_cdf, num_slices):
@@ -70,6 +71,7 @@ def test_delta_low_shuffle_merge_when_gpu_file_scan_override_failed(spark_tmp_pa
@pytest.mark.parametrize("use_cdf", [True, False], ids=idfn)
@pytest.mark.parametrize("partition_columns", [None, ["a"], ["b"], ["a", "b"]], ids=idfn)
@pytest.mark.parametrize("num_slices", num_slices_to_test, ids=idfn)
@pytest.mark.skipif(is_databricks143_or_later(), reason="https://github.com/NVIDIA/spark-rapids/issues/11541")
def test_delta_merge_not_match_insert_only(spark_tmp_path, spark_tmp_table_factory, table_ranges,
use_cdf, partition_columns, num_slices):
do_test_delta_merge_not_match_insert_only(spark_tmp_path, spark_tmp_table_factory,
@@ -90,6 +92,7 @@ def test_delta_merge_not_match_insert_only(spark_tmp_path, spark_tmp_table_facto
@pytest.mark.parametrize("use_cdf", [True, False], ids=idfn)
@pytest.mark.parametrize("partition_columns", [None, ["a"], ["b"], ["a", "b"]], ids=idfn)
@pytest.mark.parametrize("num_slices", num_slices_to_test, ids=idfn)
@pytest.mark.skipif(is_databricks143_or_later(), reason="https://github.com/NVIDIA/spark-rapids/issues/11541")
def test_delta_merge_match_delete_only(spark_tmp_path, spark_tmp_table_factory, table_ranges,
use_cdf, partition_columns, num_slices):
do_test_delta_merge_match_delete_only(spark_tmp_path, spark_tmp_table_factory, table_ranges,
@@ -105,6 +108,7 @@ def test_delta_merge_match_delete_only(spark_tmp_path, spark_tmp_table_factory,
"delta 2.4")
@pytest.mark.parametrize("use_cdf", [True, False], ids=idfn)
@pytest.mark.parametrize("num_slices", num_slices_to_test, ids=idfn)
@pytest.mark.skipif(is_databricks143_or_later(), reason="https://github.com/NVIDIA/spark-rapids/issues/11541")
def test_delta_merge_standard_upsert(spark_tmp_path, spark_tmp_table_factory, use_cdf, num_slices):
do_test_delta_merge_standard_upsert(spark_tmp_path, spark_tmp_table_factory, use_cdf,
num_slices, False, delta_merge_enabled_conf)
@@ -129,6 +133,7 @@ def test_delta_merge_standard_upsert(spark_tmp_path, spark_tmp_table_factory, us
" WHEN NOT MATCHED AND s.b > 'b' AND s.b < 'f' THEN INSERT *" \
" WHEN NOT MATCHED AND s.b > 'f' AND s.b < 'z' THEN INSERT (b) VALUES ('not here')" ], ids=idfn)
@pytest.mark.parametrize("num_slices", num_slices_to_test, ids=idfn)
@pytest.mark.skipif(is_databricks143_or_later(), reason="https://github.com/NVIDIA/spark-rapids/issues/11541")
def test_delta_merge_upsert_with_condition(spark_tmp_path, spark_tmp_table_factory, use_cdf, merge_sql, num_slices):
do_test_delta_merge_upsert_with_condition(spark_tmp_path, spark_tmp_table_factory, use_cdf,
merge_sql, num_slices, False,
@@ -143,6 +148,7 @@ def test_delta_merge_upsert_with_condition(spark_tmp_path, spark_tmp_table_facto
"delta 2.4")
@pytest.mark.parametrize("use_cdf", [True, False], ids=idfn)
@pytest.mark.parametrize("num_slices", num_slices_to_test, ids=idfn)
@pytest.mark.skipif(is_databricks143_or_later(), reason="https://github.com/NVIDIA/spark-rapids/issues/11541")
def test_delta_merge_upsert_with_unmatchable_match_condition(spark_tmp_path, spark_tmp_table_factory, use_cdf, num_slices):
do_test_delta_merge_upsert_with_unmatchable_match_condition(spark_tmp_path,
spark_tmp_table_factory,
@@ -159,6 +165,7 @@ def test_delta_merge_upsert_with_unmatchable_match_condition(spark_tmp_path, spa
reason="Delta Lake Low Shuffle Merge only supports Databricks 13.3 or OSS "
"delta 2.4")
@pytest.mark.parametrize("use_cdf", [True, False], ids=idfn)
@pytest.mark.skipif(is_databricks143_or_later(), reason="https://github.com/NVIDIA/spark-rapids/issues/11541")
def test_delta_merge_update_with_aggregation(spark_tmp_path, spark_tmp_table_factory, use_cdf):
do_test_delta_merge_update_with_aggregation(spark_tmp_path, spark_tmp_table_factory, use_cdf,
delta_merge_enabled_conf)
11 changes: 10 additions & 1 deletion integration_tests/src/main/python/delta_lake_merge_test.py
@@ -18,7 +18,7 @@
from delta_lake_merge_common import *
from marks import *
from pyspark.sql.types import *
from spark_session import is_before_spark_320, is_databricks_runtime, spark_version
from spark_session import is_before_spark_320, is_databricks_runtime, spark_version, is_databricks143_or_later


delta_merge_enabled_conf = copy_and_update(delta_writes_enabled_conf,
@@ -37,6 +37,7 @@
delta_writes_enabled_conf # Test disabled by default
], ids=idfn)
@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x")
@pytest.mark.skipif(is_databricks143_or_later(), reason="https://github.com/NVIDIA/spark-rapids/issues/11541")
def test_delta_merge_disabled_fallback(spark_tmp_path, spark_tmp_table_factory, disable_conf):
def checker(data_path, do_merge):
assert_gpu_fallback_write(do_merge, read_delta_path, data_path,
@@ -84,6 +85,7 @@ def checker(data_path, do_merge):
@pytest.mark.parametrize("disable_conf", [
"spark.rapids.sql.exec.RapidsProcessDeltaMergeJoinExec",
"spark.rapids.sql.expression.Add"], ids=idfn)
@pytest.mark.skipif(is_databricks143_or_later(), reason="https://github.com/NVIDIA/spark-rapids/issues/11541")
def test_delta_merge_partial_fallback_via_conf(spark_tmp_path, spark_tmp_table_factory,
use_cdf, partition_columns, num_slices, disable_conf):
src_range, dest_range = range(20), range(10, 30)
@@ -111,6 +113,7 @@ def test_delta_merge_partial_fallback_via_conf(spark_tmp_path, spark_tmp_table_f
@pytest.mark.parametrize("use_cdf", [True, False], ids=idfn)
@pytest.mark.parametrize("partition_columns", [None, ["a"], ["b"], ["a", "b"]], ids=idfn)
@pytest.mark.parametrize("num_slices", num_slices_to_test, ids=idfn)
@pytest.mark.skipif(is_databricks143_or_later(), reason="https://github.com/NVIDIA/spark-rapids/issues/11541")
def test_delta_merge_not_match_insert_only(spark_tmp_path, spark_tmp_table_factory, table_ranges,
use_cdf, partition_columns, num_slices):
do_test_delta_merge_not_match_insert_only(spark_tmp_path, spark_tmp_table_factory,
@@ -128,6 +131,7 @@ def test_delta_merge_not_match_insert_only(spark_tmp_path, spark_tmp_table_facto
@pytest.mark.parametrize("use_cdf", [True, False], ids=idfn)
@pytest.mark.parametrize("partition_columns", [None, ["a"], ["b"], ["a", "b"]], ids=idfn)
@pytest.mark.parametrize("num_slices", num_slices_to_test, ids=idfn)
@pytest.mark.skipif(is_databricks143_or_later(), reason="https://github.com/NVIDIA/spark-rapids/issues/11541")
def test_delta_merge_match_delete_only(spark_tmp_path, spark_tmp_table_factory, table_ranges,
use_cdf, partition_columns, num_slices):
do_test_delta_merge_match_delete_only(spark_tmp_path, spark_tmp_table_factory, table_ranges,
@@ -140,6 +144,7 @@ def test_delta_merge_match_delete_only(spark_tmp_path, spark_tmp_table_factory,
@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x")
@pytest.mark.parametrize("use_cdf", [True, False], ids=idfn)
@pytest.mark.parametrize("num_slices", num_slices_to_test, ids=idfn)
@pytest.mark.skipif(is_databricks143_or_later(), reason="https://github.com/NVIDIA/spark-rapids/issues/11541")
def test_delta_merge_standard_upsert(spark_tmp_path, spark_tmp_table_factory, use_cdf, num_slices):
do_test_delta_merge_standard_upsert(spark_tmp_path, spark_tmp_table_factory, use_cdf,
num_slices, num_slices == 1, delta_merge_enabled_conf)
@@ -162,6 +167,7 @@ def test_delta_merge_standard_upsert(spark_tmp_path, spark_tmp_table_factory, us
" WHEN NOT MATCHED AND s.b > 'b' AND s.b < 'f' THEN INSERT *" \
" WHEN NOT MATCHED AND s.b > 'f' AND s.b < 'z' THEN INSERT (b) VALUES ('not here')" ], ids=idfn)
@pytest.mark.parametrize("num_slices", num_slices_to_test, ids=idfn)
@pytest.mark.skipif(is_databricks143_or_later(), reason="https://github.com/NVIDIA/spark-rapids/issues/11541")
def test_delta_merge_upsert_with_condition(spark_tmp_path, spark_tmp_table_factory, use_cdf, merge_sql, num_slices):
do_test_delta_merge_upsert_with_condition(spark_tmp_path, spark_tmp_table_factory, use_cdf,
merge_sql, num_slices, num_slices == 1,
@@ -174,6 +180,7 @@ def test_delta_merge_upsert_with_condition(spark_tmp_path, spark_tmp_table_facto
@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x")
@pytest.mark.parametrize("use_cdf", [True, False], ids=idfn)
@pytest.mark.parametrize("num_slices", num_slices_to_test, ids=idfn)
@pytest.mark.skipif(is_databricks143_or_later(), reason="https://github.com/NVIDIA/spark-rapids/issues/11541")
def test_delta_merge_upsert_with_unmatchable_match_condition(spark_tmp_path, spark_tmp_table_factory, use_cdf, num_slices):
do_test_delta_merge_upsert_with_unmatchable_match_condition(spark_tmp_path,
spark_tmp_table_factory, use_cdf,
@@ -185,6 +192,7 @@ def test_delta_merge_upsert_with_unmatchable_match_condition(spark_tmp_path, spa
@ignore_order
@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x")
@pytest.mark.parametrize("use_cdf", [True, False], ids=idfn)
@pytest.mark.skipif(is_databricks143_or_later(), reason="https://github.com/NVIDIA/spark-rapids/issues/11541")
def test_delta_merge_update_with_aggregation(spark_tmp_path, spark_tmp_table_factory, use_cdf):
do_test_delta_merge_update_with_aggregation(spark_tmp_path, spark_tmp_table_factory, use_cdf,
delta_merge_enabled_conf)
@@ -196,6 +204,7 @@ def test_delta_merge_update_with_aggregation(spark_tmp_path, spark_tmp_table_fac
@pytest.mark.xfail(not is_databricks_runtime(), reason="https://github.com/NVIDIA/spark-rapids/issues/7573")
@pytest.mark.parametrize("use_cdf", [True, False], ids=idfn)
@pytest.mark.parametrize("num_slices", num_slices_to_test, ids=idfn)
@pytest.mark.skipif(is_databricks143_or_later(), reason="https://github.com/NVIDIA/spark-rapids/issues/11541")
def test_delta_merge_dataframe_api(spark_tmp_path, use_cdf, num_slices):
from delta.tables import DeltaTable
data_path = spark_tmp_path + "/DELTA_DATA"
2 changes: 2 additions & 0 deletions integration_tests/src/main/python/delta_lake_test.py
@@ -102,6 +102,7 @@ def setup_tables(spark):
@ignore_order(local=True)
@pytest.mark.parametrize("reader_confs", reader_opt_confs_no_native, ids=idfn)
@pytest.mark.parametrize("mapping", column_mappings, ids=idfn)
@pytest.mark.skipif(is_databricks143_or_later(), reason="https://github.com/NVIDIA/spark-rapids/issues/11541")
def test_delta_read_column_mapping(spark_tmp_path, reader_confs, mapping):
non_gpu_conf = {}
if (reader_confs["spark.rapids.sql.format.parquet.reader.type"] != "PERFILE" and is_databricks143_or_later()):
@@ -133,6 +134,7 @@ def test_delta_read_column_mapping(spark_tmp_path, reader_confs, mapping):
@ignore_order(local=True)
@pytest.mark.skipif(not (is_databricks_runtime() or is_spark_340_or_later()), \
reason="ParquetToSparkSchemaConverter changes not compatible with Delta Lake")
@pytest.mark.skipif(is_databricks143_or_later(), reason="https://github.com/NVIDIA/spark-rapids/issues/11541")
def test_delta_name_column_mapping_no_field_ids(spark_tmp_path):
data_path = spark_tmp_path + "/DELTA_DATA"
def setup_parquet_table(spark):
7 changes: 6 additions & 1 deletion integration_tests/src/main/python/delta_lake_update_test.py
@@ -19,7 +19,7 @@
from delta_lake_utils import *
from marks import *
from spark_session import is_before_spark_320, is_databricks_runtime, \
supports_delta_lake_deletion_vectors, with_cpu_session, with_gpu_session
supports_delta_lake_deletion_vectors, with_cpu_session, with_gpu_session, is_databricks143_or_later

delta_update_enabled_conf = copy_and_update(delta_writes_enabled_conf,
{"spark.rapids.sql.command.UpdateCommand": "true",
@@ -72,6 +72,7 @@ def checker(data_path, do_update):
delta_writes_enabled_conf # Test disabled by default
], ids=idfn)
@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x")
@pytest.mark.skipif(is_databricks143_or_later(), reason="https://github.com/NVIDIA/spark-rapids/issues/11541")
def test_delta_update_disabled_fallback(spark_tmp_path, disable_conf):
data_path = spark_tmp_path + "/DELTA_DATA"
def setup_tables(spark):
@@ -91,6 +92,7 @@ def write_func(spark, path):
@pytest.mark.parametrize("use_cdf", [True, False], ids=idfn)
@pytest.mark.parametrize("partition_columns", [None, ["a"]], ids=idfn)
@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x")
@pytest.mark.skipif(is_databricks143_or_later(), reason="https://github.com/NVIDIA/spark-rapids/issues/11541")
def test_delta_update_entire_table(spark_tmp_path, use_cdf, partition_columns):
def generate_dest_data(spark):
return three_col_df(spark,
@@ -107,6 +109,7 @@ def generate_dest_data(spark):
@pytest.mark.parametrize("use_cdf", [True, False], ids=idfn)
@pytest.mark.parametrize("partition_columns", [["a"], ["a", "b"]], ids=idfn)
@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x")
@pytest.mark.skipif(is_databricks143_or_later(), reason="https://github.com/NVIDIA/spark-rapids/issues/11541")
def test_delta_update_partitions(spark_tmp_path, use_cdf, partition_columns):
def generate_dest_data(spark):
return three_col_df(spark,
@@ -124,6 +127,7 @@ def generate_dest_data(spark):
@pytest.mark.parametrize("partition_columns", [None, ["a"]], ids=idfn)
@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x")
@datagen_overrides(seed=0, permanent=True, reason='https://github.com/NVIDIA/spark-rapids/issues/9884')
@pytest.mark.skipif(is_databricks143_or_later(), reason="https://github.com/NVIDIA/spark-rapids/issues/11541")
def test_delta_update_rows(spark_tmp_path, use_cdf, partition_columns):
# Databricks changes the number of files being written, so we cannot compare logs unless there's only one slice
num_slices_to_test = 1 if is_databricks_runtime() else 10
@@ -163,6 +167,7 @@ def generate_dest_data(spark):
@pytest.mark.parametrize("partition_columns", [None, ["a"]], ids=idfn)
@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x")
@datagen_overrides(seed=0, reason='https://github.com/NVIDIA/spark-rapids/issues/10025')
@pytest.mark.skipif(is_databricks143_or_later(), reason="https://github.com/NVIDIA/spark-rapids/issues/11541")
def test_delta_update_dataframe_api(spark_tmp_path, use_cdf, partition_columns):
from delta.tables import DeltaTable
data_path = spark_tmp_path + "/DELTA_DATA"