Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve regexp_match performance by avoiding cloning Regex #8631

Merged
merged 4 commits into from
Dec 24, 2023

Conversation

viirya
Copy link
Member

@viirya viirya commented Dec 22, 2023

Which issue does this PR close?

Closes #8492.

Rationale for this change

regexp_match scalar expression is quite slow in DataFusion now. Using https://github.com/pypi-data/data/releases data to do test on M1 Mac laptop:

DataFusion CLI v34.0.0
❯ SELECT COUNT(*) FROM '../index-0.parquet' WHERE
    ARRAY_LENGTH(
      REGEXP_MATCH(path, '\\.(asm|c|cc|cpp|cxx|h|hpp|rs|[Ff][0-9]{0,2}(?:or)?|go)$')
    ) > 0;
+----------+
| COUNT(*) |
+----------+
| 5834398  |
+----------+
1 row in set. Query took 829.932 seconds.

After this change:

DataFusion CLI v34.0.0
❯ SELECT COUNT(*) FROM '../index-0.parquet' WHERE
    ARRAY_LENGTH(
      REGEXP_MATCH(path, '\\.(asm|c|cc|cpp|cxx|h|hpp|rs|[Ff][0-9]{0,2}(?:or)?|go)$')
    ) > 0;
+----------+
| COUNT(*) |
+----------+
| 5834398  |
+----------+
1 row in set. Query took 33.935 seconds.

It is not too much related to padding constant regex to an array and recompiling regex per row as discussed in (#8492). Because arrow-rs regexp_match kernel internally maintains a hash map on duplicate regex patterns, so same regex pattern is only compiled once per batch. To verify it, I also did a factoring to use single regex if the regex is a constant, and the result confirms that because the performance didn't get better.

After playing it around, the bad performance is due to cloning Regex per row. Regex has a CachePool. Cloning Regex will create a fresh CachePool (it is designed to avoid sharing the cache among multiple threads) and it is somehow expensive to re-creating cache space per row instead of reusing the cache.

What changes are included in this PR?

Removing unnecessary and expensive cloning on Regex.

Are these changes tested?

Are there any user-facing changes?

@github-actions github-actions bot added the physical-expr Physical Expressions label Dec 22, 2023
Comment on lines 82 to 83
/// TODO: Remove this once it is included in arrow-rs new release.
fn _regexp_match<OffsetSize: OffsetSizeTrait>(
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will go to submit this fix and benchmark to arrow-rs later.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@comphead
Copy link
Contributor

Thanks @viirya that awesome. Are you planning to move this change to arrow-rs kernel?

@viirya
Copy link
Member Author

viirya commented Dec 22, 2023

Yea, as mentioned #8631 (comment), I will submit this fix with a benchmark on the kernel to arrow-rs later.

regex_array: &GenericStringArray<OffsetSize>,
flags_array: Option<&GenericStringArray<OffsetSize>>,
) -> std::result::Result<ArrayRef, ArrowError> {
let mut patterns: std::collections::HashMap<String, Regex> =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is awesome -- I think we can do better still (this will still recompile each regex per batch I think -- we can do it once per plan)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, it is per batch (not per row). Definitely if we do the regex compilation per query plan, it will be better. 🚀

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Btw, to clarify it if the long PR description is overlooked by reviewers. This hash map is already in the arrow-rs kernel. The actual fix here is to avoid expensive cloning of Regex per row. I described the reason in the description.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@alamb are you thinking to have some static shared memory pool per query to make static values to be calculated only once per query? we can prob do that with lazy_static and std::sync::Once

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think @alamb means something like Pre-compiled / Prepared ScalarUDFs (#8051).

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @viirya -- I will review this PR later today

(Some(value), Some(pattern)) => {
let existing_pattern = patterns.get(&pattern);
let re = match existing_pattern {
Some(re) => re,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is actual fix. Getting rid of cloning Regex per row.

))
})?;
patterns.insert(pattern.clone(), re);
patterns.get(&pattern).unwrap()
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And here too.

@@ -58,7 +59,7 @@ pub fn regexp_match<T: OffsetSizeTrait>(args: &[ArrayRef]) -> Result<ArrayRef> {
2 => {
let values = as_generic_string_array::<T>(&args[0])?;
let regex = as_generic_string_array::<T>(&args[1])?;
compute::regexp_match(values, regex, None).map_err(|e| arrow_datafusion_err!(e))
_regexp_match(values, regex, None).map_err(|e| arrow_datafusion_err!(e))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems we're also are not supporting the scalar version (I.e. regex being a literatal value)?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see you did some testing already apache/arrow-rs#5235 (comment)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have bigger "plans" -- #8051 to specialize the constant arguments.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, for the ScalarUDF in DataFusion, we probably can rely on further optimization like #8051. Per kernel perspective, I will probably submit a Datum version later as mentioned apache/arrow-rs#5235 (comment).

@alamb
Copy link
Contributor

alamb commented Dec 22, 2023

I also verified that this PR improves the entire query from #8492, which also uses regexp_replace.

  • main: 41754 rows in set (40 shown). Query took 146.106 seconds.
  • this PR: 41754 rows in set (40 shown). Query took 15.432 seconds.

I double checked at it seems like regexp_replace already has a similar optimization to avoid recompiling the regexp (though it could be even more optimized for constants): https://github.com/apache/arrow-datafusion/blob/2ffda2a9a893455e55cd773d9dd4f426a61d8cd3/datafusion/physical-expr/src/regex_expressions.rs#L101

File Edit Options Buffers Tools SQL Help
SELECT
  month,
  ext,
  COUNT(DISTINCT project_name) AS project_count
FROM (
  SELECT
    project_name,
    DATE_TRUNC('month', uploaded_on) AS month,
    NULLIF(
      REPLACE(
        REPLACE(
          REPLACE(
            REGEXP_REPLACE(
              REGEXP_REPLACE(
                REGEXP_MATCH(path, CONCAT('(', '.([a-z0-9]+)$', ')'))[2],
                'cxx|cpp|cc|c|hpp|h',
                'C/C++',
                'g'
              ),
              '^f.*$',
              'Fortran',
              'g'
            ),
            'rs',
            'Rust'
          ),
          'go',
          'Go'
        ),
        'asm',
        'Assembly'
      ),
      ''
    ) AS ext
  FROM pypi
  WHERE COALESCE(
      ARRAY_LENGTH(
        REGEXP_MATCH(path, '.(asm|c|cc|cpp|cxx|h|hpp|rs|[Ff][0-9]{0,2}(?:or)?|go)$')
      ) > 0,
      FALSE
    )
    AND NOT COALESCE(ARRAY_LENGTH(REGEXP_MATCH(path, '(^|/)test(|s|ing)')) > 0, FALSE)
    AND NOT STRPOS(path, '/site-packages/') > 0
)
WHERE ext IS NOT NULL
GROUP BY month, ext
ORDER BY month DESC, project_count DESC

Here is the explain

DataFusion CLI v34.0.0
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                 |
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan  | Sort: month DESC NULLS FIRST, project_count DESC NULLS FIRST                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                         |
|               |   Projection: month, ext, COUNT(alias1) AS project_count                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                             |
|               |     Aggregate: groupBy=[[month, ext]], aggr=[[COUNT(alias1)]]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                        |
|               |       Aggregate: groupBy=[[month, ext, pypi.project_name AS alias1]], aggr=[[]]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                      |
|               |         Projection: pypi.project_name, date_trunc(Utf8("month"), pypi.uploaded_on) AS month, nullif(replace(replace(replace(regexp_replace(regexp_replace((regexp_match(pypi.path, Utf8("(.([a-z0-9]+)$)")))[Int64(2)], Utf8("cxx|cpp|cc|c|hpp|h"), Utf8("C/C++"), Utf8("g")), Utf8("^f.*$"), Utf8("Fortran"), Utf8("g")), Utf8("rs"), Utf8("Rust")), Utf8("go"), Utf8("Go")), Utf8("asm"), Utf8("Assembly")), Utf8("")) AS ext                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                      |
|               |           Filter: nullif(replace(replace(replace(regexp_replace(regexp_replace((regexp_match(pypi.path, Utf8("(.([a-z0-9]+)$)")))[Int64(2)], Utf8("cxx|cpp|cc|c|hpp|h"), Utf8("C/C++"), Utf8("g")), Utf8("^f.*$"), Utf8("Fortran"), Utf8("g")), Utf8("rs"), Utf8("Rust")), Utf8("go"), Utf8("Go")), Utf8("asm"), Utf8("Assembly")), Utf8("")) IS NOT NULL AND coalesce(array_length(regexp_match(pypi.path, Utf8(".(asm|c|cc|cpp|cxx|h|hpp|rs|[Ff][0-9]{0,2}(?:or)?|go)$"))) > UInt64(0), Boolean(false)) AND NOT coalesce(array_length(regexp_match(pypi.path, Utf8("(^|/)test(|s|ing)"))) > UInt64(0), Boolean(false)) AND strpos(pypi.path, Utf8("/site-packages/")) <= Int32(0)                                                                                                                                                                                                                                                                                                                                                                                                                  |
|               |             TableScan: pypi projection=[project_name, project_version, project_release, uploaded_on, path, archive_path, size, hash, skip_reason, lines, repository], partial_filters=[nullif(replace(replace(replace(regexp_replace(regexp_replace((regexp_match(pypi.path, Utf8("(.([a-z0-9]+)$)")))[Int64(2)], Utf8("cxx|cpp|cc|c|hpp|h"), Utf8("C/C++"), Utf8("g")), Utf8("^f.*$"), Utf8("Fortran"), Utf8("g")), Utf8("rs"), Utf8("Rust")), Utf8("go"), Utf8("Go")), Utf8("asm"), Utf8("Assembly")), Utf8("")) IS NOT NULL, coalesce(array_length(regexp_match(pypi.path, Utf8(".(asm|c|cc|cpp|cxx|h|hpp|rs|[Ff][0-9]{0,2}(?:or)?|go)$"))) > UInt64(0), Boolean(false)), NOT coalesce(array_length(regexp_match(pypi.path, Utf8("(^|/)test(|s|ing)"))) > UInt64(0), Boolean(false)), strpos(pypi.path, Utf8("/site-packages/")) <= Int32(0)]                                                                                                                                                                                                                                                     |
| physical_plan | SortPreservingMergeExec: [month@0 DESC,project_count@2 DESC]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                         |
|               |   SortExec: expr=[month@0 DESC,project_count@2 DESC]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                 |
|               |     ProjectionExec: expr=[month@0 as month, ext@1 as ext, COUNT(alias1)@2 as project_count]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                          |
|               |       AggregateExec: mode=FinalPartitioned, gby=[month@0 as month, ext@1 as ext], aggr=[COUNT(alias1)]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                               |
|               |         CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                  |
|               |           RepartitionExec: partitioning=Hash([month@0, ext@1], 16), input_partitions=16                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                              |
|               |             AggregateExec: mode=Partial, gby=[month@0 as month, ext@1 as ext], aggr=[COUNT(alias1)]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                  |
|               |               AggregateExec: mode=FinalPartitioned, gby=[month@0 as month, ext@1 as ext, alias1@2 as alias1], aggr=[]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                |
|               |                 CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                          |
|               |                   RepartitionExec: partitioning=Hash([month@0, ext@1, alias1@2], 16), input_partitions=16                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                            |
|               |                     AggregateExec: mode=Partial, gby=[month@1 as month, ext@2 as ext, project_name@0 as alias1], aggr=[]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                             |
|               |                       ProjectionExec: expr=[project_name@0 as project_name, date_trunc(month, uploaded_on@3) as month, nullif(replace(replace(replace(regexp_replace(regexp_replace((regexp_match(path@4, (.([a-z0-9]+)$))).[2], cxx|cpp|cc|c|hpp|h, C/C++, g), ^f.*$, Fortran, g), rs, Rust), go, Go), asm, Assembly), ) as ext]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                    |
|               |                         CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                  |
|               |                           FilterExec: nullif(replace(replace(replace(regexp_replace(regexp_replace((regexp_match(path@4, (.([a-z0-9]+)$))).[2], cxx|cpp|cc|c|hpp|h, C/C++, g), ^f.*$, Fortran, g), rs, Rust), go, Go), asm, Assembly), ) IS NOT NULL AND coalesce(array_length(regexp_match(path@4, .(asm|c|cc|cpp|cxx|h|hpp|rs|[Ff][0-9]{0,2}(?:or)?|go)$)) > 0, false) AND NOT coalesce(array_length(regexp_match(path@4, (^|/)test(|s|ing))) > 0, false) AND strpos(path@4, /site-packages/) <= 0                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                 |
|               |                             ParquetExec: file_groups={16 groups: [[Users/andrewlamb/Downloads/df_8492/pypi/index-0.parquet:0..245559486], [Users/andrewlamb/Downloads/df_8492/pypi/index-0.parquet:245559486..491118972], [Users/andrewlamb/Downloads/df_8492/pypi/index-0.parquet:491118972..736678458], [Users/andrewlamb/Downloads/df_8492/pypi/index-0.parquet:736678458..982237944], [Users/andrewlamb/Downloads/df_8492/pypi/index-0.parquet:982237944..1227797430], ...]}, projection=[project_name, project_version, project_release, uploaded_on, path, archive_path, size, hash, skip_reason, lines, repository], predicate=nullif(replace(replace(replace(regexp_replace(regexp_replace((regexp_match(path@4, (.([a-z0-9]+)$))).[2], cxx|cpp|cc|c|hpp|h, C/C++, g), ^f.*$, Fortran, g), rs, Rust), go, Go), asm, Assembly), ) IS NOT NULL AND coalesce(array_length(regexp_match(path@4, .(asm|c|cc|cpp|cxx|h|hpp|rs|[Ff][0-9]{0,2}(?:or)?|go)$)) > 0, false) AND NOT coalesce(array_length(regexp_match(path@4, (^|/)test(|s|ing))) > 0, false) AND strpos(path@4, /site-packages/) <= 0 |
|               |                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                      |
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
2 rows in set. Query took 0.014 seconds.

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I saw that apache/arrow-rs#5235 also changes regexp_is_match_utf8 which is called here:
https://github.com/apache/arrow-datafusion/blob/03c2ef46f2d88fb015ee305ab67df6d930b780e2/datafusion/physical-expr/src/expressions/binary.rs#L36-L37

(maybe via LIKE / ILIKE?)

We can probably

Headline:

1 row in set. Query took 829.932 seconds.

vs

1 row in set. Query took 33.935 seconds.

(aka 37x faster 🚀 )

Thanks @viirya

@alamb
Copy link
Contributor

alamb commented Dec 22, 2023

I also ran the query under the profiler with this branch, and it still spends a significant amount of time compiling the regex, so I think #8051 can still get us even more

Screenshot 2023-12-22 at 5 06 39 PM

@viirya
Copy link
Member Author

viirya commented Dec 22, 2023

Hmm, captures_at is actually to extract all groups matched (i.e., searching matched strings). For compiling the regex, it should happen in Regex::new (this is where we catch compilation error) which is relatively faster (0.3%). I guess that is why I didn't see too much improvement when I tested the Datum version (supporting constant regex) of regexp_match kernel locally with DataFusion.

impl Regex {
  pub fn new(re: &str) -> Result<Regex, Error> {
    RegexBuilder::new(re).build()
  }
}

impl RegexBuilder {
  /// Compiles the pattern given to `RegexBuilder::new` with the
  /// ...
  pub fn build(&self) -> Result<Regex, Error> {
    ...
  }
}

Note that this is where we catch compilation error:

let re = Regex::new(pattern.as_str()).map_err(|e| {
   ArrowError::ComputeError(format!(
     "Regular expression did not compile: {e:?}"
  ))
})?;

@Dandandan
Copy link
Contributor

I double checked at it seems like regexp_replace already has a similar optimization to avoid recompiling the regexp (though it could be even more optimized for constants):

It looks like regexp_replace has the same issue with cloning though.

@alamb
Copy link
Contributor

alamb commented Dec 23, 2023

Hmm, captures_at is actually to extract all groups matched (i.e., searching matched strings). For compiling the regex, it should happen in Regex::new (this is where we catch compilation error) which is relatively faster (0.3%).

Ah, that makes sense (that captures_at is the actual matching). Maybe pre-compiling won't get us as much afterall

@viirya
Copy link
Member Author

viirya commented Dec 23, 2023

It looks like regexp_replace has the same issue with cloning though.

Yea, just cleaned clone there too.

@viirya viirya merged commit 72af0ff into apache:main Dec 24, 2023
22 checks passed
@viirya
Copy link
Member Author

viirya commented Dec 24, 2023

Thanks @alamb @comphead @Dandandan

@ozankabak
Copy link
Contributor

This is great progress. How do we get from once-per-batch to once-per-plan?

@alamb
Copy link
Contributor

alamb commented Dec 24, 2023

This is great progress. How do we get from once-per-batch to once-per-plan?

One way would be #8051

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
physical-expr Physical Expressions
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Reported very slow performance compared to DuckDB in ibis-project
5 participants