add parquet_dataset tests
jorisvandenbossche committed May 19, 2020
1 parent 2bb5686 commit b8a6a16
Showing 1 changed file with 102 additions and 0 deletions.
python/pyarrow/tests/test_dataset.py (+102 −0)
@@ -1440,3 +1440,105 @@ def test_feather_format(tempdir):
    write_feather(table, str(basedir / "data1.feather"), version=1)
    with pytest.raises(ValueError):
        ds.dataset(basedir, format="feather").to_table()


def _create_parquet_dataset_simple(root_path):
    import pyarrow.parquet as pq

    metadata_collector = []

    for i in range(4):
        table = pa.table({'f1': [i] * 10, 'f2': np.random.randn(10)})
        pq.write_to_dataset(
            table, str(root_path), metadata_collector=metadata_collector
        )

    # write _metadata file
    pq.write_metadata(
        table.schema, str(root_path / '_metadata'),
        metadata_collector=metadata_collector
    )
    return table


@pytest.mark.parquet
@pytest.mark.pandas  # write_to_dataset currently requires pandas
def test_parquet_dataset_factory(tempdir):
    root_path = tempdir / "test_parquet_dataset"
    table = _create_parquet_dataset_simple(root_path)

    dataset = ds.parquet_dataset(str(root_path / '_metadata'))
    assert dataset.schema.equals(table.schema)
    assert len(dataset.files) == 4
    result = dataset.to_table()
    assert result.num_rows == 40


@pytest.mark.parquet
@pytest.mark.pandas
def test_parquet_dataset_factory_invalid(tempdir):
    root_path = tempdir / "test_parquet_dataset"
    table = _create_parquet_dataset_simple(root_path)
    # remove one of the files
    list(root_path.glob("*.parquet"))[0].unlink()

    dataset = ds.parquet_dataset(str(root_path / '_metadata'))
    assert dataset.schema.equals(table.schema)
    assert len(dataset.files) == 4
    # TODO this segfaults with
    #   terminate called after throwing an instance of 'std::system_error'
    #   what(): Invalid argument
    # with pytest.raises(ValueError):
    #     dataset.to_table()


def _create_metadata_file(root_path):
    # create _metadata file from existing parquet dataset
    import pyarrow.parquet as pq

    parquet_paths = list(sorted(root_path.rglob("*.parquet")))
    schema = pq.ParquetFile(parquet_paths[0]).schema.to_arrow_schema()

    metadata_collector = []
    for path in parquet_paths:
        metadata = pq.ParquetFile(path).metadata
        metadata.set_file_path(str(path.relative_to(root_path)))
        metadata_collector.append(metadata)

    metadata_path = root_path / "_metadata"
    pq.write_metadata(
        schema, metadata_path, metadata_collector=metadata_collector
    )


def _create_parquet_dataset_partitioned(root_path):
    import pyarrow.parquet as pq

    table = pa.table({
        'f1': range(20), 'f2': np.random.randn(20),
        'part': np.repeat(['a', 'b'], 10)}
    )
    pq.write_to_dataset(table, str(root_path), partition_cols=['part'])
    _create_metadata_file(root_path)
    return table


@pytest.mark.parquet
@pytest.mark.pandas
def test_parquet_dataset_factory_partitioned(tempdir):
    # TODO support for specifying partitioning scheme

    root_path = tempdir / "test_parquet_dataset"
    table = _create_parquet_dataset_partitioned(root_path)

    dataset = ds.parquet_dataset(str(root_path / '_metadata'))
    # TODO partition column not yet included
    # assert dataset.schema.equals(table.schema)
    assert len(dataset.files) == 2
    result = dataset.to_table()
    assert result.num_rows == 20

    # the partitioned dataset does not preserve order
    result = result.to_pandas().sort_values("f1").reset_index(drop=True)
    expected = table.to_pandas().drop(columns=["part"])
    pd.testing.assert_frame_equal(result, expected)
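
For reference, below is a minimal standalone sketch (outside pytest) of the _metadata-based workflow these tests exercise. The scratch directory, file count, and row counts are illustrative choices for this sketch; the pyarrow.parquet and pyarrow.dataset calls are the same ones used in the diff above.

# sketch: write a few Parquet files, build a _metadata file, then open the
# dataset from that _metadata file with ds.parquet_dataset()
import tempfile

import numpy as np
import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.parquet as pq

root = tempfile.mkdtemp()  # illustrative scratch directory
metadata_collector = []

# write two Parquet files, collecting each file's footer metadata
for i in range(2):
    table = pa.table({'f1': [i] * 5, 'f2': np.random.randn(5)})
    pq.write_to_dataset(table, root, metadata_collector=metadata_collector)

# consolidate the collected footers into a single _metadata file
pq.write_metadata(table.schema, root + "/_metadata",
                  metadata_collector=metadata_collector)

# open the dataset directly from _metadata and read it back
dataset = ds.parquet_dataset(root + "/_metadata")
print(dataset.files)                 # the data files listed in _metadata
print(dataset.to_table().num_rows)   # 10 rows across the two files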
