Initialize lift + shift for cross-db macros #359

Merged: 7 commits, Jun 17, 2022
5 changes: 5 additions & 0 deletions dbt/include/spark/macros/utils/any_value.sql
@@ -0,0 +1,5 @@
{% macro spark__any_value(expression) -%}
{#-- return any value (non-deterministic) --#}
first({{ expression }})

{%- endmacro %}
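A model would normally call the dispatching entry point rather than spark__any_value directly. A minimal illustrative call (hypothetical model and columns, not part of this diff), assuming the cross-db entry point is exposed as any_value():

select
    customer_id,
    {{ any_value('order_status') }} as any_order_status  -- compiles to first(order_status) on Spark
from {{ ref('orders') }}
group by customer_id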
9 changes: 9 additions & 0 deletions dbt/include/spark/macros/utils/assert_not_null.sql
@@ -0,0 +1,9 @@
{% macro assert_not_null(function, arg) -%}
{{ return(adapter.dispatch('assert_not_null', 'dbt')(function, arg)) }}
{%- endmacro %}

{% macro spark__assert_not_null(function, arg) %}

coalesce({{function}}({{arg}}), nvl2({{function}}({{arg}}), assert_true({{function}}({{arg}}) is not null), null))

{% endmacro %}
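Per the "make sure the dates + timestamps are real" comments in the dateadd/datediff macros below, this wrapper is meant to surface bad date/timestamp inputs as an error rather than letting NULLs propagate silently. As a rough illustration (hypothetical literal, not part of this diff), assert_not_null('to_timestamp', "'2022-06-17'") renders approximately as:

coalesce(
    to_timestamp('2022-06-17'),
    nvl2(to_timestamp('2022-06-17'), assert_true(to_timestamp('2022-06-17') is not null), null)
)
-- assert_true() raises a runtime error when its argument is not true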
11 changes: 11 additions & 0 deletions dbt/include/spark/macros/utils/bool_or.sql
@@ -0,0 +1,11 @@
{#-- Spark v3 supports 'bool_or' and 'any', but Spark v2 needs to use 'max' for this
-- https://spark.apache.org/docs/latest/api/sql/index.html#any
-- https://spark.apache.org/docs/latest/api/sql/index.html#bool_or
-- https://spark.apache.org/docs/latest/api/sql/index.html#max
#}

{% macro spark__bool_or(expression) -%}

max({{ expression }})

{%- endmacro %}
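max() works as a stand-in because Spark orders booleans with false < true, so the max of a boolean expression is true exactly when at least one row is true, matching bool_or / any. A small illustrative query (hypothetical table and columns):

select
    customer_id,
    max(order_total > 100) as has_large_order  -- behaves like bool_or(order_total > 100)
from orders
group by customer_id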
3 changes: 3 additions & 0 deletions dbt/include/spark/macros/utils/concat.sql
@@ -0,0 +1,3 @@
{% macro spark__concat(fields) -%}
concat({{ fields|join(', ') }})
{%- endmacro %}
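For reference (illustrative column names, not part of this diff), a call like concat(["first_name", "' '", "last_name"]) joins the field expressions with commas and compiles to:

concat(first_name, ' ', last_name)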
62 changes: 62 additions & 0 deletions dbt/include/spark/macros/utils/dateadd.sql
@@ -0,0 +1,62 @@
{% macro spark__dateadd(datepart, interval, from_date_or_timestamp) %}

{%- set clock_component -%}
{# make sure the dates + timestamps are real, otherwise raise an error asap #}
to_unix_timestamp({{ assert_not_null('to_timestamp', from_date_or_timestamp) }})
- to_unix_timestamp({{ assert_not_null('date', from_date_or_timestamp) }})
{%- endset -%}

{%- if datepart in ['day', 'week'] -%}

{%- set multiplier = 7 if datepart == 'week' else 1 -%}

to_timestamp(
to_unix_timestamp(
date_add(
{{ assert_not_null('date', from_date_or_timestamp) }},
cast({{interval}} * {{multiplier}} as int)
)
) + {{clock_component}}
)

{%- elif datepart in ['month', 'quarter', 'year'] -%}

{%- set multiplier -%}
{%- if datepart == 'month' -%} 1
{%- elif datepart == 'quarter' -%} 3
{%- elif datepart == 'year' -%} 12
{%- endif -%}
{%- endset -%}

to_timestamp(
to_unix_timestamp(
add_months(
{{ assert_not_null('date', from_date_or_timestamp) }},
cast({{interval}} * {{multiplier}} as int)
)
) + {{clock_component}}
)

{%- elif datepart in ('hour', 'minute', 'second', 'millisecond', 'microsecond') -%}

{%- set multiplier -%}
{%- if datepart == 'hour' -%} 3600
{%- elif datepart == 'minute' -%} 60
{%- elif datepart == 'second' -%} 1
{%- elif datepart == 'millisecond' -%} (1/1000)
{%- elif datepart == 'microsecond' -%} (1/1000000)
{%- endif -%}
{%- endset -%}

to_timestamp(
{{ assert_not_null('to_unix_timestamp', from_date_or_timestamp) }}
+ cast({{interval}} * {{multiplier}} as int)
)

{%- else -%}

{{ exceptions.raise_compiler_error("macro dateadd not implemented for datepart ~ '" ~ datepart ~ "' ~ on Spark") }}

{%- endif -%}

{% endmacro %}
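Because date_add() and add_months() shift only the date portion, the macro captures the time of day as clock_component (seconds past midnight) and adds it back after the shift. A rough illustration (assert_not_null() wrappers omitted for readability, input literal hypothetical): dateadd('day', 1, "'2022-06-17 12:30:00'") compiles to approximately:

to_timestamp(
    to_unix_timestamp(
        date_add(date('2022-06-17 12:30:00'), cast(1 * 1 as int))
    )
    -- clock_component: seconds between the full timestamp and midnight of that day
    + (to_unix_timestamp(to_timestamp('2022-06-17 12:30:00'))
       - to_unix_timestamp(date('2022-06-17 12:30:00')))
)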
107 changes: 107 additions & 0 deletions dbt/include/spark/macros/utils/datediff.sql
@@ -0,0 +1,107 @@
{% macro spark__datediff(first_date, second_date, datepart) %}

{%- if datepart in ['day', 'week', 'month', 'quarter', 'year'] -%}

{# make sure the dates are real, otherwise raise an error asap #}
{% set first_date = assert_not_null('date', first_date) %}
{% set second_date = assert_not_null('date', second_date) %}

{%- endif -%}

{%- if datepart == 'day' -%}

datediff({{second_date}}, {{first_date}})

{%- elif datepart == 'week' -%}

case when {{first_date}} < {{second_date}}
then floor(datediff({{second_date}}, {{first_date}})/7)
else ceil(datediff({{second_date}}, {{first_date}})/7)
end

-- did we cross a week boundary (Sunday)?
+ case
when {{first_date}} < {{second_date}} and dayofweek({{second_date}}) < dayofweek({{first_date}}) then 1
when {{first_date}} > {{second_date}} and dayofweek({{second_date}}) > dayofweek({{first_date}}) then -1
else 0 end

{%- elif datepart == 'month' -%}

case when {{first_date}} < {{second_date}}
then floor(months_between(date({{second_date}}), date({{first_date}})))
else ceil(months_between(date({{second_date}}), date({{first_date}})))
end

-- did we cross a month boundary?
+ case
when {{first_date}} < {{second_date}} and dayofmonth({{second_date}}) < dayofmonth({{first_date}}) then 1
when {{first_date}} > {{second_date}} and dayofmonth({{second_date}}) > dayofmonth({{first_date}}) then -1
else 0 end

{%- elif datepart == 'quarter' -%}

case when {{first_date}} < {{second_date}}
then floor(months_between(date({{second_date}}), date({{first_date}}))/3)
else ceil(months_between(date({{second_date}}), date({{first_date}}))/3)
end

-- did we cross a quarter boundary?
+ case
when {{first_date}} < {{second_date}} and (
(dayofyear({{second_date}}) - (quarter({{second_date}}) * 365/4))
< (dayofyear({{first_date}}) - (quarter({{first_date}}) * 365/4))
) then 1
when {{first_date}} > {{second_date}} and (
(dayofyear({{second_date}}) - (quarter({{second_date}}) * 365/4))
> (dayofyear({{first_date}}) - (quarter({{first_date}}) * 365/4))
) then -1
else 0 end

{%- elif datepart == 'year' -%}

year({{second_date}}) - year({{first_date}})

{%- elif datepart in ('hour', 'minute', 'second', 'millisecond', 'microsecond') -%}

{%- set divisor -%}
{%- if datepart == 'hour' -%} 3600
{%- elif datepart == 'minute' -%} 60
{%- elif datepart == 'second' -%} 1
{%- elif datepart == 'millisecond' -%} (1/1000)
{%- elif datepart == 'microsecond' -%} (1/1000000)
{%- endif -%}
{%- endset -%}

case when {{first_date}} < {{second_date}}
then ceil((
{# make sure the timestamps are real, otherwise raise an error asap #}
{{ assert_not_null('to_unix_timestamp', assert_not_null('to_timestamp', second_date)) }}
- {{ assert_not_null('to_unix_timestamp', assert_not_null('to_timestamp', first_date)) }}
) / {{divisor}})
else floor((
{{ assert_not_null('to_unix_timestamp', assert_not_null('to_timestamp', second_date)) }}
- {{ assert_not_null('to_unix_timestamp', assert_not_null('to_timestamp', first_date)) }}
) / {{divisor}})
end

{% if datepart == 'millisecond' %}
+ cast(date_format({{second_date}}, 'SSS') as int)
- cast(date_format({{first_date}}, 'SSS') as int)
{% endif %}

{% if datepart == 'microsecond' %}
{% set capture_str = '[0-9]{4}-[0-9]{2}-[0-9]{2}.[0-9]{2}:[0-9]{2}:[0-9]{2}.([0-9]{6})' %}
-- Spark doesn't really support microseconds, so this is a massive hack!
-- It will only work if the timestamp-string is of the format
-- 'yyyy-MM-dd HH:mm:ss.SSSSSS'
+ cast(regexp_extract({{second_date}}, '{{capture_str}}', 1) as int)
- cast(regexp_extract({{first_date}}, '{{capture_str}}', 1) as int)
{% endif %}

{%- else -%}

{{ exceptions.raise_compiler_error("macro datediff not implemented for datepart ~ '" ~ datepart ~ "' ~ on Spark") }}

{%- endif -%}

{% endmacro %}
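A short worked example of the week-boundary correction (dates chosen for illustration, not part of this diff):

-- first_date = '2022-06-17' (Friday), second_date = '2022-06-20' (Monday)
-- whole weeks: floor(datediff('2022-06-20', '2022-06-17') / 7) = floor(3 / 7) = 0
-- boundary check: dayofweek('2022-06-20') = 2 < dayofweek('2022-06-17') = 6, so add 1
-- result: 1, i.e. week semantics count Sunday boundaries crossed, not full seven-day spans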
17 changes: 17 additions & 0 deletions dbt/include/spark/macros/utils/listagg.sql
@@ -0,0 +1,17 @@
{% macro spark__listagg(measure, delimiter_text, order_by_clause, limit_num) -%}

{% if order_by_clause %}
{{ exceptions.warn("order_by_clause is not supported for listagg on Spark/Databricks") }}
{% endif %}
Comment on lines +3 to +5 (Contributor Author):
The docs make it pretty clear that:

The function is non-deterministic because the order of collected results depends on the order of the rows which may be non-deterministic after a shuffle.

The only way I've found to do this requires a subquery to calculate rank() first, which is then passed into collect_list (with a struct and array_sort to boot, probably)
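A rough, untested sketch of one variant of that idea (illustrative only, not part of this change; needs Spark 2.4+ higher-order functions), sorting a collected array of (order_col, value) structs and re-projecting the values:

select
    group_col,
    array_join(
        transform(
            array_sort(collect_list(struct(order_col, string_text))),
            x -> x.string_text
        ),
        ', '
    ) as ordered_listagg
from data
group by group_col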


{% set collect_list %} collect_list({{ measure }}) {% endset %}

{% set limited %} slice({{ collect_list }}, 1, {{ limit_num }}) {% endset %}

{% set collected = limited if limit_num else collect_list %}

{% set final %} array_join({{ collected }}, {{ delimiter_text }}) {% endset %}

{% do return(final) %}

{%- endmacro %}
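For reference (illustrative, not part of this diff), a call like listagg('string_text', "', '", none, 2) returns roughly:

array_join(slice(collect_list(string_text), 1, 2), ', ')

and without limit_num, simply array_join(collect_list(string_text), ', ').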
23 changes: 23 additions & 0 deletions dbt/include/spark/macros/utils/split_part.sql
@@ -0,0 +1,23 @@
{% macro spark__split_part(string_text, delimiter_text, part_number) %}

{% set delimiter_expr %}

-- split() treats the delimiter as a regex, so escape it if it starts with a special character
case when regexp_extract({{ delimiter_text }}, '([^A-Za-z0-9])(.*)', 1) != '_'
then concat('\\', {{ delimiter_text }})
else {{ delimiter_text }} end

{% endset %}

{% set split_part_expr %}

split(
{{ string_text }},
{{ delimiter_expr }}
)[({{ part_number - 1 }})]

{% endset %}

{{ return(split_part_expr) }}

{% endmacro %}
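For reference (illustrative, hypothetical column name, not part of this diff), split_part('pipe_delimited_col', "'|'", 1) compiles to roughly:

split(
    pipe_delimited_col,
    case when regexp_extract('|', '([^A-Za-z0-9])(.*)', 1) != '_'
        then concat('\\', '|')
        else '|' end
)[(0)]
-- the case expression yields '\|', so the pipe is matched literally rather than as regex alternation; part_number 1 maps to array index 0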
4 changes: 3 additions & 1 deletion tests/functional/adapter/test_basic.py
@@ -64,7 +64,7 @@ def project_config_update(self):
}


#hese tests were not enabled in the dbtspec files, so skipping here.
# These tests were not enabled in the dbtspec files, so skipping here.
# Error encountered was: Error running query: java.lang.ClassNotFoundException: delta.DefaultSource
@pytest.mark.skip_profile('apache_spark', 'spark_session')
class TestSnapshotTimestampSpark(BaseSnapshotTimestamp):
@@ -79,5 +79,7 @@ def project_config_update(self):
}
}


@pytest.mark.skip_profile('spark_session')
class TestBaseAdapterMethod(BaseAdapterMethod):
pass
61 changes: 61 additions & 0 deletions tests/functional/adapter/utils/fixture_listagg.py
@@ -0,0 +1,61 @@
# SparkSQL does not support 'order by' for its 'listagg' equivalent
# the argument is ignored, so let's ignore those fields when checking equivalency

models__test_listagg_no_order_by_sql = """
with data as (
select * from {{ ref('data_listagg') }}
),
data_output as (
select * from {{ ref('data_listagg_output') }}
),
calculate as (
/*

select
group_col,
{{ listagg('string_text', "'_|_'", "order by order_col") }} as actual,
'bottom_ordered' as version
from data
group by group_col
union all
select
group_col,
{{ listagg('string_text', "'_|_'", "order by order_col", 2) }} as actual,
'bottom_ordered_limited' as version
from data
group by group_col
union all

*/
select
group_col,
{{ listagg('string_text', "', '") }} as actual,
'comma_whitespace_unordered' as version
from data
where group_col = 3
group by group_col
union all
select
group_col,
{{ listagg('DISTINCT string_text', "','") }} as actual,
'distinct_comma' as version
from data
where group_col = 3
group by group_col
union all
select
group_col,
{{ listagg('string_text') }} as actual,
'no_params' as version
from data
where group_col = 3
group by group_col
)
select
calculate.actual,
data_output.expected
from calculate
left join data_output
on calculate.group_col = data_output.group_col
and calculate.version = data_output.version
"""