add more integration test for parquet bloom filter round trip tests (#3210)

* add more integration test

* fix doc
jimexist authored Dec 29, 2022
1 parent 513d543 commit 99a20dd
Showing 7 changed files with 195 additions and 68 deletions.
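
In outline, the round trip under test is: write a CSV, convert it with `parquet-fromcsv` (bloom filters enabled), then probe the filter with `parquet-show-bloom-filter`. A minimal sketch of that flow, condensed from the test added below (file names are placeholders, and both binaries are assumed to be on `PATH`):

```python
import subprocess
import pandas as pd

# Write a tiny CSV, convert it to Parquet with a bloom filter, then query the filter.
pd.DataFrame([("id-0", "name-0")], columns=["id", "name"]).to_csv("input.csv", index=False)
subprocess.check_output([
    "parquet-fromcsv", "--schema", "schema.txt", "--enable-bloom-filter", "true",
    "--input-file", "input.csv", "--output-file", "out.parquet",
])
out = subprocess.check_output([
    "parquet-show-bloom-filter", "--file-name", "out.parquet",
    "--column", "id", "--values", "id-0", "--values", "name-0",
])
# Prints "Row group #0", a separator line, then one present/absent line per value.
print(out.decode())
```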
4 changes: 3 additions & 1 deletion .github/workflows/parquet.yml
@@ -144,7 +144,9 @@ jobs:
           rustup toolchain install ${{ matrix.rust }}
           rustup default ${{ matrix.rust }}
       - name: Install binary for checking
-        run: cargo install --path parquet --bin parquet-show-bloom-filter --features=arrow,cli
+        run: |
+          cargo install --path parquet --bin parquet-show-bloom-filter --features=cli
+          cargo install --path parquet --bin parquet-fromcsv --features=arrow,cli
       - name: Run pytest
         run: |
           cd parquet/pytest
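
The pytest suite shells out to both binaries, so they must be discoverable on `PATH` after the `cargo install` steps above. A quick local sanity check (sketch):

```python
import shutil

# Both CLI tools are invoked via subprocess by the tests in this commit.
assert shutil.which("parquet-show-bloom-filter") is not None
assert shutil.which("parquet-fromcsv") is not None
```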
65 changes: 0 additions & 65 deletions parquet/pytest/pyspark_integration_test.py

This file was deleted.

2 changes: 1 addition & 1 deletion parquet/pytest/requirements.in
@@ -17,4 +17,4 @@
 pytest
 pyspark
 black
-
+pandas
71 changes: 71 additions & 0 deletions parquet/pytest/requirements.txt
@@ -63,10 +63,69 @@ mypy-extensions==0.4.3 \
--hash=sha256:090fedd75945a69ae91ce1303b5824f428daf5a028d2f6ab8a299250a846f15d \
--hash=sha256:2d82818f5bb3e369420cb3c4060a7970edba416647068eb4c5343488a6c604a8
# via black
numpy==1.23.5 \
--hash=sha256:01dd17cbb340bf0fc23981e52e1d18a9d4050792e8fb8363cecbf066a84b827d \
--hash=sha256:06005a2ef6014e9956c09ba07654f9837d9e26696a0470e42beedadb78c11b07 \
--hash=sha256:09b7847f7e83ca37c6e627682f145856de331049013853f344f37b0c9690e3df \
--hash=sha256:0aaee12d8883552fadfc41e96b4c82ee7d794949e2a7c3b3a7201e968c7ecab9 \
--hash=sha256:0cbe9848fad08baf71de1a39e12d1b6310f1d5b2d0ea4de051058e6e1076852d \
--hash=sha256:1b1766d6f397c18153d40015ddfc79ddb715cabadc04d2d228d4e5a8bc4ded1a \
--hash=sha256:33161613d2269025873025b33e879825ec7b1d831317e68f4f2f0f84ed14c719 \
--hash=sha256:5039f55555e1eab31124a5768898c9e22c25a65c1e0037f4d7c495a45778c9f2 \
--hash=sha256:522e26bbf6377e4d76403826ed689c295b0b238f46c28a7251ab94716da0b280 \
--hash=sha256:56e454c7833e94ec9769fa0f86e6ff8e42ee38ce0ce1fa4cbb747ea7e06d56aa \
--hash=sha256:58f545efd1108e647604a1b5aa809591ccd2540f468a880bedb97247e72db387 \
--hash=sha256:5e05b1c973a9f858c74367553e236f287e749465f773328c8ef31abe18f691e1 \
--hash=sha256:7903ba8ab592b82014713c491f6c5d3a1cde5b4a3bf116404e08f5b52f6daf43 \
--hash=sha256:8969bfd28e85c81f3f94eb4a66bc2cf1dbdc5c18efc320af34bffc54d6b1e38f \
--hash=sha256:92c8c1e89a1f5028a4c6d9e3ccbe311b6ba53694811269b992c0b224269e2398 \
--hash=sha256:9c88793f78fca17da0145455f0d7826bcb9f37da4764af27ac945488116efe63 \
--hash=sha256:a7ac231a08bb37f852849bbb387a20a57574a97cfc7b6cabb488a4fc8be176de \
--hash=sha256:abdde9f795cf292fb9651ed48185503a2ff29be87770c3b8e2a14b0cd7aa16f8 \
--hash=sha256:af1da88f6bc3d2338ebbf0e22fe487821ea4d8e89053e25fa59d1d79786e7481 \
--hash=sha256:b2a9ab7c279c91974f756c84c365a669a887efa287365a8e2c418f8b3ba73fb0 \
--hash=sha256:bf837dc63ba5c06dc8797c398db1e223a466c7ece27a1f7b5232ba3466aafe3d \
--hash=sha256:ca51fcfcc5f9354c45f400059e88bc09215fb71a48d3768fb80e357f3b457e1e \
--hash=sha256:ce571367b6dfe60af04e04a1834ca2dc5f46004ac1cc756fb95319f64c095a96 \
--hash=sha256:d208a0f8729f3fb790ed18a003f3a57895b989b40ea4dce4717e9cf4af62c6bb \
--hash=sha256:dbee87b469018961d1ad79b1a5d50c0ae850000b639bcb1b694e9981083243b6 \
--hash=sha256:e9f4c4e51567b616be64e05d517c79a8a22f3606499941d97bb76f2ca59f982d \
--hash=sha256:f063b69b090c9d918f9df0a12116029e274daf0181df392839661c4c7ec9018a \
--hash=sha256:f9a909a8bae284d46bbfdefbdd4a262ba19d3bc9921b1e76126b1d21c3c34135
# via pandas
packaging==21.3 \
--hash=sha256:dd47c42927d89ab911e606518907cc2d3a1f38bbd026385970643f9c5b8ecfeb \
--hash=sha256:ef103e05f519cdc783ae24ea4e2e0f508a9c99b2d4969652eed6a2e1ea5bd522
# via pytest
pandas==1.5.2 \
--hash=sha256:0183cb04a057cc38fde5244909fca9826d5d57c4a5b7390c0cc3fa7acd9fa883 \
--hash=sha256:1fc87eac0541a7d24648a001d553406f4256e744d92df1df8ebe41829a915028 \
--hash=sha256:220b98d15cee0b2cd839a6358bd1f273d0356bf964c1a1aeb32d47db0215488b \
--hash=sha256:2552bffc808641c6eb471e55aa6899fa002ac94e4eebfa9ec058649122db5824 \
--hash=sha256:315e19a3e5c2ab47a67467fc0362cb36c7c60a93b6457f675d7d9615edad2ebe \
--hash=sha256:344021ed3e639e017b452aa8f5f6bf38a8806f5852e217a7594417fb9bbfa00e \
--hash=sha256:375262829c8c700c3e7cbb336810b94367b9c4889818bbd910d0ecb4e45dc261 \
--hash=sha256:457d8c3d42314ff47cc2d6c54f8fc0d23954b47977b2caed09cd9635cb75388b \
--hash=sha256:4aed257c7484d01c9a194d9a94758b37d3d751849c05a0050c087a358c41ad1f \
--hash=sha256:530948945e7b6c95e6fa7aa4be2be25764af53fba93fe76d912e35d1c9ee46f5 \
--hash=sha256:5ae7e989f12628f41e804847a8cc2943d362440132919a69429d4dea1f164da0 \
--hash=sha256:71f510b0efe1629bf2f7c0eadb1ff0b9cf611e87b73cd017e6b7d6adb40e2b3a \
--hash=sha256:73f219fdc1777cf3c45fde7f0708732ec6950dfc598afc50588d0d285fddaefc \
--hash=sha256:8092a368d3eb7116e270525329a3e5c15ae796ccdf7ccb17839a73b4f5084a39 \
--hash=sha256:82ae615826da838a8e5d4d630eb70c993ab8636f0eff13cb28aafc4291b632b5 \
--hash=sha256:9608000a5a45f663be6af5c70c3cbe634fa19243e720eb380c0d378666bc7702 \
--hash=sha256:a40dd1e9f22e01e66ed534d6a965eb99546b41d4d52dbdb66565608fde48203f \
--hash=sha256:b4f5a82afa4f1ff482ab8ded2ae8a453a2cdfde2001567b3ca24a4c5c5ca0db3 \
--hash=sha256:c009a92e81ce836212ce7aa98b219db7961a8b95999b97af566b8dc8c33e9519 \
--hash=sha256:c218796d59d5abd8780170c937b812c9637e84c32f8271bbf9845970f8c1351f \
--hash=sha256:cc3cd122bea268998b79adebbb8343b735a5511ec14efb70a39e7acbc11ccbdc \
--hash=sha256:d0d8fd58df5d17ddb8c72a5075d87cd80d71b542571b5f78178fb067fa4e9c72 \
--hash=sha256:e18bc3764cbb5e118be139b3b611bc3fbc5d3be42a7e827d1096f46087b395eb \
--hash=sha256:e2b83abd292194f350bb04e188f9379d36b8dfac24dd445d5c87575f3beaf789 \
--hash=sha256:e7469271497960b6a781eaa930cba8af400dd59b62ec9ca2f4d31a19f2f91090 \
--hash=sha256:e9dbacd22555c2d47f262ef96bb4e30880e5956169741400af8b306bbb24a273 \
--hash=sha256:f6257b314fc14958f8122779e5a1557517b0f8e500cfb2bd53fa1f75a8ad0af2
# via -r requirements.in
pathspec==0.10.2 \
--hash=sha256:88c2606f2c1e818b978540f73ecc908e13999c6c3a383daf3705652ae79807a5 \
--hash=sha256:8f6bf73e5758fd365ef5d58ce09ac7c27d2833a8d7da51712eac6e27e35141b0
@@ -94,6 +153,18 @@ pytest==7.2.0 \
--hash=sha256:892f933d339f068883b6fd5a459f03d85bfcb355e4981e146d2c7616c21fef71 \
--hash=sha256:c4014eb40e10f11f355ad4e3c2fb2c6c6d1919c73f3b5a433de4708202cade59
# via -r requirements.in
python-dateutil==2.8.2 \
--hash=sha256:0123cacc1627ae19ddf3c27a5de5bd67ee4586fbdd6440d9748f8abb483d3e86 \
--hash=sha256:961d03dc3453ebbc59dbdea9e4e11c5651520a876d0f4db161e8674aae935da9
# via pandas
pytz==2022.6 \
--hash=sha256:222439474e9c98fced559f1709d89e6c9cbf8d79c794ff3eb9f8800064291427 \
--hash=sha256:e89512406b793ca39f5971bc999cc538ce125c0e51c27941bef4568b460095e2
# via pandas
six==1.16.0 \
--hash=sha256:1e61c37477a1626458e36f7b1d82aa5c9b094fa4802892072e49de9c60c4c926 \
--hash=sha256:8abb2f1d86890a2dfb989f9a77cfcfd3e47c2a354b01111771326f8aa26e0254
# via python-dateutil
tomli==2.0.1 \
--hash=sha256:939de3e7a6161af0c887ef91b7d41a53e7c5a1ca976325f429cb46ea9bc30ecc \
--hash=sha256:de526c12914f0c550d15924c62d72abc48d6fe7364aa87328337a31007fe8a4f
112 changes: 112 additions & 0 deletions parquet/pytest/test_parquet_integration.py
@@ -0,0 +1,112 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import pyspark.sql
import pandas as pd
from tempfile import NamedTemporaryFile, TemporaryDirectory
import subprocess
import pathlib
import pytest


def create_data_and_spark_df(n):
    spark = pyspark.sql.SparkSession.builder.getOrCreate()
    spark.conf.set("parquet.bloom.filter.enabled", True)
    spark.conf.set("parquet.bloom.filter.expected.ndv", 10)
    spark.conf.set("parquet.bloom.filter.max.bytes", 32)
    data = [(f"id-{i % 10}", f"name-{i % 10}") for i in range(n)]
    df = spark.createDataFrame(data, ["id", "name"]).repartition(1)
    return data, df


def create_data_and_pandas_df(n):
    data = [(f"id-{i % 10}", f"name-{i % 10}") for i in range(n)]
    df = pd.DataFrame(data, columns=["id", "name"])
    return data, df


def get_expected_output(data):
    expected = ["Row group #0", "=" * 80]
    # the "id" values were written to the filter; the "name" values were not
    for v in data:
        expected.append(f"Value {v[0]} is present in bloom filter")
    for v in data:
        expected.append(f"Value {v[1]} is absent in bloom filter")
    expected = "\n".join(expected) + "\n"
    return expected.encode("utf-8")


def get_from_csv_cli_output(schema_file, output_file, csv_file):
    args = [
        "parquet-fromcsv",
        "--schema",
        schema_file,
        "--enable-bloom-filter",
        "true",
        "--input-file",
        csv_file,
        "--output-file",
        output_file,
    ]
    return subprocess.check_output(args)


def get_show_filter_cli_output(output_dir, data, col_name="id"):
    # take the first (and only) parquet file
    (parquet_file,) = sorted(pathlib.Path(output_dir).glob("*.parquet"))
    args = [
        "parquet-show-bloom-filter",
        "--file-name",
        parquet_file,
        "--column",
        col_name,
    ]
    # probe with every id (expected present) and every name (expected absent)
    for v in data:
        args.extend(["--values", v[0]])
    for v in data:
        args.extend(["--values", v[1]])
    return subprocess.check_output(args)


SCHEMA = b"""message schema {
    required binary id (UTF8);
    required binary name (UTF8);
}"""


@pytest.mark.parametrize("n", [1, 10])
class TestParquetIntegration:
    def test_pyspark_bloom_filter(self, n):
        data, df = create_data_and_spark_df(n)
        with TemporaryDirectory() as output_dir:
            df.write.parquet(output_dir, mode="overwrite")
            cli_output = get_show_filter_cli_output(output_dir, data)
            assert cli_output == get_expected_output(data)

    def test_bloom_filter_round_trip(self, n):
        data, df = create_data_and_pandas_df(n)
        with NamedTemporaryFile(suffix=".csv") as csv_file, NamedTemporaryFile(
            suffix=".schema"
        ) as schema_file, TemporaryDirectory() as output_dir:
            schema_file.write(SCHEMA)
            schema_file.flush()
            df.to_csv(csv_file.name, index=False, header=True)
            parquet_file = pathlib.Path(output_dir) / "output.parquet"
            cli_output = get_from_csv_cli_output(
                schema_file.name, parquet_file, csv_file.name
            )
            assert cli_output == b""
            cli_output = get_show_filter_cli_output(output_dir, data)
            assert cli_output == get_expected_output(data)
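
Per the CI step above, the suite runs with plain `pytest` from `parquet/pytest`. For a single row, the transcript assembled by `get_expected_output` looks like this (a worked example derived from the helper; the separator line is 80 `=` characters):

```python
from test_parquet_integration import get_expected_output

# Expected bytes for one row: the id probes as present, the name as absent.
print(get_expected_output([("id-0", "name-0")]).decode())
# Row group #0
# ================================================================================
# Value id-0 is present in bloom filter
# Value name-0 is absent in bloom filter
```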
7 changes: 6 additions & 1 deletion parquet/src/bin/parquet-fromcsv.rs
@@ -25,7 +25,7 @@
 //! cargo install parquet --features=cli
 //! ```
 //!
-//! After this `parquet-fromcsv` shoud be available:
+//! After this `parquet-fromcsv` should be available:
 //!
 //! ```text
 //! parquet-fromcsv --schema message_schema_for_parquet.txt input.csv output.parquet
@@ -46,22 +46,27 @@
 //!
 //! ## Parquet file options
 //!
+//! ```text
 //! - `-b`, `--batch-size` : Batch size for Parquet
 //! - `-c`, `--parquet-compression` : Compression option for Parquet, default is SNAPPY
 //! - `-s`, `--schema` : Path to message schema for generated Parquet file
 //! - `-o`, `--output-file` : Path to output Parquet file
 //! - `-w`, `--writer-version` : Writer version
 //! - `-m`, `--max-row-group-size` : Max row group size
 //! - `--enable-bloom-filter` : Enable bloom filter during writing
+//! ```
 //!
 //! ## Input file options
 //!
+//! ```text
 //! - `-i`, `--input-file` : Path to input CSV file
 //! - `-f`, `--input-format` : Dialect for input file, `csv` or `tsv`
 //! - `-d`, `--delimiter` : Field delimiter for CSV file, default depends on `--input-format`
 //! - `-e`, `--escape` : Escape character for input file
 //! - `-h`, `--has-header` : Input has header
 //! - `-r`, `--record-terminator` : Record terminator character for input, default is CRLF
 //! - `-q`, `--quote-char` : Input quoting character
+//! ```
 //!
 use std::{
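
As a worked example of the options documented above, a TSV conversion with bloom filters enabled might be driven like this (illustrative values; only the flags come from the lists above):

```python
import subprocess

# Convert a tab-separated file; compression defaults to SNAPPY.
subprocess.run(
    [
        "parquet-fromcsv",
        "--schema", "schema.txt",
        "--input-file", "input.tsv",
        "--input-format", "tsv",
        "--batch-size", "1000",
        "--enable-bloom-filter", "true",
        "--output-file", "out.parquet",
    ],
    check=True,
)
```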
2 changes: 2 additions & 0 deletions parquet/src/bin/parquet-show-bloom-filter.rs
@@ -103,6 +103,8 @@ fn main() {
                         }
                     )
                 });
+            } else {
+                println!("No bloom filter found for column {}", args.column);
             }
         } else {
             println!(
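
With the new `else` branch, probing a column that was written without a bloom filter reports the fact instead of printing nothing for that row group. A sketch of the behavior (file name is a placeholder):

```python
import subprocess

out = subprocess.check_output([
    "parquet-show-bloom-filter",
    "--file-name", "plain.parquet",  # a file written without bloom filters
    "--column", "id",
    "--values", "id-0",
])
# Output now contains "No bloom filter found for column id"
# rather than an empty section after the row-group header.
print(out.decode())
```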
