Skip to content

Commit

Permalink
Add RowGroupInfo
Browse files Browse the repository at this point in the history
  • Loading branch information
fsaintjacques committed May 18, 2020
1 parent fe07d98 commit aea5db3
Show file tree
Hide file tree
Showing 10 changed files with 357 additions and 231 deletions.
313 changes: 178 additions & 135 deletions cpp/src/arrow/dataset/file_parquet.cc

Large diffs are not rendered by default.

119 changes: 76 additions & 43 deletions cpp/src/arrow/dataset/file_parquet.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,14 @@ class ReaderProperties;
class ArrowReaderProperties;
namespace arrow {
class FileReader;
};
}; // namespace arrow
} // namespace parquet

namespace arrow {
namespace dataset {

class RowGroupInfo;

/// \brief A FileFormat implementation that reads from Parquet files
class ARROW_DS_EXPORT ParquetFileFormat : public FileFormat {
public:
Expand Down Expand Up @@ -98,28 +100,77 @@ class ARROW_DS_EXPORT ParquetFileFormat : public FileFormat {
std::shared_ptr<ScanContext> context) const override;

/// \brief Open a file for scanning, restricted to the specified row groups.
Result<ScanTaskIterator> ScanFile(std::shared_ptr<ScanOptions> options,
Result<ScanTaskIterator> ScanFile(const FileSource& source,
std::shared_ptr<ScanOptions> options,
std::shared_ptr<ScanContext> context,
std::unique_ptr<parquet::ParquetFileReader> reader,
std::vector<int> row_groups) const;
std::vector<RowGroupInfo> row_groups) const;

using FileFormat::MakeFragment;

/// \brief Create a Fragment, restricted to the specified row groups.
Result<std::shared_ptr<FileFragment>> MakeFragment(
FileSource source, std::shared_ptr<Expression> partition_expression) override;
FileSource source, std::shared_ptr<Expression> partition_expression,
std::vector<RowGroupInfo> row_groups);

/// \brief Create a Fragment, restricted to the specified row groups.
Result<std::shared_ptr<FileFragment>> MakeFragment(
FileSource source, std::shared_ptr<Expression> partition_expression,
std::vector<int> row_groups);

/// \brief Create a Fragment targeting all RowGroups.
Result<std::shared_ptr<FileFragment>> MakeFragment(
FileSource source, std::shared_ptr<Expression> partition_expression,
std::vector<int> row_groups, ExpressionVector statistics);
FileSource source, std::shared_ptr<Expression> partition_expression) override;

/// \brief Return a FileReader on the given source.
Result<std::unique_ptr<parquet::arrow::FileReader>> GetReader(
const FileSource& source, MemoryPool* pool = default_memory_pool()) const;
const FileSource& source, ScanOptions* = NULLPTR, ScanContext* = NULLPTR) const;
};

/// \brief Represents a parquet's RowGroup with extra information.
class RowGroupInfo : public util::EqualityComparable<RowGroupInfo> {
public:
RowGroupInfo() : RowGroupInfo(-1) {}

/// \brief Construct a RowGroup from an identifier.
explicit RowGroupInfo(int id) : RowGroupInfo(id, -1, NULLPTR) {}

/// \brief Construct a RowGroup from an identifier with statistics.
RowGroupInfo(int id, int64_t num_rows, std::shared_ptr<Expression> statistics)
: id_(id), num_rows_(num_rows), statistics_(std::move(statistics)) {}

/// \brief Transform a vector of identifiers into a vector of RowGroupInfos
static std::vector<RowGroupInfo> FromIdentifiers(const std::vector<int> ids);
static std::vector<RowGroupInfo> FromCount(int count);

/// \brief Return the RowGroup's identifier (index in the file).
int id() const { return id_; }

/// \brief Return the RowGroup's number of rows.
///
/// If statistics are not provided, return 0.
int64_t num_rows() const { return num_rows_; }
void set_num_rows(int64_t num_rows) { num_rows_ = num_rows; }

/// \brief Return the RowGroup's statistics
const std::shared_ptr<Expression>& statistics() const { return statistics_; }
void set_statistics(std::shared_ptr<Expression> statistics) {
statistics_ = std::move(statistics);
}

/// \brief Indicate if statistics are set.
bool HasStatistics() const { return statistics_ != NULLPTR; }

/// \brief Indicate if the RowGroup's statistics satisfy the predicate.
///
/// If the RowGroup was not initialized with statistics, it is deemd
bool Satisfy(const Expression& predicate) const;

/// \brief Indicate if the other RowGroup points to the same RowGroup.
bool Equals(const RowGroupInfo& other) const { return id() == other.id(); }

private:
int id_;
int64_t num_rows_;
std::shared_ptr<Expression> statistics_;
};

/// \brief A FileFragment with parquet logic.
Expand All @@ -138,49 +189,31 @@ class ARROW_DS_EXPORT ParquetFileFragment : public FileFragment {
Result<ScanTaskIterator> Scan(std::shared_ptr<ScanOptions> options,
std::shared_ptr<ScanContext> context) override;

Result<FragmentVector> SplitByRowGroup(const std::shared_ptr<Expression>& predicate);

/// \brief Return the RowGroups selected by this fragment. An empty list
/// represents all RowGroups in the parquet file.
const std::vector<int>& row_groups() const { return row_groups_; }

/// \brief Split into a vector of Fragment for each row group.
///
/// This method incurs IO on backed source.
///
/// \param[in] predicate expression ignoring RowGroups which can't satisfy
/// the predicate.
///
/// \return A vector of fragment.
Result<FragmentVector> SplitByRowGroup(
std::shared_ptr<Expression> predicate = scalar(true));
const std::vector<RowGroupInfo>& row_groups() const { return row_groups_; }

private:
ParquetFileFragment(FileSource source, std::shared_ptr<FileFormat> format,
std::shared_ptr<Expression> partition_expression,
std::vector<int> row_groups, ExpressionVector row_group_statistics)
: FileFragment(std::move(source), std::move(format),
std::move(partition_expression)),
row_groups_(std::move(row_groups)),
statistics_(std::move(row_group_statistics)),
parquet_format_(internal::checked_cast<ParquetFileFormat&>(*format_)) {}

/// \brief Returns a subset of the selected RowGroups matching a given predicate.
Result<std::vector<int>> RowGroupsWithFilter(
const parquet::FileMetaData& metadata,
const parquet::ArrowReaderProperties& properties,
const Expression& predicate) const;

struct RowGroup {
int id;
std::shared_ptr<Expression> statistics;
};
std::vector<RowGroupInfo> row_groups);

std::vector<int> row_groups_;
ExpressionVector statistics_;
std::vector<RowGroupInfo> row_groups_;
ParquetFileFormat& parquet_format_;

friend class ParquetFileFormat;
};

/// \brief Create FileSystemDataset from custom `_metadata` cache file.
///
/// Dask and other systems will generate a cache metadata file by concatenating
/// the RowGroupMetaData of multiple parquet files in a single parquet file.
///
/// ParquetDatasetFactory creates a FileSystemDataset composed of
/// ParquetFileFragment where each fragment is pre-populated with the exact
/// number of row groups and statistics for each column.
class ARROW_DS_EXPORT ParquetDatasetFactory : public DatasetFactory {
public:
/// \brief Create a ParquetDatasetFactory from a metadata path.
Expand All @@ -190,7 +223,7 @@ class ARROW_DS_EXPORT ParquetDatasetFactory : public DatasetFactory {
///
/// \param[in] metadata_path path of the metadata parquet file
/// \param[in] filesystem from which to open/read the path
/// \param[in] format
/// \param[in] format to read the file with.
static Result<std::shared_ptr<DatasetFactory>> Make(
const std::string& metadata_path, std::shared_ptr<fs::FileSystem> filesystem,
std::shared_ptr<ParquetFileFormat> format);
Expand All @@ -201,10 +234,10 @@ class ARROW_DS_EXPORT ParquetDatasetFactory : public DatasetFactory {
/// and the base_path is given explicitly instead of being inferred from the
/// metadata path.
///
/// \param[in] metadata_source source to open the metadata parquet file from
/// \param[in] metadata source to open the metadata parquet file from
/// \param[in] base_path used as the prefix of every parquet file referenced
/// \param[in] filesystem from which to read the files referenced.
/// \param[in] format
/// \param[in] format to read the file with.
static Result<std::shared_ptr<DatasetFactory>> Make(
const FileSource& metadata, const std::string& base_path,
std::shared_ptr<fs::FileSystem> filesystem,
Expand Down
24 changes: 11 additions & 13 deletions cpp/src/arrow/dataset/file_parquet_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -183,17 +183,16 @@ class TestParquetFileFormat : public ArrowParquetWriterMixin {
std::vector<int> expected_row_groups,
const Expression& filter) {
auto parquet_fragment = checked_pointer_cast<ParquetFileFragment>(fragment);
ASSERT_OK_AND_ASSIGN(auto row_group_fragments,
parquet_fragment->SplitByRowGroup(filter.Copy()))
ASSERT_OK_AND_ASSIGN(auto fragments, parquet_fragment->SplitByRowGroup(filter.Copy()))

auto expected_row_group = expected_row_groups.begin();
for (const auto& fragment : row_group_fragments) {
const auto& parquet_fragment = checked_pointer_cast<ParquetFileFragment>(fragment);
EXPECT_EQ(fragments.size(), expected_row_groups.size());
for (size_t i = 0; i < fragments.size(); i++) {
auto expected = expected_row_groups[i];
auto parquet_fragment = checked_pointer_cast<ParquetFileFragment>(fragments[i]);

auto i = *expected_row_group++;
EXPECT_EQ(parquet_fragment->row_groups(), std::vector<int>{i});

EXPECT_EQ(SingleBatch(parquet_fragment.get())->num_rows(), i + 1);
EXPECT_EQ(parquet_fragment->row_groups(),
RowGroupInfo::FromIdentifiers({expected}));
EXPECT_EQ(SingleBatch(parquet_fragment.get())->num_rows(), expected + 1);
}
}

Expand Down Expand Up @@ -433,15 +432,14 @@ TEST_F(TestParquetFileFormat, PredicatePushdown) {

TEST_F(TestParquetFileFormat, PredicatePushdownRowGroupFragments) {
constexpr int64_t kNumRowGroups = 16;
constexpr int64_t kTotalNumRows = kNumRowGroups * (kNumRowGroups + 1) / 2;

auto reader = ArithmeticDatasetFixture::GetRecordBatchReader(kNumRowGroups);
auto source = GetFileSource(reader.get());

opts_ = ScanOptions::Make(reader->schema());
ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source));

CountRowGroupsInFragment(fragment, internal::Iota(static_cast<int>(kTotalNumRows)),
CountRowGroupsInFragment(fragment, internal::Iota(static_cast<int>(kNumRowGroups)),
*scalar(true));

for (int i = 0; i < kNumRowGroups; ++i) {
Expand All @@ -465,7 +463,7 @@ TEST_F(TestParquetFileFormat, PredicatePushdownRowGroupFragments) {
CountRowGroupsInFragment(fragment, internal::Iota(5, static_cast<int>(kNumRowGroups)),
"i64"_ >= int64_t(6));

CountRowGroupsInFragment(fragment, {5, 6, 7},
CountRowGroupsInFragment(fragment, {5, 6},
"i64"_ >= int64_t(6) and "i64"_ < int64_t(8));
}

Expand All @@ -491,7 +489,7 @@ TEST_F(TestParquetFileFormat, ExplicitRowGroupSelection) {
// individual selection selects a single row group
for (int i = 0; i < kNumRowGroups; ++i) {
CountRowsAndBatchesInScan(row_groups_fragment({i}), i + 1, 1);
EXPECT_EQ(row_groups_fragment({i})->row_groups(), std::vector<int>{i});
EXPECT_EQ(row_groups_fragment({i})->row_groups(), RowGroupInfo::FromIdentifiers({i}));
}

for (int i = 0; i < kNumRowGroups; ++i) {
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/arrow/dataset/test_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -449,8 +449,8 @@ struct ArithmeticDatasetFixture {

std::stringstream ss;
ss << "[\n";
for (int64_t i = 0; i < n; i++) {
if (i != 0) {
for (int64_t i = 1; i <= n; i++) {
if (i != 1) {
ss << "\n,";
}
ss << record;
Expand Down
2 changes: 2 additions & 0 deletions cpp/src/parquet/arrow/reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,8 @@ class FileReaderImpl : public FileReader {

const ArrowReaderProperties& properties() const override { return reader_properties_; }

const SchemaManifest& manifest() const override { return manifest_; }

Status ScanContents(std::vector<int> columns, const int32_t column_batch_size,
int64_t* num_rows) override {
BEGIN_PARQUET_CATCH_EXCEPTIONS
Expand Down
3 changes: 3 additions & 0 deletions cpp/src/parquet/arrow/reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ namespace arrow {

class ColumnChunkReader;
class ColumnReader;
struct SchemaManifest;
class RowGroupReader;

/// \brief Arrow read adapter class for deserializing Parquet files as Arrow row batches.
Expand Down Expand Up @@ -213,6 +214,8 @@ class PARQUET_EXPORT FileReader {

virtual const ArrowReaderProperties& properties() const = 0;

virtual const SchemaManifest& manifest() const = 0;

virtual ~FileReader() = default;
};

Expand Down
46 changes: 42 additions & 4 deletions python/pyarrow/_dataset.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -758,6 +758,42 @@ cdef class FileFragment(Fragment):
return FileFormat.wrap(self.file_fragment.format())


cdef class RowGroupInfo:
    """
    Python-side handle on one Parquet row group.

    Holds a C++ ``RowGroupInfo`` value by copy and exposes its identifier
    and row count as read-only properties.
    """

    cdef:
        CRowGroupInfo info

    def __init__(self, int id):
        # Build the C++ value from the identifier only, then store it.
        cdef CRowGroupInfo c_info = CRowGroupInfo(id)
        self.init(c_info)

    cdef void init(self, CRowGroupInfo info):
        # Store a copy of the given C++ value on this instance.
        self.info = info

    @staticmethod
    cdef wrap(CRowGroupInfo info):
        # Allocate without running __init__, then attach the C++ value.
        cdef RowGroupInfo wrapped = RowGroupInfo.__new__(RowGroupInfo)
        wrapped.init(info)
        return wrapped

    @property
    def id(self):
        """Identifier reported by the wrapped C++ object."""
        return self.info.id()

    @property
    def num_rows(self):
        """Number of rows reported by the wrapped C++ object."""
        return self.info.num_rows()

    def __eq__(self, other):
        # Objects of any other type never compare equal; two RowGroupInfo
        # instances are compared by delegating to the C++ Equals method.
        cdef:
            RowGroupInfo rhs
            CRowGroupInfo c_rhs
        if isinstance(other, RowGroupInfo):
            rhs = other
            c_rhs = rhs.info
            return self.info.Equals(c_rhs)
        return False


cdef class ParquetFileFragment(FileFragment):
"""A Fragment representing a parquet file."""

Expand All @@ -770,10 +806,12 @@ cdef class ParquetFileFragment(FileFragment):

@property
def row_groups(self):
row_groups = set(self.parquet_file_fragment.row_groups())
if len(row_groups) != 0:
return row_groups
return None
cdef:
vector[CRowGroupInfo] c_row_groups
c_row_groups = self.parquet_file_fragment.row_groups()
if c_row_groups.empty():
return None
return [RowGroupInfo.wrap(row_group) for row_group in c_row_groups]

def split_by_row_group(self, Expression predicate=None):
"""
Expand Down
1 change: 1 addition & 0 deletions python/pyarrow/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
ParquetReadOptions,
Partitioning,
PartitioningFactory,
RowGroupInfo,
Scanner,
ScanTask,
UnionDataset,
Expand Down
11 changes: 10 additions & 1 deletion python/pyarrow/includes/libarrow_dataset.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -208,9 +208,18 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
const CFileSource& source() const
const shared_ptr[CFileFormat]& format() const

cdef cppclass CRowGroupInfo "arrow::dataset::RowGroupInfo":
CRowGroupInfo()
CRowGroupInfo(int id)
CRowGroupInfo(
int id, int64_t n_rows, shared_ptr[CExpression] statistics)
int id() const
int64_t num_rows() const
bint Equals(const CRowGroupInfo& other)

cdef cppclass CParquetFileFragment "arrow::dataset::ParquetFileFragment"(
CFileFragment):
const vector[int]& row_groups() const
const vector[CRowGroupInfo]& row_groups() const
CResult[vector[shared_ptr[CFragment]]] SplitByRowGroup(
shared_ptr[CExpression] predicate)

Expand Down
Loading

0 comments on commit aea5db3

Please sign in to comment.