Skip to content

Commit

Permalink
[SPARK-34232][CORE][3.0] Redact SparkListenerEnvironmentUpdate event …
Browse files Browse the repository at this point in the history
…in log

### What changes were proposed in this pull request?

Redact event SparkListenerEnvironmentUpdate in log when its processing time exceeded logSlowEventThreshold.

This is the backport of apache#31335 to branch-3.0.

### Why are the changes needed?

Credentials could be exposed when its processing time exceeded logSlowEventThreshold.

As this is related to security issue, it is better to backport it.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Manually tested in original PR.

Closes apache#31634 from viirya/SPARK-34232-3.0.

Authored-by: Liang-Chi Hsieh <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
  • Loading branch information
viirya authored and rshkv committed Mar 9, 2021
1 parent 6d71135 commit e46dbe7
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ private[spark] class EventLoggingListener(
}

override def onEnvironmentUpdate(event: SparkListenerEnvironmentUpdate): Unit = {
logEvent(redactEvent(event))
logEvent(redactEvent(sparkConf, event))
}

// Events that trigger a flush
Expand Down Expand Up @@ -265,8 +265,15 @@ private[spark] class EventLoggingListener(
}
redactedProperties
}
}

private[spark] object EventLoggingListener extends Logging {
val DEFAULT_LOG_DIR = "/tmp/spark-events"
// Dummy stage key used by driver in executor metrics updates
val DRIVER_STAGE_KEY = (-1, -1)

private[spark] def redactEvent(
sparkConf: SparkConf,
event: SparkListenerEnvironmentUpdate): SparkListenerEnvironmentUpdate = {
// environmentDetails maps a string descriptor to a set of properties
// Similar to:
Expand All @@ -282,11 +289,4 @@ private[spark] class EventLoggingListener(
}
SparkListenerEnvironmentUpdate(redactedProps)
}

}

private[spark] object EventLoggingListener extends Logging {
val DEFAULT_LOG_DIR = "/tmp/spark-events"
// Dummy stage key used by driver in executor metrics updates
val DRIVER_STAGE_KEY = (-1, -1)
}
12 changes: 11 additions & 1 deletion core/src/main/scala/org/apache/spark/util/ListenerBus.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import com.codahale.metrics.Timer

import org.apache.spark.SparkEnv
import org.apache.spark.internal.{config, Logging}
import org.apache.spark.scheduler.EventLoggingListener
import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate

/**
* An event bus which posts events to its listeners.
Expand Down Expand Up @@ -128,7 +130,7 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
if (maybeTimerContext != null) {
val elapsed = maybeTimerContext.stop()
if (logSlowEventEnabled && elapsed > logSlowEventThreshold) {
logInfo(s"Process of event ${event} by listener ${listenerName} took " +
logInfo(s"Process of event ${redactEvent(event)} by listener ${listenerName} took " +
s"${elapsed / 1000000000d}s.")
}
}
Expand All @@ -150,4 +152,12 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
listeners.asScala.filter(_.getClass == c).map(_.asInstanceOf[T]).toSeq
}

private def redactEvent(e: E): E = {
e match {
case event: SparkListenerEnvironmentUpdate =>
EventLoggingListener.redactEvent(env.conf, event).asInstanceOf[E]
case _ => e
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,10 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
val conf = getLoggingConf(testDirPath, None)
.set(key, secretPassword)
val hadoopconf = SparkHadoopUtil.get.newConfiguration(new SparkConf())
val eventLogger = new EventLoggingListener("test", None, testDirPath.toUri(), conf)
val envDetails = SparkEnv.environmentDetails(conf, hadoopconf, "FIFO", Seq.empty, Seq.empty)
val event = SparkListenerEnvironmentUpdate(envDetails)
val redactedProps = eventLogger.redactEvent(event).environmentDetails("Spark Properties").toMap
val redactedProps = EventLoggingListener
.redactEvent(conf, event).environmentDetails("Spark Properties").toMap
assert(redactedProps(key) == "*********(redacted)")
}

Expand Down

0 comments on commit e46dbe7

Please sign in to comment.