Optimized RecordBatch for constant columns #1248

Closed
alamb opened this issue Nov 5, 2021 · 11 comments
Labels
datafusion Changes in the datafusion crate enhancement New feature or request

Comments

alamb (Contributor) commented Nov 5, 2021

Is your feature request related to a problem or challenge? Please describe what you are trying to do.
Certain record batches have columns that contain entirely the same value -- for example the partitioning column in #1141

In that case the ListingProvider adds a column with a constant value based on the path in the file system

So a file in a path like /foo/date=2021-11-5/bar.parquet would end up producing one or more RecordBatches with a date column that had the value 2021-11-5.

At the moment, to represent such a column using an Arrow array we have to repeat the value 2021-11-5 over and over again, which is inefficient both in storage space and in processing time.

| date      | other_col  |
|-----------|------------|
| 2021-11-5 | 1          |
| 2021-11-5 | 2          |
| 2021-11-5 | 3          |
| 2021-11-5 | 4          |
| ...       | ...        |
| 2021-11-5 | 1000000000 |

Describe the solution you'd like

It seems that having a sort of RecordBatch that can contain constant columns represented by a scalar value (like the C++ ExecBatch) is a pretty nice abstraction that would help add tons of optimizations. I didn't find an issue summarizing this discussion and its conclusion; should we create one?

According to @westonpace , in C++ there is ExecBatch in addition to RecordBatch for this purpose. ExecBatch is used within the exec plan and it's pretty much identical to record batch except:

  • No schema
  • No field names
  • Column values can be scalars

Note DataFusion already has an Array/Scalar notion in ColumnarValue https://github.com/apache/arrow-datafusion/blob/a1c794cec233f7fe34f34c7d64f529625a507669/datafusion/src/physical_plan/mod.rs#L388-L395 but its use is confined to expression evaluation at the moment.

Perhaps we could build something like DFRecordBatch akin to DFSchema (aka wrap the arrow RecordBatch with methods that allow the columns to be ColumnarValue)
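To make the idea concrete, here is a minimal std-only sketch of what such a batch could look like (hypothetical names; `ColumnarValue` here is a simplified stand-in for DataFusion's enum, with `Vec<String>` standing in for an Arrow `ArrayRef`, not the actual API):

```rust
// Hypothetical sketch: a batch whose columns may be either a materialized
// array or a single scalar, mirroring the ColumnarValue idea at batch level.
#[derive(Debug, Clone, PartialEq)]
enum ColumnarValue {
    Array(Vec<String>), // stand-in for an Arrow ArrayRef
    Scalar(String),     // one value logically repeated `num_rows` times
}

struct DfRecordBatch {
    num_rows: usize,
    columns: Vec<ColumnarValue>,
}

impl DfRecordBatch {
    /// Materialize a column into a plain vector, expanding scalars on demand.
    fn materialize(&self, i: usize) -> Vec<String> {
        match &self.columns[i] {
            ColumnarValue::Array(v) => v.clone(),
            ColumnarValue::Scalar(s) => vec![s.clone(); self.num_rows],
        }
    }
}

fn main() {
    let batch = DfRecordBatch {
        num_rows: 3,
        columns: vec![
            // The constant partition column stores one value, not one per row.
            ColumnarValue::Scalar("2021-11-5".to_string()),
            ColumnarValue::Array(vec!["1".into(), "2".into(), "3".into()]),
        ],
    };
    assert_eq!(batch.materialize(0), vec!["2021-11-5"; 3]);
}
```

The point is that the scalar column's storage cost is independent of the row count; expansion only happens when a consumer actually needs a full array.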

Describe alternatives you've considered
We could use a Dictionary(Int8, Utf8) array to do better (where each repeated string value only requires a single i8 key, i.e. 1 byte), but it still requires ~1 byte * the total number of rows in the RecordBatch.
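A std-only sketch of the dictionary-encoding idea (plain `Vec`s standing in for arrow-rs's `DictionaryArray`, which this is not) shows both the saving and the residual per-row cost:

```rust
// Sketch of dictionary encoding: each row stores only a small i8 key into a
// dictionary of distinct values, so the string itself is stored once.
struct DictColumn {
    dict: Vec<String>, // distinct values
    keys: Vec<i8>,     // one key per row
}

impl DictColumn {
    fn from_values(values: &[&str]) -> Self {
        let mut dict: Vec<String> = Vec::new();
        let mut keys = Vec::with_capacity(values.len());
        for v in values {
            let key = match dict.iter().position(|d| d == v) {
                Some(k) => k as i8,
                None => {
                    dict.push(v.to_string());
                    (dict.len() - 1) as i8
                }
            };
            keys.push(key);
        }
        DictColumn { dict, keys }
    }

    fn value(&self, row: usize) -> &str {
        &self.dict[self.keys[row] as usize]
    }
}

fn main() {
    let col = DictColumn::from_values(&["2021-11-5", "2021-11-5", "2021-11-5"]);
    assert_eq!(col.dict.len(), 1); // the string is stored once...
    assert_eq!(col.keys.len(), 3); // ...but we still pay one key per row
    assert_eq!(col.value(2), "2021-11-5");
}
```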

Another classic approach I have seen for this kind of problem is to use RLE encoding so that these “almost entirely constant columns” become a very small number of RLE runs.

I think RLE has been proposed for Arrow several times before, but it doesn't have the "access array[i] in constant time" property required for arrays.

Additional context
(this is my transcription of some information in an ASF slack thread: https://the-asf.slack.com/archives/C01QUFS30TD/p1634810772007000)

cc @houqp @rdettai @Dandandan

@alamb alamb added enhancement New feature or request datafusion Changes in the datafusion crate labels Nov 5, 2021
jimexist (Member) commented Nov 6, 2021

Perhaps we could build something like DFRecordBatch akin to DFSchema (aka wrap the arrow RecordBatch with methods that allow the columns to be ColumnarValue)

I'm for this approach.

In fact we can allow several other types that are memory efficient:

  1. sequence with increase or decrease direction and step size
  2. repeated array that's n copies of underlying data
  3. zoomed out sample of underlying data
  4. etc.

basically DFRecordBatch with lazy or const evaluated operations that owns the underlying data (can be a much shorter one)
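The virtual-column types in the list above could be sketched along these lines (hypothetical names, std-only, covering the first two variants): each column is described by a rule rather than materialized, yet `value(i)` stays constant time.

```rust
// Sketch of "virtual" columns: described by a rule, not materialized.
enum VirtualColumn {
    /// Arithmetic sequence: start, start + step, start + 2*step, ...
    Sequence { start: i64, step: i64, len: usize },
    /// The underlying data logically repeated `copies` times.
    Repeated { data: Vec<i64>, copies: usize },
}

impl VirtualColumn {
    fn len(&self) -> usize {
        match self {
            VirtualColumn::Sequence { len, .. } => *len,
            VirtualColumn::Repeated { data, copies } => data.len() * copies,
        }
    }

    /// O(1) random access, even though no full array is stored.
    fn value(&self, i: usize) -> i64 {
        match self {
            VirtualColumn::Sequence { start, step, .. } => start + step * i as i64,
            VirtualColumn::Repeated { data, .. } => data[i % data.len()],
        }
    }
}

fn main() {
    let seq = VirtualColumn::Sequence { start: 10, step: 5, len: 4 };
    let v: Vec<i64> = (0..seq.len()).map(|i| seq.value(i)).collect();
    assert_eq!(v, vec![10, 15, 20, 25]);

    let rep = VirtualColumn::Repeated { data: vec![1, 2], copies: 3 };
    let w: Vec<i64> = (0..rep.len()).map(|i| rep.value(i)).collect();
    assert_eq!(w, vec![1, 2, 1, 2, 1, 2]);
}
```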

alamb (Contributor, Author) commented Nov 6, 2021

Another thing to contemplate is support for RLE (aka store runs of repeated values)

```
A
A
A
B
B
B
B

-->

(A, 3)
(B, 4)
```
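The transformation above can be sketched in a few lines of plain Rust (a generic encoder, not any particular Arrow encoding); note that random access into the encoded form still requires scanning or searching the runs, which is the constant-time-access concern mentioned earlier:

```rust
// Run-length encoding: collapse consecutive repeats into (value, run_length).
fn rle_encode<T: PartialEq + Clone>(values: &[T]) -> Vec<(T, usize)> {
    let mut runs: Vec<(T, usize)> = Vec::new();
    for v in values {
        match runs.last_mut() {
            // Extend the current run if the value repeats...
            Some((last, n)) if last == v => *n += 1,
            // ...otherwise start a new run of length 1.
            _ => runs.push((v.clone(), 1)),
        }
    }
    runs
}

fn main() {
    let data = ["A", "A", "A", "B", "B", "B", "B"];
    assert_eq!(rle_encode(&data), vec![("A", 3), ("B", 4)]);
}
```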

jorgecarleitao (Member) commented

Tangentially related, since I have been investigating something related: the RecordBatch in arrow-rs is not defined in the arrow specification. Specifically, the term "RecordBatch" comes from the IPC specification RecordBatch message, but such a message does not hold a schema, since the schema is written in a separate Schema message at the beginning of the file. The C data interface has no such concept at all. So, I do not think it is an issue to use something else that fits our needs better.

I think that the two requirements for something to be consistent with the arrow format are:

  • data and schema round-trip losslessly over Arrow's IPC boundary
  • data and schema round-trip losslessly over Arrow's C data interface boundary at zero cost

I do not think RLE and other encodings fulfill these requirements (not sure whether this is an issue, though).

alamb (Contributor, Author) commented Nov 7, 2021

👍 I agree and I think keeping a separation between what is generally useful for users of Arrow libraries and what is more likely to be more narrowly useful to DataFusion is important. For example, I think it would be fine to add specialized constant column or RLE support in DataFusion, but that likely doesn't belong in an arrow implementation

jimexist (Member) commented Nov 7, 2021

maybe something like this just to see how much change is needed

jimexist (Member) commented Nov 8, 2021

I had a further thought on this and believe having DFRecordBatch is approachable, with some intermediate steps:

  • create a trait DFRecordBatch with basic functions defined such as fn is_empty(&self) -> bool, fn columns(&self) -> Vec<ArrayRef>, and fn column(&self, i: usize) -> ArrayRef, and also fn to_arrow(&self) -> RecordBatch
  • have a default DFRecordBatchImpl struct that wraps around arrow's RecordBatch and impl DFRecordBatch,
  • have fn to_df(&self) -> DFRecordBatch implemented for arrow's RecordBatch
  • change DataFrame to return DFRecordBatch instead, and let this breaking change be released to downstream
  • add other types of impls for DFRecordBatch or enhance DFRecordBatchImpl to have virtual columns
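The first two steps could be sketched roughly as below (std-only stand-ins for arrow's types: `ArrayRef` and `RecordBatch` here are simplified stubs, not the real arrow-rs API):

```rust
use std::sync::Arc;

type ArrayRef = Arc<Vec<i64>>; // stand-in for arrow::array::ArrayRef

#[derive(Clone)]
struct RecordBatch {
    columns: Vec<ArrayRef>, // stand-in for arrow's RecordBatch
}

trait DfRecordBatch {
    fn is_empty(&self) -> bool;
    fn columns(&self) -> Vec<ArrayRef>;
    fn column(&self, i: usize) -> ArrayRef;
    /// Materialize into a plain arrow RecordBatch for interop.
    fn to_arrow(&self) -> RecordBatch;
}

/// Default impl: a thin wrapper around an arrow RecordBatch.
struct DfRecordBatchImpl {
    inner: RecordBatch,
}

impl DfRecordBatch for DfRecordBatchImpl {
    fn is_empty(&self) -> bool {
        self.inner.columns.is_empty() || self.inner.columns[0].is_empty()
    }
    fn columns(&self) -> Vec<ArrayRef> {
        self.inner.columns.clone()
    }
    fn column(&self, i: usize) -> ArrayRef {
        self.inner.columns[i].clone()
    }
    fn to_arrow(&self) -> RecordBatch {
        self.inner.clone() // already materialized; virtual impls would expand here
    }
}

fn main() {
    let batch = DfRecordBatchImpl {
        inner: RecordBatch { columns: vec![Arc::new(vec![1, 2, 3])] },
    };
    assert!(!batch.is_empty());
    assert_eq!(*batch.column(0), vec![1, 2, 3]);
}
```

Later steps would then add impls whose columns are virtual (scalar, sequence, etc.) and only pay materialization cost inside to_arrow.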

houqp (Member) commented Nov 10, 2021

fn columns(&self) -> Vec<ArrayRef>, and fn column(&self, i: usize) -> ArrayRef

I think instead of returning ArrayRef, we should be returning ColumnarValue here?

RLE can be something we use to further compress arrow record batches, but I think it is overkill for constant columns, because this use-case can already be covered by ColumnarValue::Scalar. I can see us adding it in the future for other use-cases, though, perhaps as another variant of ColumnarValue? ColumnarValue::Scalar is effectively a special case of RLE, which maps nicely to arrow compute kernels that are specialized for scalar operands.

jimexist (Member) commented Nov 10, 2021

perhaps as another variant to ColumnarValue?

Sequence is one example

viirya (Member) commented Nov 24, 2021

Recently I've tried to make a stab at this. As RecordBatch is widely used across the codebase, replacing it with DFRecordBatch requires a lot of changes. I'm wondering if this is expected, or if there is a way to limit the scope of the change that I'm not aware of?

alamb (Contributor, Author) commented Nov 24, 2021

I would indeed expect it to be a substantial change across the codebase. I don't have any magic suggestions to reduce the potential impact

alamb (Contributor, Author) commented Nov 21, 2024

I think we have ColumnarValue and it seems to work well -- I am not sure this idea is worth pursuing anymore, so closing for now.

@alamb alamb closed this as completed Nov 21, 2024