[SPK-252] Spark log format (apache#173)
* Revert & logging for Stratio Standard

* Revert changes
pianista215 authored Mar 14, 2018
1 parent 9da8ad4 commit d83b7bc
Showing 8 changed files with 39 additions and 88 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -8,6 +8,7 @@
 * Possibility of specifying the name of the certificate within a directory with multiple certificates
 * Add support to multiples CAs
 * Added performance tests PNF
+* Fix logs to be same as rest of Stratio platform
 * Add messages for errors in Vault
 
 ## 2.2.0.5 (upcoming)
6 changes: 3 additions & 3 deletions conf/log4j.properties.template
@@ -21,7 +21,7 @@ log4j.rootCategory=INFO, stdout, stderr
 log4j.appender.stdout=org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.target=System.out
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %p - - - %c{1} %m%n
+log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd'T'HH:mm:ss.SSSZ} %p - - - %c{1} {"@message":"%m"}%n
 log4j.appender.stdout.filter.filter1.levelMin=TRACE
 log4j.appender.stdout.filter.filter1.levelMax=INFO
 log4j.appender.stdout.filter.filter1=org.apache.log4j.varia.LevelRangeFilter
@@ -31,7 +31,7 @@ log4j.appender.stderr=org.apache.log4j.ConsoleAppender
 log4j.appender.stderr.Threshold = WARN
 log4j.appender.stderr.target=System.err
 log4j.appender.stderr.layout=org.apache.log4j.PatternLayout
-log4j.appender.stderr.layout.ConversionPattern=%d{ISO8601} %p - - - %c{1} %m%n
+log4j.appender.stderr.layout.ConversionPattern=%d{yyyy-MM-dd'T'HH:mm:ss.SSSZ} %p - - - %c{1} {"@message":"%m"}%n
 
 # Set the default spark-shell log level to WARN. When running the spark-shell, the
 # log level for this class is used to overwrite the root logger's log level, so that
@@ -48,4 +48,4 @@ log4j.logger.parquet=ERROR
 
 # SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
 log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
-log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR
\ No newline at end of file
+log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR
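
With this ConversionPattern, each appender emits an ISO-8601 timestamp with an explicit timezone offset and wraps the log message in a small JSON envelope, matching the format used by the rest of the Stratio platform. Purely as an illustration (the logger name and message below are invented, not taken from the commit), an INFO line now renders roughly as:

2018-03-14T10:15:30.123+0100 INFO - - - HistoryServer {"@message":"Starting HistoryServer"}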
11 changes: 1 addition & 10 deletions core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
@@ -110,16 +110,7 @@ object ExternalShuffleService extends Logging {
   private val barrier = new CountDownLatch(1)
 
   def main(args: Array[String]): Unit = {
-    try {
-      main(args, (conf: SparkConf, sm: SecurityManager) => new ExternalShuffleService(conf, sm))
-    } catch {
-
-      // Notify using the Stratio standard format
-      case e: Exception =>
-        logError("Error initializing External Spark Shuffle service", e)
-        throw e
-
-    }
+    main(args, (conf: SparkConf, sm: SecurityManager) => new ExternalShuffleService(conf, sm))
   }
 
   /** A helper main method that allows the caller to call this with a custom shuffle service. */
12 changes: 2 additions & 10 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -45,7 +45,6 @@ import org.apache.ivy.plugins.resolver.{ChainResolver, FileSystemResolver, IBiblioResolver}
 import org.apache.spark._
 import org.apache.spark.api.r.RUtils
 import org.apache.spark.deploy.rest._
-import org.apache.spark.internal.Logging
 import org.apache.spark.launcher.SparkLauncher
 import org.apache.spark.scheduler.{KerberosUser, KerberosUtil}
 import org.apache.spark.security.{ConfigSecurity, VaultHelper}
@@ -66,7 +65,7 @@ private[deploy] object SparkSubmitAction extends Enumeration {
  * This program handles setting up the classpath with relevant Spark dependencies and provides
  * a layer over the different cluster managers and deploy modes that Spark supports.
  */
-object SparkSubmit extends CommandLineUtils with Logging{
+object SparkSubmit extends CommandLineUtils{
 
   // Cluster managers
   private val YARN = 1
@@ -183,14 +182,7 @@ object SparkSubmit extends CommandLineUtils with Logging{
         }
       }
     } else {
-      // Retrieve errors to log them into the Stratio solution
-      try {
-        runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
-      } catch {
-        case e: Exception =>
-          logError("Error from launched Application", e)
-          throw e
-      }
+      runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
     }
   }
 
37 changes: 14 additions & 23 deletions core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
@@ -280,34 +280,25 @@ object HistoryServer extends Logging {
   val UI_PATH_PREFIX = "/history"
 
   def main(argStrings: Array[String]): Unit = {
-    try {
-      Utils.initDaemon(log)
-
-      new HistoryServerArguments(conf, argStrings)
-      initSecurity()
-      val securityManager = createSecurityManager(conf)
+    Utils.initDaemon(log)
 
-      val providerName = conf.getOption("spark.history.provider")
-        .getOrElse(classOf[FsHistoryProvider].getName())
-      val provider = Utils.classForName(providerName)
-        .getConstructor(classOf[SparkConf])
-        .newInstance(conf)
-        .asInstanceOf[ApplicationHistoryProvider]
+    new HistoryServerArguments(conf, argStrings)
+    initSecurity()
+    val securityManager = createSecurityManager(conf)
 
-      val port = conf.getInt("spark.history.ui.port", 18080)
+    val providerName = conf.getOption("spark.history.provider")
+      .getOrElse(classOf[FsHistoryProvider].getName())
+    val provider = Utils.classForName(providerName)
+      .getConstructor(classOf[SparkConf])
+      .newInstance(conf)
+      .asInstanceOf[ApplicationHistoryProvider]
 
-      val server = new HistoryServer(conf, provider, securityManager, port)
-      server.bind()
+    val port = conf.getInt("spark.history.ui.port", 18080)
 
-      ShutdownHookManager.addShutdownHook { () => server.stop() }
-    } catch {
+    val server = new HistoryServer(conf, provider, securityManager, port)
+    server.bind()
 
-      // History server using Standard Stratio log format
-      case e: Exception =>
-        logError("Error initializing History Server", e)
-        throw e
-
-    }
+    ShutdownHookManager.addShutdownHook { () => server.stop() }
 
     // Wait until the end of the world... or if the HistoryServer process is manually stopped
     while(true) { Thread.sleep(Int.MaxValue) }
13 changes: 2 additions & 11 deletions core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -287,17 +287,8 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
       printUsageAndExit()
     }
 
-    try {
-      ConfigSecurity.prepareEnvironment
-      run(driverUrl, executorId, hostname, cores, appId, workerUrl, userClassPath)
-    } catch {
-
-      // Notify using the Stratio standard format
-      case e: Exception =>
-        logError("Error initializing Spark executor", e)
-        throw e
-
-    }
+    ConfigSecurity.prepareEnvironment
+    run(driverUrl, executorId, hostname, cores, appId, workerUrl, userClassPath)
 
     System.exit(0)
   }
28 changes: 11 additions & 17 deletions core/src/main/scala/org/apache/spark/internal/Logging.scala
@@ -48,54 +48,48 @@ trait Logging {
     }
     log_
   }
 
-  private def formatMsgInJson(msg: => String,
-    throwable: Option[Throwable] = None): String =
-    throwable.map{ th =>
-      raw"""{"@message": "$msg", "@data":"${ExceptionUtils.getFullStackTrace(th)}"}"""
-    } getOrElse
-      raw"""{"@message": "$msg"}"""
-
+
   // Log methods that take only a String
   protected def logInfo(msg: => String) {
-    if (log.isInfoEnabled) log.info(formatMsgInJson(msg))
+    if (log.isInfoEnabled) log.info(msg)
   }
 
   protected def logDebug(msg: => String) {
-    if (log.isDebugEnabled) log.debug(formatMsgInJson(msg))
+    if (log.isDebugEnabled) log.debug(msg)
   }
 
   protected def logTrace(msg: => String) {
-    if (log.isTraceEnabled) log.trace(formatMsgInJson(msg))
+    if (log.isTraceEnabled) log.trace(msg)
   }
 
   protected def logWarning(msg: => String) {
-    if (log.isWarnEnabled) log.warn(formatMsgInJson(msg))
+    if (log.isWarnEnabled) log.warn(msg)
   }
 
   protected def logError(msg: => String) {
-    if (log.isErrorEnabled) log.error(formatMsgInJson(msg))
+    if (log.isErrorEnabled) log.error(msg)
  }
 
   // Log methods that take Throwables (Exceptions/Errors) too
   protected def logInfo(msg: => String, throwable: Throwable) {
-    if (log.isInfoEnabled) log.info(formatMsgInJson(msg, Some(throwable)))
+    if (log.isInfoEnabled) log.info(msg, throwable)
   }
 
   protected def logDebug(msg: => String, throwable: Throwable) {
-    if (log.isDebugEnabled) log.debug(formatMsgInJson(msg, Some(throwable)))
+    if (log.isDebugEnabled) log.debug(msg, throwable)
   }
 
   protected def logTrace(msg: => String, throwable: Throwable) {
-    if (log.isTraceEnabled) log.trace(formatMsgInJson(msg, Some(throwable)))
+    if (log.isTraceEnabled) log.trace(msg, throwable)
   }
 
   protected def logWarning(msg: => String, throwable: Throwable) {
-    if (log.isWarnEnabled) log.warn(formatMsgInJson(msg, Some(throwable)))
+    if (log.isWarnEnabled) log.warn(msg, throwable)
   }
 
   protected def logError(msg: => String, throwable: Throwable) {
-    if (log.isErrorEnabled) log.error(formatMsgInJson(msg, Some(throwable)))
+    if (log.isErrorEnabled) log.error(msg, throwable)
   }
 
   protected def isTraceEnabled(): Boolean = {
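
Together with the log4j.properties.template change above, this revert moves the JSON envelope out of the Logging trait and into configuration: callers log plain messages and the ConversionPattern adds the {"@message":...} wrapper. Below is a minimal, self-contained sketch of the resulting division of labour; the object name, logger setup, and message are assumptions for illustration, not part of the commit.

import org.apache.log4j.{ConsoleAppender, Logger, PatternLayout}

object LogFormatSketch {
  def main(args: Array[String]): Unit = {
    // The same ConversionPattern the template now ships.
    val layout = new PatternLayout(
      "%d{yyyy-MM-dd'T'HH:mm:ss.SSSZ} %p - - - %c{1} {\"@message\":\"%m\"}%n")
    val logger = Logger.getLogger("LogFormatSketch")
    logger.addAppender(new ConsoleAppender(layout))

    // With formatMsgInJson gone, the message reaches log4j unwrapped; the
    // pattern supplies the JSON envelope. PatternLayout does not render
    // throwables, so the appender prints the stack trace on the lines that
    // follow the JSON line rather than inside an "@data" field.
    logger.error("Error initializing History Server", new RuntimeException("boom"))
  }
}

One consequence worth noting: stack traces that the old formatMsgInJson embedded in an "@data" field now appear as plain multi-line output after the JSON-wrapped message.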
19 changes: 5 additions & 14 deletions core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
@@ -260,8 +260,9 @@ class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter with Logging {
       verify(mockAppender, atLeast(1)).doAppend(loggingEventCaptor.capture)
       val loggingEvent = loggingEventCaptor.getValue
 
-      assert(hasExceptions(loggingEvent) == true)
-      assert(containsException(loggingEvent, classOf[IOException]))
+      assert(loggingEvent.getThrowableInformation !== null)
+      assert(loggingEvent.getThrowableInformation.getThrowable.isInstanceOf[IOException])
+
     } finally {
       logger.setLevel(oldLogLevel)
     }
@@ -298,9 +299,8 @@ class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter with Logging {
       verify(mockAppender, atLeast(0)).doAppend(loggingEventCaptor.capture)
       import scala.collection.JavaConverters._
       loggingEventCaptor.getAllValues.asScala.foreach { loggingEvent =>
-        assert(
-          hasExceptions(loggingEvent)
-          || !containsException(loggingEvent, classOf[IOException]))
+        assert(loggingEvent.getThrowableInformation === null
+          || !loggingEvent.getThrowableInformation.getThrowable.isInstanceOf[IOException])
       }
     } finally {
       logger.setLevel(oldLogLevel)
@@ -362,15 +362,6 @@
     }.foreach { _.delete() }
   }
 
-  def hasExceptions(loggingEvent: LoggingEvent): Boolean =
-    loggingEvent.getRenderedMessage().contains("@data")
-
-  def containsException(loggingEvent: LoggingEvent, e: Class[_]): Boolean = {
-    val msg = loggingEvent.getRenderedMessage
-    val dataPart = msg.substring(msg.indexOf("@data"))
-    dataPart.contains(e.getCanonicalName)
-  }
-
   /** Used to synchronize when read is called on a stream */
   private trait LatchedInputStream extends PipedInputStream {
     val latchReadStarted = new CountDownLatch(1)
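
The deleted hasExceptions/containsException helpers worked by parsing the "@data" field out of the rendered message; with the JSON wrapping gone from the trait, the updated assertions read the throwable directly off the captured event. A short sketch of the log4j API the tests now rely on (the object and method names here are invented for illustration):

import java.io.IOException
import org.apache.log4j.spi.LoggingEvent

object ThrowableInfoSketch {
  // A LoggingEvent carries its Throwable in a ThrowableInformation object,
  // separate from the rendered message, so tests can inspect the exception
  // type without any string parsing.
  def carriesIOException(event: LoggingEvent): Boolean =
    Option(event.getThrowableInformation)
      .exists(_.getThrowable.isInstanceOf[IOException])
}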
