Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Example for building an external index for parquet files #10546

Closed
alamb opened this issue May 16, 2024 · 6 comments · Fixed by #10549
Closed

Example for building an external index for parquet files #10546

alamb opened this issue May 16, 2024 · 6 comments · Fixed by #10549
Assignees
Labels
enhancement New feature or request

Comments

@alamb
Copy link
Contributor

alamb commented May 16, 2024

Is your feature request related to a problem or challenge?

It is common in databases and other analytic system to have additional external "indexes" (perhaps stored in the "metadata catalog", perhaps stored alongside the data files, perhaps embedded in the files, perhaps elsewhere)

These indexes are used to speed up queries by "pruning" the files -- specifically evaluating a predicate on the index and then only reading files / portions of files that would pass the filters in the query

Implementing such a index requires several three steps:

  1. Creating / maintaining the actual index
  2. A way to evaluate the index against the query predicate
  3. A way to use the index to filter the files that need to be read

DataFusion has code already to do 2 (PruningPredicate) and 3 (ParquetExec) but I am not sure how obvious it is to put togeher

DataFusion actually also has basic support for 1 (e.g. the ListingTableProvider reads statistics and prunes based on their statistics)

We have some version of this in InfluxDB today, and we are in the process of extending it. However, I think the usecase is much more general and could be useful for other systems as well (e.g. full text indexes across parquet, for example).

I also think creating an example will help motivate the work from @NGA-TRAN in #10453 and myself in #9929

Describe the solution you'd like

No response

Describe alternatives you've considered

No response

Additional context

No response

@alamb
Copy link
Contributor Author

alamb commented May 28, 2024

Update here is I have a basic example #10549 ready for review / merge

@adriangb
Copy link
Contributor

adriangb commented Jun 5, 2024

Sorry for jumping in here, maybe this isn't the best issue but it's hard to keep up with all of the amazing work you're doing @alamb!

I wanted to pitch a use case I've been thinking about of storing a secondary index on a searchable async location. Think a relational database with ACID guarantees. In particular the key would be that hooks to do selections / pruning be async and that they pass in filters: I'd push down the filters into filters in the metadata store and run an actual query there that returns the files / row groups to scan. This is in contrast to #10549 for example where the index is in memory and fully materialized. I realize that TableProvider.scan already serves this purpose, but it'd be nice to integrate into these new APIs instead of having to implement more things oneself because you're hooking in at a higher (lower?) level.

@alamb
Copy link
Contributor Author

alamb commented Jun 6, 2024

Sorry for jumping in here, maybe this isn't the best issue but it's hard to keep up with all of the amazing work you're doing @alamb!

Thanks @adriangb ❤️

I wanted to pitch a use case I've been thinking about of storing a secondary index on a searchable async location. Think a relational database with ACID guarantees. In particular the key would be that hooks to do selections / pruning be async and that they pass in filters: I'd push down the filters into filters in the metadata store and run an actual query there that returns the files / row groups to scan. This is in contrast to #10549 for example where the index is in memory and fully materialized.

Yes, I agree this is a very common usecase in modern database / data systems and one I hope will be easier to implement with some of these APIs (btw see #10813 for an even lower level API which I think brings this idea to its lowest leve.)

I realize that TableProvider.scan already serves this purpose, but it'd be nice to integrate into these new APIs instead of having to implement more things oneself because you're hooking in at a higher (lower?) level.

I agree that you could do an async call as part of TableProvider::scan to fetch the relevant information from the remote store. Specifically, here

async fn scan(
&self,
state: &SessionState,
projection: Option<&Vec<usize>>,
filters: &[Expr],
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
let df_schema = DFSchema::try_from(self.schema())?;
// convert filters like [`a = 1`, `b = 2`] to a single filter like `a = 1 AND b = 2`
let predicate = conjunction(filters.to_vec());
let predicate = predicate
.map(|predicate| state.create_physical_expr(predicate, &df_schema))
.transpose()?
// if there are no filters, use a literal true to have a predicate
// that always evaluates to true we can pass to the index
.unwrap_or_else(|| datafusion_physical_expr::expressions::lit(true));
// Use the index to find the files that might have data that matches the
// predicate. Any file that can not have data that matches the predicate
// will not be returned.
let files = self.index.get_files(predicate.clone())?;
let object_store_url = ObjectStoreUrl::parse("file://")?;
let mut file_scan_config = FileScanConfig::new(object_store_url, self.schema())
.with_projection(projection.cloned())
.with_limit(limit);
// Transform to the format needed to pass to ParquetExec
// Create one file group per file (default to scanning them all in parallel)
for (file_name, file_size) in files {
let path = self.dir.join(file_name);
let canonical_path = fs::canonicalize(path)?;
file_scan_config = file_scan_config.with_file(PartitionedFile::new(
canonical_path.display().to_string(),
file_size,
));
}
let exec = ParquetExec::builder(file_scan_config)
.with_predicate(predicate)
.build_arc();

One thing that is still unclear in my mind is what other APIs we could offer to make it easier to implement an external index. Most of the the code in parquet_index.rs is to create the in memory index. Maybe we could create an example that shows how to implement a remote index 🤔

@adriangb
Copy link
Contributor

adriangb commented Jun 6, 2024

I do think that example would be nice, it's basically what I was trying to build 😄

My approach was going to be something like:

async fn scan(
    &self,
    state: &SessionState,
    projection: Option<&Vec<usize>>,
    filters: &[Expr],
    limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
    let object_store_url = ObjectStoreUrl::parse("file://")?;
    let mut file_scan_config = FileScanConfig::new(object_store_url, self.schema())
        .with_projection(projection.cloned())
        .with_limit(limit);

    // Use the index to get row groups to be scanned
    // Index does best effort to parse filters and push them down into the metadata store
    let partitioned_files_with_row_group_selection = self.index.get_files(filters).await?;

    for file in partitioned_files_with_row_group_selection {
         file_scan_config = file_scan_config.with_file(PartitionedFile::new(
            file.canonical_path.display().to_string(),
            file.file_size,
        ).with_extensions(Arc::new(file.access_plan())));
    }

    let df_schema = DFSchema::try_from(self.schema())?;
    // convert filters like [`a = 1`, `b = 2`] to a single filter like `a = 1 AND b = 2`
    let predicate = conjunction(filters.to_vec());
    let predicate = predicate
        .map(|predicate| state.create_physical_expr(predicate, &df_schema))
        .transpose()?
        .unwrap_or_else(|| datafusion_physical_expr::expressions::lit(true));

    let exec = ParquetExec::builder(file_scan_config)
        .with_predicate(predicate)
        .build_arc();

    Ok(exec)
}

(several functions and types made up)

Does this sound about in line with what you would think of as an example? I think implementing the async store as a familiar RDMS (SQLite via SQLx?) would be a good example.

@alamb
Copy link
Contributor Author

alamb commented Jun 7, 2024

Does this sound about in line with what you would think of as an example? I think implementing the async store as a familiar RDMS (SQLite via SQLx?) would be a good example.

Yes that is very much in line.

Using SQLite via sql-x would be cool, though I don't think we would want to add new dependencies into the core datafusion crates themselves.

I made a new repo in datafusion-contrib here https://github.com/datafusion-contrib/datafusion-async-parquet-index and invited you to be an admin, in case you want to do things there

@adriangb
Copy link
Contributor

adriangb commented Jun 7, 2024

datafusion-contrib/datafusion-async-parquet-index#1 😃

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

Successfully merging a pull request may close this issue.

2 participants