[CARMEL-6439] Add configuration to enable log column lineage (#1238)
* [CARMEL-6439] Add configuration to enable log column lineage

* remove log

* useless condition

* skip empty lineage
fenzhu authored and GitHub Enterprise committed Feb 17, 2023
1 parent 5f91f87 commit 9f306b4
Showing 2 changed files with 45 additions and 1 deletion.
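To see what the new switch does end to end, here is a minimal sketch of exercising it from a Spark session (the session setup and table names are illustrative, not part of the commit; note the lineage section is only appended on the uiPlanWithMetrics code path shown in the diff below):

import org.apache.spark.sql.SparkSession

// Hypothetical local session, for demonstration only.
val spark = SparkSession.builder()
  .appName("column-lineage-demo")
  .master("local[*]")
  .getOrCreate()

// The commit reads this key via getConfString with a default of "false".
spark.conf.set("spark.sql.logColumnLineage.enable", "true")

// Lineage is only computed for writing plans (DataWritingCommand or
// CreateTableAsSelect), so a CTAS exercises the new code path.
spark.sql("CREATE TABLE src (id INT, name STRING) USING parquet")
spark.sql("CREATE TABLE dst USING parquet AS SELECT id, upper(name) AS name FROM src")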
@@ -28,18 +28,20 @@ import org.apache.spark.sql.catalyst.{InternalRow, QueryPlanningTracker}
import org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker
import org.apache.spark.sql.catalyst.expressions.codegen.ByteCodeStats
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer}
import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, LogicalPlan, ReturnAnswer}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.trees.TreeNodeTag
import org.apache.spark.sql.catalyst.util.StringUtils.PlanStringConcat
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.execution.QueryExecution.skipAuthTag
import org.apache.spark.sql.execution.adaptive.{AdaptiveExecutionContext, EnsureRepartitionForWriting, InsertAdaptiveSparkPlan}
import org.apache.spark.sql.execution.bucketing.{AdjustScanPartitionSizeDynamically, DisableUnnecessaryBucketedScan}
import org.apache.spark.sql.execution.command.DataWritingCommand
import org.apache.spark.sql.execution.dynamicpruning.PlanDynamicPruningFilters
import org.apache.spark.sql.execution.exchange.{EliminateShuffleExec, EnsureRequirements, ExchangePushDownThroughAggregate}
import org.apache.spark.sql.execution.reuse.ReuseExchangeAndSubquery
import org.apache.spark.sql.execution.streaming.{IncrementalExecution, OffsetSeqMetadata}
import org.apache.spark.sql.expressions.lineage.AttributeLineageUtils
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.util.Utils
@@ -237,6 +239,7 @@ class QueryExecution(
    if (SQLConf.get.uiPlanWithMetrics) {
      if (tracker.alreadyExecuted) {
        append(tracker.getRealExecutionInfo)
        return
      }
      append(stringWithStats)
      if (tracker.hasView) {
@@ -245,6 +248,36 @@
      if (tracker.hasTempView) {
        append(tracker.formattedTempViewUsage())
      }
      if (sparkSession.sessionState.conf.getConfString(
          "spark.sql.logColumnLineage.enable", "false").equalsIgnoreCase("true")) {
        // Only plans that write data expose output columns worth tracing.
        if (optimizedPlan.isInstanceOf[DataWritingCommand] ||
            optimizedPlan.isInstanceOf[CreateTableAsSelect]) {
          append("\n=== Column Lineage Start ===\n")
          val startTime = System.currentTimeMillis()
          var columns = 0
          try {
            val (outputColumnNames, query) = optimizedPlan match {
              case dw: DataWritingCommand => (dw.outputColumnNames, dw.query)
              case ctas: CreateTableAsSelect => (ctas.query.output.map(_.name), ctas.query)
            }
            columns = outputColumnNames.length
            outputColumnNames.zipWithIndex.foreach { case (name, idx) =>
              val lineages = AttributeLineageUtils.getAttributeOrigins(
                query, idx, keepOperations = false)
              // Skip columns for which no lineage could be resolved.
              if (lineages.nonEmpty) {
                append(name + ": " + lineages.map(_.getColumn).mkString("|") + "\n")
              }
            }
          } catch {
            case _: Exception =>
              append("Failed to get the column lineage information\n")
          }
          val cost = System.currentTimeMillis() - startTime
          append(s"Total table columns: $columns columns\n")
          append(s"Lineage time cost: $cost ms\n")
          append("=== Column Lineage End ===\n")
        }
      }
      append(tracker.formattedRulesByTime())
    } else {
      val (verbose, addSuffix) = (true, false)
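For reference, with the flag enabled the block above appends a section shaped like the following to the plan description (the table and column names here are hypothetical; the exact entries depend on what AttributeLineageUtils.getAttributeOrigins resolves):

=== Column Lineage Start ===
id: (default.src, id)
name: (default.src, name)
Total table columns: 2 columns
Lineage time cost: 12 ms
=== Column Lineage End ===

A column derived from several source columns would list its origins joined with "|".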
@@ -50,6 +50,17 @@ case class AttributeOrigin(
    s"keepOrigin=${!isDerived}, affectedOperations=${affectedOperations.length})"
  }

  // Render this origin as "(table, column)", preferring the catalog table
  // identifier and falling back to the plan node name.
  def getColumn: String = {
    val tbl = leafNode match {
      case l: LogicalRelation if l.catalogTable.isDefined =>
        l.catalogTable.get.identifier.unquotedString
      case f: FileSourceScanExec if f.tableIdentifier.isDefined =>
        f.tableIdentifier.get.unquotedString
      case _ => leafNode.nodeName
    }
    s"($tbl, ${leafNode.output(index).name})"
  }

  def isOriginalTableColumn: Boolean = !isDerived

  def affectedOperations: Seq[Expression] = {
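To make the "(table, column)" rendering above concrete, here is a small self-contained Scala sketch of the same fallback logic; TableRef and every name in it are hypothetical stand-ins for the LogicalRelation / FileSourceScanExec cases handled by the real method:

// Stand-in for a leaf plan node that may or may not carry a catalog identifier.
case class TableRef(identifier: Option[String], nodeName: String, output: Seq[String])

def formatColumn(ref: TableRef, index: Int): String = {
  // Prefer the table identifier; otherwise fall back to the node name,
  // mirroring the `case _ => leafNode.nodeName` branch above.
  val tbl = ref.identifier.getOrElse(ref.nodeName)
  s"($tbl, ${ref.output(index)})"
}

formatColumn(TableRef(Some("sales.orders"), "LogicalRelation", Seq("order_id", "amount")), 0)
// => "(sales.orders, order_id)"
formatColumn(TableRef(None, "LocalRelation", Seq("x")), 0)
// => "(LocalRelation, x)"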
