Extract and send events with raw sql from spark. #2913
Conversation
Please update the PR description and title.
I only did a high-level review with two items, because IMO we have to rethink those before moving forward with the PR.
@@ -131,6 +131,28 @@ public void end(SparkListenerApplicationEnd applicationEnd) {
    eventEmitter.emit(event);
  }

  @Override
  public void sqlQuery(String query) {
I don't think it makes sense to put that here.
First, there can be multiple Spark SQL queries tied to a single "application" (script) - think just
df = spark.sql("select * from a");
df2 = spark.sql("select * from b");
df.join(df2, ...).write() ...
In this case, you'd get two events here with the application run id, without the ability to tie them to particular Spark actions.
I think the better idea is to check if we can get something like `executionId` from the Spark Session, and store the SQL in some map keyed by this `executionId`. Then, attach the SqlJobFacet to the next (all) COMPLETE and RUNNING events sent with that `executionId`.
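The map-based approach suggested above could be sketched roughly as follows. This is a minimal sketch, not the actual OpenLineage Spark integration code: `SqlEventRegistry` and its method names are hypothetical, and wiring it into the listener and facet builders is left out.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical registry: keeps raw SQL per Spark executionId so that later
// RUNNING/COMPLETE events for the same execution can attach a SqlJobFacet.
public class SqlEventRegistry {
    private final Map<Long, String> sqlByExecutionId = new ConcurrentHashMap<>();

    // Called when a SQL query is captured together with its executionId.
    public void registerSql(long executionId, String query) {
        sqlByExecutionId.put(executionId, query);
    }

    // Called while building a RUNNING/COMPLETE event for an execution;
    // returns the raw SQL to put into the SqlJobFacet, if any was recorded.
    public Optional<String> sqlFor(long executionId) {
        return Optional.ofNullable(sqlByExecutionId.get(executionId));
    }

    // Clean up once the execution finishes, to avoid leaking entries.
    public void remove(long executionId) {
        sqlByExecutionId.remove(executionId);
    }
}
```

This keeps each query tied to its own execution rather than to the whole application run, which addresses the multi-query script problem described above.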
integration/spark/app/src/test/java/io/openlineage/spark/agent/SparkTestsUtils.java
...n/spark/app/src/main/java/io/openlineage/spark/agent/lifecycle/SparkSQLExecutionContext.java
Signed-off-by: pawelkocinski <[email protected]>
Problem
Currently, for a Spark application we don't send any information about the SQL query that was used. For example,
spark.sql(
"SELECT id FROM table"
).save......
will record that we read table and saved it somewhere, but the raw SQL
"SELECT id FROM table"
won't be recorded in any event.
Closes: #ISSUE-NUMBER
Solution
We can look through the QueryExecution children until we find the sqlQuery. The downside of using a BFS algorithm is that we keep only one query. So in a case like
spark.sql("SELECT 1 as id").createOrReplaceTempView("tab1")
spark.sql("SELECT 2 as id").createOrReplaceTempView("tab2")
spark.sql("select * from tab1 union all select * from tab2")
we store only the query select * from tab1 union all select * from tab2.
That's a limitation of the SQL query facet, where we can store only one value.
The field we are looking for is available only for Spark 3.3 and above.
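The traversal described above could look roughly like the following. This is a sketch only: `PlanNode` is a simplified stand-in for Spark's actual plan tree nodes, and the real integration walks the plans held by QueryExecution (with the raw-SQL field present only on Spark 3.3+).

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import java.util.Optional;

// Simplified stand-in for a Spark plan tree node; the real code would
// inspect QueryExecution's plan nodes instead of this class.
class PlanNode {
    final String sqlText;          // non-null only on nodes carrying raw SQL
    final List<PlanNode> children;

    PlanNode(String sqlText, PlanNode... children) {
        this.sqlText = sqlText;
        this.children = Arrays.asList(children);
    }
}

public class SqlQueryFinder {
    // Breadth-first search for the first node exposing a raw SQL string.
    // Because the search stops at the first hit, only one query is ever
    // kept -- the limitation noted in the PR description.
    static Optional<String> findSql(PlanNode root) {
        Deque<PlanNode> queue = new ArrayDeque<>();
        queue.add(root);
        while (!queue.isEmpty()) {
            PlanNode node = queue.poll();
            if (node.sqlText != null) {
                return Optional.of(node.sqlText);
            }
            queue.addAll(node.children);
        }
        return Optional.empty();
    }
}
```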
One-line summary:
Traverse QueryExecution to get the sql field with a BFS algorithm.
Checklist
SPDX-License-Identifier: Apache-2.0
Copyright 2018-2024 contributors to the OpenLineage project