iceberg is not a valid Spark SQL Data Source #492

Open

nbenezri opened this issue Dec 30, 2024 · 0 comments

Labels
bug Something isn't working
Describe the bug

While running a simple model that selects from a parquet table and creates an Iceberg table, I get:
Caused by: org.apache.spark.sql.AnalysisException: iceberg is not a valid Spark SQL Data Source.

I tried to follow #405 and apache/iceberg#1756 but without luck.
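
For what it's worth, a quick classpath sanity check from an interactive Glue session would be something like the following (a sketch only; assumes PySpark/py4j access and I have not confirmed it behaves the same on Glue 5.0). If the iceberg-spark-runtime jar from extra_jars is actually loaded, this should return the class rather than raise a Py4JJavaError wrapping ClassNotFoundException:

# hypothetical sanity check: verify the Iceberg Spark runtime is visible to the session JVM
spark.sparkContext._jvm.java.lang.Class.forName("org.apache.iceberg.spark.SparkCatalog")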

Steps To Reproduce

profile.yml

glue_dbt:
  target: dev
  outputs:
    dev:
      type: glue
      role_arn: <arn>
      region: us-east-1
      glue_version: "5.0"
      workers: 2
      worker_type: G.1X
      threads: 50
      schema: "iceberg_dev"
      idle_timeout: 2
      session_provisioning_timeout_in_seconds: 120
      location: "s3://<bucket>/lake/test_schema"
      custom_iceberg_catalog_namespace : ""
      extra_jars: s3://<bucket>/jars/iceberg-aws-bundle-1.7.0.jar,s3://<bucket>/jars/iceberg-spark-runtime-3.3_2.12-1.7.0.jar
      conf: |
        spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog
        --conf spark.sql.catalog.spark_catalog.warehouse=s3://<bucket>/lake/test_schema
        --conf spark.sql.catalog.spark_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog
        --conf spark.sql.catalog.spark_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO
        --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtension

dbt_project.yml

name: 'glue_dbt'
version: '1.0.0'

profile: 'glue_dbt'

model-paths: ["models"]
analysis-paths: ["analyses"]
test-paths: ["tests"]
seed-paths: ["seeds"]
macro-paths: ["macros"]
snapshot-paths: ["snapshots"]

clean-targets:         # directories to be removed by `dbt clean`
  - "target"
  - "dbt_packages"

models:
  glue_dbt:
    materialized: table

model/glue/dbt_tracks.sql

{{ config(
    materialized='incremental',
    incremental_strategy='append',
    unique_key=['name'],
    file_format='iceberg',
    iceberg_expire_snapshots='False',
    table_properties={'write.target-file-size-bytes': '268435456'}
) }}
WITH
parquet_tracks AS (
    SELECT DISTINCT
        id, 
        name
    FROM iceberg_dev.stg_tracks 
    LIMIT 1
)

SELECT *
FROM parquet_tracks

Expected behavior

dbt run works and creates the Iceberg table with the data.
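
Roughly, I would expect the model to end up as an Iceberg table equivalent to the following (a sketch only; the column types are guesses and the catalog/namespace naming depends on the profile):

CREATE TABLE iceberg_dev.dbt_tracks (
    id   string,
    name string
)
USING iceberg
TBLPROPERTIES ('write.target-file-size-bytes' = '268435456');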

Screenshots and log output

(glue-dbt) glue_dbt % dbt run 
07:30:35  Running with dbt=1.9.1
07:30:35  Registered adapter: glue=1.9.0
07:30:35  Unable to do partial parsing because profile has changed
07:30:36  Found 1 model, 5 data tests, 515 macros
07:30:36  
07:30:36  Concurrency: 50 threads (target='dev')
07:30:36  
07:30:37  1 of 1 START sql incremental model iceberg_dev.dbt_tracks .............. [RUN]
07:30:38  Glue adapter: Parameter validation failed:
Invalid type for parameter Name, value: , type: <class 'jinja2.runtime.Undefined'>, valid types: <class 'str'>
07:32:39  Glue adapter: Glue returned `error` for statement None for code SqlWrapper2.execute('''/* {"app": "dbt", "dbt_version": "1.9.1", "profile_name": "glue_dbt", "target_name": "dev", "node_id": "model.glue_dbt.dbt_tracks"} */

    insert into table iceberg_dev.dbt_tracks
    select id, name from dbt_tracks_tmp
''', use_arrow=False, location='s3://<bucket>/lake/test_schema'), Py4JJavaError: An error occurred while calling o241.sql.
: java.util.concurrent.ExecutionException: org.apache.spark.sql.AnalysisException: iceberg is not a valid Spark SQL Data Source.
        at org.sparkproject.guava.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:306)
        at org.sparkproject.guava.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:293)
        at org.sparkproject.guava.util.concurrent.AbstractFuture.get(AbstractFuture.java:116)
        at org.sparkproject.guava.util.concurrent.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:135)
        at org.sparkproject.guava.cache.LocalCache$Segment.getAndRecordStats(LocalCache.java:2410)
        at org.sparkproject.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2380)
        at org.sparkproject.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
        at org.sparkproject.guava.cache.LocalCache$Segment.get(LocalCache.java:2257)
        at org.sparkproject.guava.cache.LocalCache.get(LocalCache.java:4000)
        at org.sparkproject.guava.cache.LocalCache$LocalManualCache.get(LocalCache.java:4789)
        at org.apache.spark.sql.catalyst.catalog.SessionCatalog.getCachedPlan(SessionCatalog.scala:212)
        at org.apache.spark.sql.execution.datasources.FindDataSourceTable.org$apache$spark$sql$execution$datasources$FindDataSourceTable$$readDataSourceTable(DataSourceStrategy.scala:277)
        at org.apache.spark.sql.execution.datasources.FindDataSourceTable$$anonfun$apply$2.applyOrElse(DataSourceStrategy.scala:332)
        at org.apache.spark.sql.execution.datasources.FindDataSourceTable$$anonfun$apply$2.applyOrElse(DataSourceStrategy.scala:329)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDownWithPruning$2(AnalysisHelper.scala:199)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:77)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDownWithPruning$1(AnalysisHelper.scala:199)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:353)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDownWithPruning(AnalysisHelper.scala:197)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDownWithPruning$(AnalysisHelper.scala:193)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsDownWithPruning(LogicalPlan.scala:34)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsWithPruning(AnalysisHelper.scala:128)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsWithPruning$(AnalysisHelper.scala:125)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsWithPruning(LogicalPlan.scala:34)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperators(AnalysisHelper.scala:85)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperators$(AnalysisHelper.scala:84)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:34)
        at org.apache.spark.sql.execution.datasources.FindDataSourceTable.apply(DataSourceStrategy.scala:329)
        at org.apache.spark.sql.execution.datasources.FindDataSourceTable.apply(DataSourceStrategy.scala:249)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:239)
        at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
        at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
        at scala.collection.immutable.List.foldLeft(List.scala:91)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeBatch$1(RuleExecutor.scala:236)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$6(RuleExecutor.scala:319)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor$RuleExecutionContext$.withContext(RuleExecutor.scala:368)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$5(RuleExecutor.scala:319)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$5$adapted(RuleExecutor.scala:309)
        at scala.collection.immutable.List.foreach(List.scala:431)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:309)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:195)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:191)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.executeSameContext(Analyzer.scala:303)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$execute$2(Analyzer.scala:299)
        at org.apache.spark.sql.catalyst.analysis.AnalysisContext$.withNewAnalysisContext(Analyzer.scala:216)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:299)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:245)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:182)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:108)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:182)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:270)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:360)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:269)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:93)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:219)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:277)
        at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:711)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:277)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:901)
        at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:276)
        at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:93)
        at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:90)
        at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:82)
        at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:102)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:901)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:99)
        at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:639)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:901)
        at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:630)
        at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:660)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:569)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: org.apache.spark.sql.AnalysisException: iceberg is not a valid Spark SQL Data Source.
        at org.apache.spark.sql.errors.QueryCompilationErrors$.invalidDataSourceError(QueryCompilationErrors.scala:1542)
        at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:436)
        at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:345)
        at org.apache.spark.sql.execution.datasources.FindDataSourceTable$$anon$1.call(DataSourceStrategy.scala:268)
        at org.apache.spark.sql.execution.datasources.FindDataSourceTable$$anon$1.call(DataSourceStrategy.scala:255)
        at org.sparkproject.guava.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4792)
        at org.sparkproject.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
        at org.sparkproject.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
        ... 76 more

07:32:39  1 of 1 ERROR creating sql incremental model iceberg_dev.dbt_tracks ..... [ERROR in 121.93s]
07:32:40  
07:32:40  Finished running 1 incremental model in 0 hours 2 minutes and 4.19 seconds (124.19s).
07:32:40  
07:32:40  Completed with 1 error, 0 partial successes, and 0 warnings:
07:32:40  
07:32:40    Database Error in model dbt_tracks (models/glue/dbt_tracks.sql)

System information

The output of dbt --version:

Core:
  - installed: 1.9.1
  - latest:    1.9.1 - Up to date!

Plugins:
  - glue:     1.9.0 - Up to date!
  - redshift: 1.9.0 - Up to date!
  - postgres: 1.9.0 - Up to date!
  - spark:    1.9.0 - Up to date!

The operating system you're using:

ProductName:            macOS
ProductVersion:         15.1.1
BuildVersion:           24B91

The output of python --version:

Python 3.13.1

Additional context

I tried a few variations of profile.yml, including Glue 4.0 and 5.0, and with/without each of the following (a sketch of one such variation is shown after this list):

  • extra_jars
  • spark.sql.catalog.glue_catalog
  • spark.sql.defaultCatalog=glue_catalog
  • datalake_formats
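
For example, one of the datalake_formats-based variations looked roughly like this (a sketch only; bucket, ARN and version values are placeholders, and glue_catalog is just the catalog name I used):

glue_dbt:
  target: dev
  outputs:
    dev:
      type: glue
      role_arn: <arn>
      region: us-east-1
      glue_version: "5.0"
      workers: 2
      worker_type: G.1X
      threads: 50
      schema: "iceberg_dev"
      datalake_formats: iceberg
      location: "s3://<bucket>/lake/test_schema"
      conf: |
        spark.sql.catalog.glue_catalog=org.apache.iceberg.spark.SparkCatalog
        --conf spark.sql.catalog.glue_catalog.warehouse=s3://<bucket>/lake/test_schema
        --conf spark.sql.catalog.glue_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog
        --conf spark.sql.catalog.glue_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO
        --conf spark.sql.defaultCatalog=glue_catalog
        --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions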

The parquet table is an external table based on JSON:

ROW FORMAT SERDE "org.apache.hive.hcatalog.data.JsonSerDe"
WITH SERDEPROPERTIES (
    'ignore.malformed.json' = 'true'
)
LOCATION '<s3 path>'
TBLPROPERTIES (
    'projection.enabled' = 'true',
    'json.max.read.errors' = '100',
    'compression.type' = 'GZIP',
    'projection.date.type' = 'date',
    'projection.date.interval' = '1',
    'projection.date.format' = 'yyyy/MM/dd',
    'timestamp.formats' = "yyyyMMdd'T'HH:mm:ss",
    'projection.date.unit' = 'DAYS',
    'projection.date.range' = '2021/01/01,NOW',
    'storage.location.template' = '<s3 path>/${date}/'
);