Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update Default Parquet Write Compression #7692

Merged
merged 3 commits into from
Sep 30, 2023

Conversation

devinjdangelo
Copy link
Contributor

Which issue does this PR close?

Closes #7691

Rationale for this change

See issue for discussion

What changes are included in this PR?

Set default parquet writer to use zstd level 3 compression.

Are these changes tested?

By existing tests

Are there any user-facing changes?

Much smaller parquet files for a minor write performance penalty.

@github-actions github-actions bot added the sqllogictest SQL Logic Tests (.slt) label Sep 29, 2023
@tustvold
Copy link
Contributor

tustvold commented Sep 29, 2023

I'm not sure about this, block compression is far from free, and many workloads particularly those using object storage would be willing to trade faster read performance for slightly larger objects.

@devinjdangelo
Copy link
Contributor Author

devinjdangelo commented Sep 29, 2023

@tustvold Going from uncompressed to even zstd(1) is likely to bring a 50% reduction in file size (up to 80% is not uncommon depending on the data). I have some of my own DataFusion specific benchmarks, but Uber Engineering has a nice public analysis on this topic here.

I added some benchmarks showing with/without compression here. Zstd(1)-Zstd(3) write speeds in the single threaded AsyncArrowWriter is 15-30% slower for a 50-60% reduction in file size. The parallel writer is only about 5% slower for the same reduction in file size. I have not compared read speeds on the uncompressed vs compressed files, though if you are I/O limited to a remote ObjectStore it is possible for compression to improve read performance.

Another argument in favor of this change is that most other popular frameworks with parquet write support default to compression of either snappy (compatibility) or zstd (best performance), so users of DataFusion imo will not expect the default to be uncompressed.

The Datafusion default is not all that important for systems/database developers, since those users almost certainly will tune the settings to their use case anyway. This is more important for efforts to gain adoption of direct users using Datafusion for analysis/ETL workloads, such as via the datafusion-cli or python bindings.

@tustvold
Copy link
Contributor

Whats the performance hit like for read? A write speed reduction is not a huge deal to me, it is read performance that matters as this has will have a direct impact on query latency.

FWIW LZ4_RAW is probably the "best" codec, but ecosystem support is limited...

@devinjdangelo
Copy link
Contributor Author

devinjdangelo commented Sep 29, 2023

I ran a quick test using Query1, reading the uncompressed parquet file vs. zstd compressed. This is using local SSD based storage.

  • Uncompressed: 1.8393s
  • Zstd: 1.8297s

The performance is nearly identical. I averaged each over 50 runs for the above numbers and they are converging towards <1% performance difference. Variance run-to-run is ~5% so on a single run either one can be faster.

Script:

import time
from datafusion import SessionContext

t = time.time()

#uncompressed file, ~3.6Gb on disk
#file = "/home/dev/arrow-datafusion/benchmarks/data/tpch_sf10/lineitem/part-0.parquet"

#zstd compressed file, ~1.6Gb on disk
file = "/home/dev/arrow-datafusion/test_out/benchon.parquet"

# Create a DataFusion context
ctx = SessionContext()

# Register table with context
ctx.register_parquet('test', file)

times = []
for i in range(50):
  t = time.time()
    query = """
    select
        l_returnflag,
        l_linestatus,
        sum(l_quantity) as sum_qty,
        sum(l_extendedprice) as sum_base_price,
        sum(l_extendedprice * (1 - l_discount)) as sum_disc_price,
        sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge,
        avg(l_quantity) as avg_qty,
        avg(l_extendedprice) as avg_price,
        avg(l_discount) as avg_disc,
        count(*) as count_order
    from
        test
    where
            l_shipdate <= date '1998-09-02'
    group by
        l_returnflag,
        l_linestatus
    order by
        l_returnflag,
        l_linestatus
    """

    # Execute SQL
    df = ctx.sql(f"{query}")
    df.show()
    elapsed = time.time() - t
    times.append(elapsed)
    print(f"datafusion agg query {elapsed}s")

print(sum(times)/len(times))

@tustvold
Copy link
Contributor

tustvold commented Sep 29, 2023

The performance is nearly identical

Can you run a test that is actually bottlenecked on parquet, e.g. a predicated scan, as opposed to something with sorts and groups by in it that will dominate pretty much anything else other than joins.

@devinjdangelo
Copy link
Contributor Author

I was also a bit suspicious of how identical the performance was. I caught a mistake in my set up, both numbers above were for ZSTD not uncompressed. I corrected the mistake and ran two more tests below. Indeed, using local storage uncompressed reads are a good bit faster. I would be interested to compare this to remote object storage where bandwidth may be more of a bottleneck..

Removing the group by / order by:

  • Uncompressed: 0.5305
  • Zstd: 0.7015

New Query:

    select
        sum(l_quantity) as sum_qty,
        sum(l_extendedprice) as sum_base_price,
        sum(l_extendedprice * (1 - l_discount)) as sum_disc_price,
        sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge,
        avg(l_quantity) as avg_qty,
        avg(l_extendedprice) as avg_price,
        avg(l_discount) as avg_disc,
        count(*) as count_order
    from
        test
    where
        l_shipdate <= date '1998-09-02'

I also timed caching the entire parquet file into memory (select * from test), then df.cache(). I only did average over 5 runs this time.

  • Uncompressed: 6.818s
  • Zstd: 10.052

@tustvold
Copy link
Contributor

remote object storage where bandwidth

It is more typically first-byte latency, not bandwidth that hurts with OS, so the size of the pages can end up being less relevant than you might expect...

I dunno, I suspect there is no "correct" answer to this, which leads me to be tempted to just leave it as is, but don't feel strongly, so if other people do...

@devinjdangelo
Copy link
Contributor Author

I agree that there is no universally optimal choice and am also interested in more opinions.

For the bandwidth concern, I'm thinking more about a user who installs the python bindings to their macbook pro and reads from S3. The bottleneck there may be their likely 1gbps network connection. Even in a server environment if you are running on a single node, bandwidth could be limiting.

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think defaulting to basic and fast block level compression is much less surprising than NO compression for most users and therefore I think this is a good change.

While there may be corner cases where no compression is desired as @tustvold mentions, I think they are somewhat specialized

Maybe some other contributors have opinions on the matter (@Dandandan perhaps?)

@alamb
Copy link
Contributor

alamb commented Sep 29, 2023

I took the liberty of merging up from main to get #7701 and solve the failing CI check

@Dandandan
Copy link
Contributor

I think defaulting to basic and fast block level compression is much less surprising than NO compression for most users and therefore I think this is a good change.

While there may be corner cases where no compression is desired as @tustvold mentions, I think they are somewhat specialized

Maybe some other contributors have opinions on the matter (@Dandandan perhaps?)

I agree. Parquet is designed for block compression. And people want to optimize for long term storage cost as well. Zstd is probably one of the better defaults for use cases @devinjdangelo describes (data pipelines).

@Dandandan Dandandan merged commit 692ea24 into apache:main Sep 30, 2023
@Dandandan
Copy link
Contributor

Thank you @devinjdangelo

Ted-Jiang pushed a commit to Ted-Jiang/arrow-datafusion that referenced this pull request Oct 7, 2023
* update compression default

* fix tests

---------

Co-authored-by: Andrew Lamb <[email protected]>
@andygrove andygrove added the enhancement New feature or request label Oct 7, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request sqllogictest SQL Logic Tests (.slt)
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Update Default Parquet Write Compression to Sensible Default
5 participants