Add csv-core based reader (#3338) #3365
@@ -45,6 +45,7 @@ arrow-data = { version = "29.0.0", path = "../arrow-data" }
arrow-schema = { version = "29.0.0", path = "../arrow-schema" }
chrono = { version = "0.4.23", default-features = false, features = ["clock"] }
csv = { version = "1.1", default-features = false }
csv-core = { version = "0.1"}
This is already a dependency of csv, and has no default features
fn infer_reader_schema_with_csv_options<R: Read>(
    reader: R,
    roptions: ReaderOptions,
) -> Result<(Schema, usize), ArrowError> {
-    let mut csv_reader = Reader::build_csv_reader(
+    let mut csv_reader = build_csv_reader(
Schema inference still uses the old reader, both to reduce the size of this PR and because inference is inherently row-oriented, unlike parsing.
@@ -383,6 +417,7 @@ impl<R: Read> Reader<R> {
    /// This constructor allows you more flexibility in what records are processed by the
    /// csv reader.
    #[allow(clippy::too_many_arguments)]
    #[deprecated(note = "Use Reader::new or ReaderBuilder")]
Having two methods, `from_reader` and `new`, with identical signatures is a touch confusing. Let's just unify them.
    schema,
    projection,
    reader: csv_reader,
    line_number: if has_header { start + 1 } else { start },
This is the cause of #3364: it increments the start but not the end.
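To make the off-by-one concrete, here is a minimal standalone sketch. It assumes the bounds are a half-open range of data rows and that a header occupies one extra physical line; both function names are hypothetical and this is not the code from the PR.

```rust
/// Hypothetical helper: maps a half-open range of data rows to a half-open
/// range of physical line indices, shifting BOTH ends past the header.
fn physical_line_range(start: usize, end: usize, has_header: bool) -> (usize, usize) {
    let offset = usize::from(has_header);
    (start + offset, end + offset)
}

/// The buggy pattern described above: only the start is shifted, so the
/// range silently loses one row whenever a header is present.
fn buggy_line_range(start: usize, end: usize, has_header: bool) -> (usize, usize) {
    let offset = usize::from(has_header);
    (start + offset, end)
}

fn main() {
    let (s, e) = physical_line_range(0, 2, true);
    println!("both ends shifted: {} rows", e - s);
    let (s, e) = buggy_line_range(0, 2, true);
    println!("only start shifted: {} rows", e - s);
}
```

With a header and bounds covering two rows, shifting only the start leaves a range one row short.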
@@ -2074,4 +2032,31 @@ mod tests {
        let col1_arr = col1.as_any().downcast_ref::<StringArray>().unwrap();
        assert_eq!(col1_arr.value(5), "value5");
    }

    #[test]
    fn test_header_bounds() {
Test for #3364
@@ -177,11 +179,36 @@ pub fn infer_reader_schema<R: Read>(
    infer_reader_schema_with_csv_options(reader, roptions)
}

/// Creates a `csv::Reader`
fn build_csv_reader<R: Read>(
This method is just moved to be closer to where it is used. A subsequent PR might look to move the schema inference logic into its own file
    col_idx: usize,
    precision: u8,
    scale: i8,
) -> Result<ArrayRef, ArrowError> {
    let mut decimal_builder = Decimal128Builder::with_capacity(rows.len());
    for row in rows {
We now enforce more strictly that the schema actually matches the data read. I don't think it was ever documented behaviour that rows could be missing fields; I think it was largely an implementation quirk, but it is worth highlighting.
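As an illustration of what strict enforcement means in practice, the sketch below rejects any row whose field count differs from the expected column count. It is a simplified stand-in, not the PR's implementation: `check_field_counts` is a hypothetical name, the delimiter is assumed to be a comma, and quoting is ignored for brevity.

```rust
/// Hypothetical check: every line must contain exactly `num_columns` fields.
/// Splits naively on ',' and does not handle quoted fields.
fn check_field_counts(input: &str, num_columns: usize) -> Result<(), String> {
    for (idx, line) in input.lines().enumerate() {
        let got = line.split(',').count();
        if got != num_columns {
            // Report a 1-based line number so the offending row is easy to find.
            return Err(format!(
                "line {}: expected {} fields, got {}",
                idx + 1,
                num_columns,
                got
            ));
        }
    }
    Ok(())
}

fn main() {
    // A row with a missing field is an error...
    println!("{:?}", check_field_counts("a,b\nc", 2));
    // ...and so is a schema that covers only a prefix of the columns.
    println!("{:?}", check_field_counts("a,b,c", 2));
}
```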

/// Skips forward `to_skip` rows
pub fn skip(&mut self, mut to_skip: usize) -> Result<(), ArrowError> {
    // TODO: This could be done by scanning for unquoted newline delimiters
Previously the implementation read into a `ByteRecord`; this will perform similarly or better, so I don't think this is an issue.
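For reference, the idea in the TODO (skipping rows by scanning for unquoted newlines instead of decoding records) could look roughly like the sketch below. It assumes `"` is the quote character, `\n` terminates a row, and quotes inside a field are escaped by doubling; `offset_after_rows` is a hypothetical name, not part of this PR.

```rust
/// Returns the byte offset just past `to_skip` rows, or `None` if the input
/// contains fewer complete rows than requested.
fn offset_after_rows(input: &[u8], to_skip: usize) -> Option<usize> {
    if to_skip == 0 {
        return Some(0);
    }
    let mut in_quotes = false;
    let mut skipped = 0;
    for (i, &byte) in input.iter().enumerate() {
        match byte {
            // A doubled quote ("") toggles twice, leaving the state unchanged.
            b'"' => in_quotes = !in_quotes,
            // Only newlines outside quotes terminate a row.
            b'\n' if !in_quotes => {
                skipped += 1;
                if skipped == to_skip {
                    return Some(i + 1);
                }
            }
            _ => {}
        }
    }
    None
}

fn main() {
    // The second row contains a quoted field with an embedded newline.
    let input = b"a,b\n\"x\ny\",2\nc,d\n";
    println!("{:?}", offset_after_rows(input, 2));
}
```

A scan like this never materialises fields, which is why it could be cheaper than reading each skipped row into a record.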
The performance gain is significantly better than I expected, to the point where I wonder if I've messed something up 😅 In particular, the timings not scaling with batch size seems somewhat suspect to me...
Timings make sense to me - for a single batch performance difference will be pretty large (as in the benchmark), but for a full csv file with many batches the difference probably is smaller as the |
Great improvement 👌
Benchmark runs are scheduled for baseline = a8c9685 and contender = c344433. c344433 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
    col_idx: usize,
) -> Result<ArrayRef, ArrowError> {
    rows.iter()
        .enumerate()
        .map(|(row_index, row)| {
            match row.get(col_idx) {
                Some(s) => {
                    if s.is_empty() {
This dropped the handling of null values for boolean arrays. Specifically, it removed the block below (which was previously present for primitives and booleans). The effect is that parsing a CSV file containing a null value now raises a parse error.
if s.is_empty() {
return Ok(None);
}
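A standalone sketch of the behaviour being asked for: an empty field maps to a null (`None`) rather than a parse error. The function name and the accepted spellings (`true`/`false`, case-insensitive) are assumptions made for illustration, not the crate's actual parsing rules.

```rust
/// Hypothetical field parser: empty input is a null, not an error.
fn parse_bool_field(s: &str) -> Result<Option<bool>, String> {
    if s.is_empty() {
        // This is the early return that the comment above says was dropped.
        return Ok(None);
    }
    if s.eq_ignore_ascii_case("true") {
        Ok(Some(true))
    } else if s.eq_ignore_ascii_case("false") {
        Ok(Some(false))
    } else {
        Err(format!("could not parse {:?} as a boolean", s))
    }
}

fn main() {
    // Collecting into Result stops at the first unparseable field.
    let column: Result<Vec<Option<bool>>, String> =
        ["true", "", "FALSE"].iter().map(|s| parse_bool_field(s)).collect();
    println!("{:?}", column);
}
```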
Oops, will fix before the next release.
ReadRecordResult::InputEmpty => break 'input, // Input exhausted, need to read more
ReadRecordResult::OutputFull => break, // Need to allocate more capacity
ReadRecordResult::OutputEndsFull => {
    return Err(ArrowError::CsvError(format!("incorrect number of fields, expected {} got more than {}", self.num_columns, field_count)))
Something looks wrong with this condition.
I will investigate and open an issue with an MRE as soon as I can reproduce it reliably.
I just wanted to mention that I'm using DataFusion, and after updating to the new CSV reader I started to encounter errors like:
incorrect number of fields, expected 7 got more than 7
(the numbers are always the same).
It happens in cases that worked well before this update @tustvold
This may be expected, see apache/datafusion#4918
The TLDR is this may have been working by accident, and has never worked reliably
@tustvold wow. But the CSV files that produce this error are correct (and the schema is passed directly, without inference): each has exactly the expected number of fields (however, let me check for null values inside).
Shouldn't the error message be changed in that case? I can't clearly understand when it happens, but I'm trying.
It happens when at least one row contains more fields than specified in the schema, i.e. more than `schema.fields().len() - 1` delimiters.
@tustvold I'm totally sure that the number of fields in each row was equal to or less than (I mean null values) the number of fields in the schema.
I'm double-checking that.
When one of the rows contains more fields than specified in the schema, the error is usually:
incorrect number of fields, expected X got Y
(from the `ReadRecordResult::Record` arm of the match), not this variant of the error.
If you update to the latest DataFusion, which includes arrow 31, it will print a line number which may help identify what is going on
@tustvold yes, I did, and it prints one. But the lines in the files are correct; it always refers to the first one in my case. 🤔 As I said, I'll try to create an MRE, otherwise it's hard to explain.
Yeah, that sounds like apache/datafusion#4918 and the error is occurring when it tries to skip the header
Which issue does this PR close?
Part of #3338
Closes #3364
Rationale for this change
Parsing is anywhere from 25% to 75% faster, with larger improvements at larger batch sizes.
What changes are included in this PR?
Adds a custom record reader based on csv-core, that significantly reduces allocations whilst parsing arrow data.
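One common way to cut per-field allocations is to store every field of a batch in a single contiguous buffer and keep only offsets into it. The sketch below shows that general layout under stated assumptions; `FlatRecords` and its methods are hypothetical names for illustration and should not be read as the reader added in this PR.

```rust
/// All fields live in one `String`; field `i` spans `offsets[i]..offsets[i + 1]`.
/// Two growable buffers replace one allocation per field.
struct FlatRecords {
    data: String,
    offsets: Vec<usize>,
    num_columns: usize,
}

impl FlatRecords {
    fn new(num_columns: usize) -> Self {
        assert!(num_columns > 0, "at least one column is required");
        Self { data: String::new(), offsets: vec![0], num_columns }
    }

    /// Appends one row, rejecting rows with the wrong number of fields.
    fn push_row(&mut self, fields: &[&str]) -> Result<(), String> {
        if fields.len() != self.num_columns {
            return Err(format!(
                "expected {} fields, got {}",
                self.num_columns,
                fields.len()
            ));
        }
        for field in fields {
            self.data.push_str(field);
            self.offsets.push(self.data.len());
        }
        Ok(())
    }

    fn num_rows(&self) -> usize {
        (self.offsets.len() - 1) / self.num_columns
    }

    /// Borrows a field without copying; panics if `row` or `col` is out of range.
    fn get(&self, row: usize, col: usize) -> &str {
        let idx = row * self.num_columns + col;
        &self.data[self.offsets[idx]..self.offsets[idx + 1]]
    }
}

fn main() {
    let mut records = FlatRecords::new(2);
    records.push_row(&["a", "1"]).unwrap();
    records.push_row(&["bc", ""]).unwrap();
    println!("{} rows, field (1, 0) = {:?}", records.num_rows(), records.get(1, 0));
}
```

Because a column's values can be read back as borrowed slices, a per-column parser can walk the batch without allocating a string per field.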
Are there any user-facing changes?
Previously if provided with a schema that was a valid prefix of the columns, it wouldn't complain. It now will. This behaviour was undocumented, and I think an accident, but is technically a user-facing change