fix(ingest): spark-lineage - Adding additional debug logs to spark lineage (#5772)
treff7es authored Sep 5, 2022
1 parent c44fd62 commit caec2ed
Showing 1 changed file with 49 additions and 22 deletions.
@@ -44,10 +44,13 @@
 import datahub.spark.model.SQLQueryExecStartEvent;
 import datahub.spark.model.dataset.SparkDataset;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.spark.util.JsonProtocol;
+import org.json4s.jackson.JsonMethods$;
 import scala.collection.JavaConversions;
 import scala.runtime.AbstractFunction1;
 import scala.runtime.AbstractPartialFunction;

+
 @Slf4j
 public class DatahubSparkListener extends SparkListener {

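Note: the two new imports exist solely to render Spark listener events as JSON in the debug output added below. A minimal sketch of that serialization, assuming a Spark version whose JsonProtocol still exposes the static sparkEventToJson(SparkListenerEvent) API (it was reworked in later Spark releases); the helper class name is for illustration only:

    import org.apache.spark.scheduler.SparkListenerEvent;
    import org.apache.spark.util.JsonProtocol;
    import org.json4s.jackson.JsonMethods$;

    // Hypothetical helper, not part of this commit.
    final class EventJsonSketch {
      // Renders any Spark listener event as compact JSON, or returns null when
      // the event is absent, mirroring the null-guard used in SqlStartTask.
      static String toJson(SparkListenerEvent event) {
        return (event == null) ? null
            : JsonMethods$.MODULE$.compact(JsonProtocol.sparkEventToJson(event));
      }
    }
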
@@ -58,7 +61,7 @@ public class DatahubSparkListener extends SparkListener {
   public static final String PIPELINE_PLATFORM_INSTANCE_KEY = PIPELINE_KEY + ".platformInstance";

   public static final String COALESCE_KEY = "coalesce_jobs";
- 
+
   private final Map<String, AppStartEvent> appDetails = new ConcurrentHashMap<>();
   private final Map<String, Map<Long, SQLQueryExecStartEvent>> appSqlDetails = new ConcurrentHashMap<>();
   private final Map<String, McpEmitter> appEmitters = new ConcurrentHashMap<>();
@@ -78,12 +81,33 @@ public SqlStartTask(SparkListenerSQLExecutionStart sqlStart, LogicalPlan plan, S
       this.sqlStart = sqlStart;
       this.plan = plan;
       this.ctx = ctx;
+
+      String jsonPlan = (plan != null) ? plan.toJSON() : null;
+      String sqlStartJson =
+          (sqlStart != null) ? JsonMethods$.MODULE$.compact(JsonProtocol.sparkEventToJson(sqlStart)) : null;
+      log.debug("SqlStartTask with parameters: sqlStart: {}, plan: {}, ctx: {}", sqlStartJson, jsonPlan, ctx);
     }

     public void run() {
-      appSqlDetails.get(ctx.applicationId()).put(sqlStart.executionId(),
-          new SQLQueryExecStartEvent(ctx.conf().get("spark.master"), getPipelineName(ctx), ctx.applicationId(),
-              sqlStart.time(), sqlStart.executionId(), null));
+      if (ctx == null) {
+        log.error("Context is null skipping run");
+        return;
+      }
+
+      if (ctx.conf() == null) {
+        log.error("Context does not have config. Skipping run");
+        return;
+      }
+
+      if (sqlStart == null) {
+        log.error("sqlStart is null skipping run");
+        return;
+      }
+
+      appSqlDetails.get(ctx.applicationId())
+          .put(sqlStart.executionId(),
+              new SQLQueryExecStartEvent(ctx.conf().get("spark.master"), getPipelineName(ctx), ctx.applicationId(),
+                  sqlStart.time(), sqlStart.executionId(), null));
       log.debug("PLAN for execution id: " + getPipelineName(ctx) + ":" + sqlStart.executionId() + "\n");
       log.debug(plan.toString());

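Note: the messages added above go through the Lombok @Slf4j logger at DEBUG level, so a stock Spark distribution with an INFO root logger will not show them. A sketch of the log4j.properties entry that would surface them, assuming the listener lives in the datahub.spark package and a log4j 1.x layout:

    # conf/log4j.properties
    log4j.logger.datahub.spark=DEBUG
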
@@ -94,8 +118,8 @@ public void run() {
         return;
       }
       // Here assumption is that there will be only single target for single sql query
-      DatasetLineage lineage = new DatasetLineage(sqlStart.description(), plan.toString(),
-          outputDS.get().iterator().next());
+      DatasetLineage lineage =
+          new DatasetLineage(sqlStart.description(), plan.toString(), outputDS.get().iterator().next());
       Collection<QueryPlan<?>> allInners = new ArrayList<>();

       plan.collect(new AbstractPartialFunction<LogicalPlan, Void>() {
@@ -140,8 +164,9 @@ public boolean isDefinedAt(LogicalPlan x) {
        });
      }

-      SQLQueryExecStartEvent evt = new SQLQueryExecStartEvent(ctx.conf().get("spark.master"), getPipelineName(ctx),
-          ctx.applicationId(), sqlStart.time(), sqlStart.executionId(), lineage);
+      SQLQueryExecStartEvent evt =
+          new SQLQueryExecStartEvent(ctx.conf().get("spark.master"), getPipelineName(ctx), ctx.applicationId(),
+              sqlStart.time(), sqlStart.executionId(), lineage);

       appSqlDetails.get(ctx.applicationId()).put(sqlStart.executionId(), evt);

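Note: the traversal collapsed in the fold above drives Scala's TreeNode.collect from Java through an AbstractPartialFunction, the same pattern visible at the plan.collect call. A hypothetical skeleton of such a visitor (the real body, hidden in the fold, gathers nested query plans into allInners):

    plan.collect(new AbstractPartialFunction<LogicalPlan, Void>() {
      @Override
      public Void apply(LogicalPlan node) {
        // Inspect one node of the logical plan, e.g. record it as an inner plan.
        return null;
      }

      @Override
      public boolean isDefinedAt(LogicalPlan node) {
        // collect() only invokes apply() on nodes where this returns true.
        return true;
      }
    });
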
@@ -257,11 +282,13 @@ public void processExecutionEnd(SparkListenerSQLExecutionEnd sqlEnd) {
      public Void apply(SparkContext sc) {
        SQLQueryExecStartEvent start = appSqlDetails.get(sc.applicationId()).remove(sqlEnd.executionId());
        if (start == null) {
-          log.error("Execution end event received, but start event missing for appId/sql exec Id " + sc.applicationId()
-              + ":" + sqlEnd.executionId());
+          log.error(
+              "Execution end event received, but start event missing for appId/sql exec Id " + sc.applicationId() + ":"
+                  + sqlEnd.executionId());
        } else if (start.getDatasetLineage() != null) {
-          SQLQueryExecEndEvent evt = new SQLQueryExecEndEvent(LineageUtils.getMaster(sc), sc.appName(),
-              sc.applicationId(), sqlEnd.time(), sqlEnd.executionId(), start);
+          SQLQueryExecEndEvent evt =
+              new SQLQueryExecEndEvent(LineageUtils.getMaster(sc), sc.appName(), sc.applicationId(), sqlEnd.time(),
+                  sqlEnd.executionId(), start);
          McpEmitter emitter = appEmitters.get(sc.applicationId());
          if (emitter != null) {
            emitter.accept(evt);
@@ -271,7 +298,7 @@ public Void apply(SparkContext sc) {
      }
    });
  }
- 
+
  private synchronized void checkOrCreateApplicationSetup(SparkContext ctx) {
    ExecutorService pool = null;
    String appId = ctx.applicationId();
@@ -281,18 +308,17 @@ private synchronized void checkOrCreateApplicationSetup(SparkContext ctx) {
      appConfig.put(appId, datahubConf);
      Config pipelineConfig = datahubConf.hasPath(PIPELINE_KEY) ? datahubConf.getConfig(PIPELINE_KEY)
          : com.typesafe.config.ConfigFactory.empty();
-      AppStartEvent evt = new AppStartEvent(LineageUtils.getMaster(ctx), getPipelineName(ctx), appId, ctx.startTime(),
-          ctx.sparkUser(), pipelineConfig);
+      AppStartEvent evt =
+          new AppStartEvent(LineageUtils.getMaster(ctx), getPipelineName(ctx), appId, ctx.startTime(), ctx.sparkUser(),
+              pipelineConfig);

-      appEmitters.computeIfAbsent(appId,
-          s -> datahubConf.hasPath(COALESCE_KEY) && datahubConf.getBoolean(COALESCE_KEY)
-              ? new CoalesceJobsEmitter(datahubConf)
-              : new McpEmitter(datahubConf))
-          .accept(evt);
+      appEmitters.computeIfAbsent(appId,
+          s -> datahubConf.hasPath(COALESCE_KEY) && datahubConf.getBoolean(COALESCE_KEY) ? new CoalesceJobsEmitter(
+              datahubConf) : new McpEmitter(datahubConf)).accept(evt);
      consumers().forEach(c -> c.accept(evt));
      appDetails.put(appId, evt);
      appSqlDetails.put(appId, new ConcurrentHashMap<>());
      }
    }
  }
+
  private String getPipelineName(SparkContext cx) {
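Note: the coalesce_jobs flag read above picks CoalesceJobsEmitter over the plain McpEmitter. A hedged example of enabling it at submit time, assuming the library's usual spark.datahub.* configuration prefix and listener class name:

    spark-submit \
      --conf "spark.extraListeners=datahub.spark.DatahubSparkListener" \
      --conf "spark.datahub.coalesce_jobs=true" \
      ...
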
@@ -329,10 +355,11 @@ private List<LineageConsumer> consumers() {
    if (conf.contains(CONSUMER_TYPE_KEY)) {
      String consumerTypes = conf.get(CONSUMER_TYPE_KEY);
      return StreamSupport.stream(Splitter.on(",").trimResults().split(consumerTypes).spliterator(), false)
-          .map(x -> LineageUtils.getConsumer(x)).filter(Objects::nonNull).collect(Collectors.toList());
+          .map(x -> LineageUtils.getConsumer(x))
+          .filter(Objects::nonNull)
+          .collect(Collectors.toList());
    } else {
      return Collections.emptyList();
    }
-
  }
}
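Note: consumer resolution above tolerates stray whitespace and unknown names: Guava's Splitter trims each comma-separated entry, and entries that LineageUtils.getConsumer cannot resolve come back null and are dropped by the nonNull filter. A small illustration, with "unknownName" standing in for an unregistered consumer:

    // "mcpEmitter , unknownName" -> ["mcpEmitter", "unknownName"]; the second
    // entry resolves to null and is filtered out of the final consumer list.
    Iterable<String> names = Splitter.on(",").trimResults().split("mcpEmitter , unknownName");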
