From 73afb2bfba7fa0218d2d8ab85e85ff60ec61a601 Mon Sep 17 00:00:00 2001 From: Warren Zhu Date: Tue, 26 Jan 2021 17:10:53 +0900 Subject: [PATCH] [SPARK-34232][CORE] Redact SparkListenerEnvironmentUpdate event in log ### What changes were proposed in this pull request? Redact event SparkListenerEnvironmentUpdate in log when its processing time exceeded logSlowEventThreshold ### Why are the changes needed? Credentials could be exposed when its processing time exceeded logSlowEventThreshold ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Manually tested Closes #31335 from warrenzhu25/34232. Authored-by: Warren Zhu Signed-off-by: HyukjinKwon --- .../spark/scheduler/EventLoggingListener.scala | 16 ++++++++-------- .../org/apache/spark/util/ListenerBus.scala | 12 +++++++++++- .../scheduler/EventLoggingListenerSuite.scala | 4 ++-- 3 files changed, 21 insertions(+), 11 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala index d4e22d739098f..c57894b9f4f8f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala @@ -130,7 +130,7 @@ private[spark] class EventLoggingListener( } override def onEnvironmentUpdate(event: SparkListenerEnvironmentUpdate): Unit = { - logEvent(redactEvent(event)) + logEvent(redactEvent(sparkConf, event)) } // Events that trigger a flush @@ -295,8 +295,15 @@ private[spark] class EventLoggingListener( } redactedProperties } +} + +private[spark] object EventLoggingListener extends Logging { + val DEFAULT_LOG_DIR = "/tmp/spark-events" + // Dummy stage key used by driver in executor metrics updates + val DRIVER_STAGE_KEY = (-1, -1) private[spark] def redactEvent( + sparkConf: SparkConf, event: SparkListenerEnvironmentUpdate): SparkListenerEnvironmentUpdate = { // environmentDetails maps a string descriptor to a set of properties // Similar to: @@ -312,11 +319,4 @@ private[spark] class EventLoggingListener( } SparkListenerEnvironmentUpdate(redactedProps) } - -} - -private[spark] object EventLoggingListener extends Logging { - val DEFAULT_LOG_DIR = "/tmp/spark-events" - // Dummy stage key used by driver in executor metrics updates - val DRIVER_STAGE_KEY = (-1, -1) } diff --git a/core/src/main/scala/org/apache/spark/util/ListenerBus.scala b/core/src/main/scala/org/apache/spark/util/ListenerBus.scala index 51cd7d1284ff3..3520fa870c91b 100644 --- a/core/src/main/scala/org/apache/spark/util/ListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/util/ListenerBus.scala @@ -27,6 +27,8 @@ import com.codahale.metrics.Timer import org.apache.spark.SparkEnv import org.apache.spark.internal.{config, Logging} +import org.apache.spark.scheduler.EventLoggingListener +import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate /** * An event bus which posts events to its listeners. @@ -128,7 +130,7 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging { if (maybeTimerContext != null) { val elapsed = maybeTimerContext.stop() if (logSlowEventEnabled && elapsed > logSlowEventThreshold) { - logInfo(s"Process of event ${event} by listener ${listenerName} took " + + logInfo(s"Process of event ${redactEvent(event)} by listener ${listenerName} took " + s"${elapsed / 1000000000d}s.") } } @@ -150,4 +152,12 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging { listeners.asScala.filter(_.getClass == c).map(_.asInstanceOf[T]).toSeq } + private def redactEvent(e: E): E = { + e match { + case event: SparkListenerEnvironmentUpdate => + EventLoggingListener.redactEvent(env.conf, event).asInstanceOf[E] + case _ => e + } + } + } diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala index 7acb8451e3b38..240774d854c92 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala @@ -90,11 +90,11 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit val conf = getLoggingConf(testDirPath, None) .set(key, secretPassword) val hadoopconf = SparkHadoopUtil.get.newConfiguration(new SparkConf()) - val eventLogger = new EventLoggingListener("test", None, testDirPath.toUri(), conf) val envDetails = SparkEnv.environmentDetails( conf, hadoopconf, "FIFO", Seq.empty, Seq.empty, Seq.empty) val event = SparkListenerEnvironmentUpdate(envDetails) - val redactedProps = eventLogger.redactEvent(event).environmentDetails("Spark Properties").toMap + val redactedProps = EventLoggingListener + .redactEvent(conf, event).environmentDetails("Spark Properties").toMap assert(redactedProps(key) == "*********(redacted)") }