[SPARK-17993][SQL] Fix Parquet log output redirection #15538

Closed
72 changes: 72 additions & 0 deletions sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetLogRedirector.java
@@ -0,0 +1,72 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.sql.execution.datasources.parquet;

import java.io.Serializable;
import java.util.logging.Handler;
import java.util.logging.Logger;

import org.apache.parquet.Log;
import org.slf4j.bridge.SLF4JBridgeHandler;

// Redirects the JUL logging for parquet-mr versions <= 1.8 to SLF4J logging using
// SLF4JBridgeHandler. Parquet-mr versions >= 1.9 use SLF4J directly.
final class ParquetLogRedirector implements Serializable {
  // Client classes should hold a reference to INSTANCE to ensure redirection occurs. This is
  // especially important for Serializable classes where fields are set but constructors are
  // ignored.
  static final ParquetLogRedirector INSTANCE = new ParquetLogRedirector();

  // JUL loggers must be held by a strong reference, otherwise they may get destroyed by GC.
  // However, the root JUL logger used by Parquet isn't properly referenced. Here we keep
  // references to loggers in both parquet-mr <= 1.6 and 1.7/1.8.
  private static final Logger apacheParquetLogger =
    Logger.getLogger(Log.class.getPackage().getName());
  private static final Logger parquetLogger = Logger.getLogger("parquet");

  static {
    // For parquet-mr 1.7 and 1.8, which are under the `org.apache.parquet` namespace.
    try {
      Class.forName(Log.class.getName());
      redirect(Logger.getLogger(Log.class.getPackage().getName()));
    } catch (ClassNotFoundException ex) {
      throw new RuntimeException(ex);
    }

    // For parquet-mr 1.6.0 and lower versions bundled with Hive, which are under the `parquet`
    // namespace.
    try {
      Class.forName("parquet.Log");
      redirect(Logger.getLogger("parquet"));
    } catch (Throwable t) {
      // SPARK-9974: com.twitter:parquet-hadoop-bundle:1.6.0 is not packaged into the assembly
      // when Spark is built with SBT. So `parquet.Log` may not be found. This try/catch block
      // should be removed after this issue is fixed.
    }
  }

  private ParquetLogRedirector() {
  }

  private static void redirect(Logger logger) {
    for (Handler handler : logger.getHandlers()) {
      logger.removeHandler(handler);
    }
    logger.setUseParentHandlers(false);
    logger.addHandler(new SLF4JBridgeHandler());
  }
}
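An aside on how the redirection above behaves at runtime: the following is a minimal sketch, not part of this PR, assuming slf4j-api, jul-to-slf4j, and an SLF4J binding are on the classpath. The demo class is hypothetical and sits in the same package because ParquetLogRedirector and its INSTANCE field are package-private. A JUL record logged under the org.apache.parquet namespace propagates up to the redirected parent logger, where SLF4JBridgeHandler re-emits it through SLF4J instead of JUL's default console handler.

package org.apache.spark.sql.execution.datasources.parquet;

import java.util.logging.Logger;

// Hypothetical demo class, not part of the PR.
public class RedirectionDemo {
  public static void main(String[] args) {
    // Referencing INSTANCE forces ParquetLogRedirector's static initializer to run.
    ParquetLogRedirector redirector = ParquetLogRedirector.INSTANCE;

    // This JUL record propagates to the redirected "org.apache.parquet" parent logger,
    // where SLF4JBridgeHandler re-emits it through whatever SLF4J binding is active,
    // rather than JUL printing it to the console itself.
    Logger.getLogger("org.apache.parquet.hadoop.Demo").warning("now routed via SLF4J");
  }
}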
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.sql.execution.datasources.parquet
 
 import java.net.URI
-import java.util.logging.{Logger => JLogger}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
@@ -29,14 +28,12 @@ import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.mapreduce._
 import org.apache.hadoop.mapreduce.lib.input.FileSplit
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
-import org.apache.parquet.{Log => ApacheParquetLog}
 import org.apache.parquet.filter2.compat.FilterCompat
 import org.apache.parquet.filter2.predicate.FilterApi
 import org.apache.parquet.hadoop._
 import org.apache.parquet.hadoop.codec.CodecConfig
 import org.apache.parquet.hadoop.util.ContextUtil
 import org.apache.parquet.schema.MessageType
-import org.slf4j.bridge.SLF4JBridgeHandler
 
 import org.apache.spark.{SparkException, TaskContext}
 import org.apache.spark.internal.Logging
@@ -56,6 +53,11 @@ class ParquetFileFormat
   with DataSourceRegister
   with Logging
   with Serializable {
+  // Hold a reference to the (serializable) singleton instance of ParquetLogRedirector. This
+  // ensures the ParquetLogRedirector class is initialized whether an instance of ParquetFileFormat
+  // is constructed or deserialized. Do not heed the Scala compiler's warning about an unused field
+  // here.
+  private val parquetLogRedirector = ParquetLogRedirector.INSTANCE
 
   override def shortName(): String = "parquet"
 
@@ -129,10 +131,14 @@ class ParquetFileFormat
       conf.setBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
     }
 
-    ParquetFileFormat.redirectParquetLogs()
-
     new OutputWriterFactory {
-      override def newInstance(
+      // This OutputWriterFactory instance is deserialized when writing Parquet files on the
+      // executor side without constructing or deserializing ParquetFileFormat. Therefore, we hold
+      // another reference to ParquetLogRedirector.INSTANCE here to ensure the latter class is
+      // initialized.
+      private val parquetLogRedirector = ParquetLogRedirector.INSTANCE
+
+      override def newInstance(
           path: String,
           dataSchema: StructType,
           context: TaskAttemptContext): OutputWriter = {
@@ -673,44 +679,4 @@ object ParquetFileFormat extends Logging {
         Failure(cause)
     }.toOption
   }
-
-  // JUL loggers must be held by a strong reference, otherwise they may get destroyed by GC.
-  // However, the root JUL logger used by Parquet isn't properly referenced. Here we keep
-  // references to loggers in both parquet-mr <= 1.6 and >= 1.7
-  val apacheParquetLogger: JLogger = JLogger.getLogger(classOf[ApacheParquetLog].getPackage.getName)
-  val parquetLogger: JLogger = JLogger.getLogger("parquet")
-
-  // Parquet initializes its own JUL logger in a static block which always prints to stdout. Here
-  // we redirect the JUL logger via SLF4J JUL bridge handler.
-  val redirectParquetLogsViaSLF4J: Unit = {
-    def redirect(logger: JLogger): Unit = {
-      logger.getHandlers.foreach(logger.removeHandler)
-      logger.setUseParentHandlers(false)
-      logger.addHandler(new SLF4JBridgeHandler)
-    }
-
-    // For parquet-mr 1.7.0 and above versions, which are under `org.apache.parquet` namespace.
-    // scalastyle:off classforname
-    Class.forName(classOf[ApacheParquetLog].getName)
-    // scalastyle:on classforname
-    redirect(JLogger.getLogger(classOf[ApacheParquetLog].getPackage.getName))
-
-    // For parquet-mr 1.6.0 and lower versions bundled with Hive, which are under `parquet`
-    // namespace.
-    try {
-      // scalastyle:off classforname
-      Class.forName("parquet.Log")
-      // scalastyle:on classforname
-      redirect(JLogger.getLogger("parquet"))
-    } catch { case _: Throwable =>
-      // SPARK-9974: com.twitter:parquet-hadoop-bundle:1.6.0 is not packaged into the assembly
-      // when Spark is built with SBT. So `parquet.Log` may not be found. This try/catch block
-      // should be removed after this issue is fixed.
-    }
-  }
-
-  /**
-   * ParquetFileFormat.prepareWrite calls this function to initialize `redirectParquetLogsViaSLF4J`.
-   */
-  def redirectParquetLogs(): Unit = {}
 }
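The deserialization comment above is subtle enough to deserve an illustration. Here is a sketch, with hypothetical names and not part of the PR, of the mechanism the new fields rely on: Java deserialization does not run the Serializable holder's constructor, but restoring a serialized field of the redirector's type forces that class to be loaded and initialized, so its static block still runs on an executor where no ParquetFileFormat constructor ever executes.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class InitOnDeserialize {
  // Stand-in for ParquetLogRedirector: the static block models the JUL redirection.
  static class Redirector implements Serializable {
    static { System.out.println("static initializer ran"); }
    static final Redirector INSTANCE = new Redirector();
  }

  // Stand-in for ParquetFileFormat / the anonymous OutputWriterFactory.
  static class Holder implements Serializable {
    Holder() { System.out.println("constructor ran"); }
    private final Redirector redirector = Redirector.INSTANCE;
  }

  public static void main(String[] args) throws Exception {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    ObjectOutputStream out = new ObjectOutputStream(bytes);
    out.writeObject(new Holder());  // prints both messages in this JVM
    out.flush();

    // Deserializing restores the Redirector field without calling Holder's constructor.
    // In a fresh JVM (e.g. a Spark executor) this step is what loads Redirector and
    // runs its static initializer, which is exactly the guarantee the PR needs.
    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray())).readObject();
  }
}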
4 changes: 2 additions & 2 deletions sql/core/src/test/resources/log4j.properties
@@ -53,5 +53,5 @@ log4j.additivity.hive.ql.metadata.Hive=false
 log4j.logger.hive.ql.metadata.Hive=OFF
 
 # Parquet related logging
-log4j.logger.org.apache.parquet.hadoop=WARN
-log4j.logger.org.apache.spark.sql.parquet=INFO
+log4j.logger.org.apache.parquet=ERROR
+log4j.logger.parquet=ERROR
4 changes: 4 additions & 0 deletions sql/hive/src/test/resources/log4j.properties
@@ -59,3 +59,7 @@ log4j.logger.hive.ql.metadata.Hive=OFF
 
 log4j.additivity.org.apache.hadoop.hive.ql.io.RCFile=false
 log4j.logger.org.apache.hadoop.hive.ql.io.RCFile=ERROR
+
+# Parquet related logging
+log4j.logger.org.apache.parquet=ERROR
+log4j.logger.parquet=ERROR
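With the bridge in place, Parquet's verbosity in the test suites is governed entirely by these log4j categories rather than by JUL's defaults, which is why both the org.apache.parquet and legacy parquet namespaces are silenced to ERROR. As a hypothetical tweak, not part of this change, a developer who wanted Parquet output back while debugging could raise the levels:

# Example only: re-enable Parquet logging locally for debugging
log4j.logger.org.apache.parquet=INFO
log4j.logger.parquet=INFO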