[SPARK-24959][SQL] Speed up count() for JSON and CSV #21909
Changes from all commits
FailureSafeParser.scala

@@ -21,14 +21,16 @@ import org.apache.spark.SparkException
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
 import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.unsafe.types.UTF8String

 class FailureSafeParser[IN](
     rawParser: IN => Seq[InternalRow],
     mode: ParseMode,
     schema: StructType,
-    columnNameOfCorruptRecord: String) {
+    columnNameOfCorruptRecord: String,
+    isMultiLine: Boolean) {

   private val corruptFieldIndex = schema.getFieldIndex(columnNameOfCorruptRecord)
   private val actualSchema = StructType(schema.filterNot(_.name == columnNameOfCorruptRecord))
@@ -56,9 +58,15 @@ class FailureSafeParser[IN](
     }
   }

+  private val skipParsing = !isMultiLine && mode == PermissiveMode && schema.isEmpty
+
   def parse(input: IN): Iterator[InternalRow] = {
     try {
-      rawParser.apply(input).toIterator.map(row => toResultRow(Some(row), () => null))
+      if (skipParsing) {
+        Iterator.single(InternalRow.empty)
+      } else {
+        rawParser.apply(input).toIterator.map(row => toResultRow(Some(row), () => null))
+      }
     } catch {
       case e: BadRecordException => mode match {
         case PermissiveMode =>

Review comment (on the new `skipParsing` flag): Not a big deal, but I would leave a comment to explain why it is permissive and non-multiline only. I assume counts are known when it's actually parsed for multiline cases, and counts should be given in any case when the mode is permissive, right?

Review thread (on `Iterator.single(InternalRow.empty)`):
- nit:
- It is not the same. If you return an empty iterator,
- ohh yes, my bad!

Review thread (on the unparsed-record shortcut):
- If there are broken records the parser can't parse, this skipping won't detect them?
- Yes. To detect them with a 100% guarantee, the parser must fully parse such records, and column values must be cast according to the types in the data schema. We actually don't do that, due to the column pruning mechanisms in both datasources, CSV and JSON.
- Could you add a test case for counting both CSV and JSON sources when the files have broken records? Any behavior change after this PR?
- Syntactically broken or semantically broken (wrong types, for example)? We have many tests in
- Both? If we introduce a behavior change, we need to document it in the migration guide and add a conf. Users can use the conf to revert back to the previous behavior.
- I added the tests.
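To make the thread above concrete, here is a small usage sketch of why the skip is safe only for permissive, non-multiline input (my illustration, not from the PR; it assumes a `SparkSession` named `spark` and a hypothetical file path): in PERMISSIVE mode every line-delimited input record yields exactly one output row, parsed or not, so `count()` equals the number of input records.

import org.apache.spark.sql.types.{IntegerType, StructType}

// Hypothetical /tmp/data.csv contents:
//   1,2
//   1,2,3,4      <- malformed line: too many columns
//   3,4
val schema = new StructType().add("a", IntegerType).add("b", IntegerType)
val df = spark.read
  .schema(schema)
  .option("mode", "PERMISSIVE") // the default parse mode
  .csv("/tmp/data.csv")

// One input line -> one row, malformed or not, so parsing can be skipped
// entirely when no column values are needed:
assert(df.count() == 3)

// The shortcut must stay off for multiLine input (one input element may
// hold any number of records) and for FAILFAST/DROPMALFORMED (the parser
// has to inspect each record in order to fail on it or drop it).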
UnivocityParser.scala
@@ -203,19 +203,11 @@ class UnivocityParser(
     }
   }

-  private val doParse = if (requiredSchema.nonEmpty) {
-    (input: String) => convert(tokenizer.parseLine(input))
-  } else {
-    // If `columnPruning` enabled and partition attributes scanned only,
-    // `schema` gets empty.
-    (_: String) => InternalRow.empty
-  }
-
   /**
    * Parses a single CSV string and turns it into either one resulting row or no row (if the
    * the record is malformed).
    */
-  def parse(input: String): InternalRow = doParse(input)
+  def parse(input: String): InternalRow = convert(tokenizer.parseLine(input))

   private val getToken = if (options.columnPruning) {
     (tokens: Array[String], index: Int) => tokens(index)

Review thread (on the removed `doParse`):
- Are the changes here (https://github.com/apache/spark/pull/21909/files#diff-3a4dc120191f7052e5d98db11934bfb5R63) replacing the need for the
- The introduced optimization works in the case if
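For intuition about when the required schema ends up empty (the case the removed `doParse` branch special-cased): any query that needs no column values, most obviously `count()`. A sketch, under the same assumptions as above (a `SparkSession` named `spark`, hypothetical path):

import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

val schema = new StructType().add("a", IntegerType).add("b", StringType)

// No column values are needed, so column pruning reduces the required schema
// to empty; after this PR that case is handled once, in FailureSafeParser.
val n = spark.read.schema(schema).csv("/tmp/data.csv").count()

// A data column is selected, so the required schema is non-empty and each
// line still goes through convert(tokenizer.parseLine(_)).
val rows = spark.read.schema(schema).csv("/tmp/data.csv").select("a").collect()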
@@ -293,7 +285,8 @@ private[csv] object UnivocityParser {
       input => Seq(parser.convert(input)),
       parser.options.parseMode,
       schema,
-      parser.options.columnNameOfCorruptRecord)
+      parser.options.columnNameOfCorruptRecord,
+      parser.options.multiLine)
     convertStream(inputStream, shouldDropHeader, tokenizer, checkHeader) { tokens =>
       safeParser.parse(tokens)
     }.flatten

@@ -341,7 +334,8 @@ private[csv] object UnivocityParser {
       input => Seq(parser.parse(input)),
       parser.options.parseMode,
       schema,
-      parser.options.columnNameOfCorruptRecord)
+      parser.options.columnNameOfCorruptRecord,
+      parser.options.multiLine)
     filteredLines.flatMap(safeParser.parse)
   }
 }
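Both call sites now thread `parser.options.multiLine` into `FailureSafeParser`. To make the restriction concrete, a hedged sketch (simplified, with made-up helper names; not Spark source) of why a one-input-equals-one-row shortcut only holds for line-delimited input:

import org.apache.spark.sql.catalyst.InternalRow

// Made-up stand-ins, only to show the shape of the two input kinds.
def parseOneRecord(line: String): InternalRow = InternalRow.empty
def parseAll(wholeInput: String): Seq[InternalRow] =
  wholeInput.split("\n").toSeq.map(parseOneRecord) // in this toy, one record per line

// Non-multiline: one input element is exactly one record, so for count()
// each input can stand in for one row without being parsed.
val lineParser: String => Seq[InternalRow] = s => Seq(parseOneRecord(s))

// multiLine: one input element (e.g. a whole file) may contain any number
// of records, so the row count is only known after real parsing.
val multiLineParser: String => Seq[InternalRow] = parseAll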
CSVBenchmarks.scala
@@ -119,8 +119,47 @@ object CSVBenchmarks {
     }
   }

+  def countBenchmark(rowsNum: Int): Unit = {
+    val colsNum = 10
+    val benchmark = new Benchmark(s"Count a dataset with $colsNum columns", rowsNum)
+
+    withTempPath { path =>
+      val fields = Seq.tabulate(colsNum)(i => StructField(s"col$i", IntegerType))
+      val schema = StructType(fields)
+
+      spark.range(rowsNum)
+        .select(Seq.tabulate(colsNum)(i => lit(i).as(s"col$i")): _*)
+        .write
+        .csv(path.getAbsolutePath)
+
+      val ds = spark.read.schema(schema).csv(path.getAbsolutePath)
+
+      benchmark.addCase(s"Select $colsNum columns + count()", 3) { _ =>
+        ds.select("*").filter((_: Row) => true).count()
+      }
+      benchmark.addCase(s"Select 1 column + count()", 3) { _ =>
+        ds.select($"col1").filter((_: Row) => true).count()
+      }
+      benchmark.addCase(s"count()", 3) { _ =>
+        ds.count()
+      }
+
+      /*
+      Intel(R) Core(TM) i7-7700HQ CPU @ 2.80GHz
+
+      Count a dataset with 10 columns: Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+      ---------------------------------------------------------------------------------------------
+      Select 10 columns + count()          12598 / 12740          0.8        1259.8       1.0X
+      Select 1 column + count()             7960 / 8175           1.3         796.0       1.6X
+      count()                               2332 / 2386           4.3         233.2       5.4X
+      */
+      benchmark.run()
+    }
+  }
+
   def main(args: Array[String]): Unit = {
     quotedValuesBenchmark(rowsNum = 50 * 1000, numIters = 3)
     multiColumnsBenchmark(rowsNum = 1000 * 1000)
+    countBenchmark(10 * 1000 * 1000)
   }
 }

Review comment (on the "Select 1 column + count()" case): does this benchmark result vary if we select
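To sanity-check the speedup outside the `Benchmark` harness, the same three-way comparison can be timed by hand (a rough sketch of mine, not from the PR; it assumes a `SparkSession` named `spark` and write access to `/tmp`):

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

val rowsNum = 1000 * 1000
val colsNum = 10
val schema = StructType(Seq.tabulate(colsNum)(i => StructField(s"col$i", IntegerType)))

spark.range(rowsNum)
  .select(Seq.tabulate(colsNum)(i => lit(i).as(s"col$i")): _*)
  .write.mode("overwrite").csv("/tmp/csv_count_bench")

val ds = spark.read.schema(schema).csv("/tmp/csv_count_bench")

def time[T](label: String)(f: => T): Unit = {
  val start = System.nanoTime()
  f
  println(s"$label: ${(System.nanoTime() - start) / 1e6} ms")
}

// The opaque filter defeats the optimization, forcing a full parse of every line.
time("select all + filter + count")(ds.select("*").filter((_: Row) => true).count())
// With no filter and no required columns, parsing is skipped after this PR.
time("count only")(ds.count())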
Review comment: Is this change related, @MaxGekk? Let's not add unrelated changes next time.