Set target_partitions on table scan in physical planner #972

Closed
wants to merge 3 commits into from

xudong963
Member

Which issue does this PR close?

Closes #708

Rationale for this change

What changes are included in this PR?

Are there any user-facing changes?

@xudong963
Member Author

@houqp PTAL, thanks!

@houqp houqp requested review from houqp, andygrove and alamb September 12, 2021 19:45
@houqp houqp changed the title from "remove hard_coded max_partitions" to "set max_partitions on table scan in physical planner" Sep 12, 2021
@houqp houqp added the "api change" (Changes the API exposed to users of the crate) label Sep 12, 2021
@houqp
Member

houqp commented Sep 12, 2021

cc @alamb since this will impact IOx.

@Dandandan
Contributor

I am not sure if this is the way we should go.

I think this should be a setting which could be set on the session to limit the number of partitions in each execution node. This property should be used throughout the code, instead of using the number of CPU cores.

It's OK to use the number of CPU cores by default in DataFusion (as is the case currently) but it should be possible to override this, e.g. from Ballista.

@houqp
Member

houqp commented Sep 12, 2021

@Dandandan if I read the code correctly, it's only using cpu cores in tests. For actual execution, the partition count is read from the context config in physical planner when creating the table scan node:

let max_partitions = ctx_state.config.target_partitions;

@@ -115,7 +115,7 @@ impl MemTable {
         output_partitions: Option<usize>,
     ) -> Result<Self> {
         let schema = t.schema();
-        let exec = t.scan(&None, batch_size, &[], None)?;
+        let exec = t.scan(&None, batch_size, &[], None, num_cpus::get())?;
Contributor

This should not use the number of CPUs here, but the default number of partitions.

Member Author

@xudong963 xudong963 Sep 13, 2021

If I read the code correctly, MemTable::load is only used by the benchmarks. What's more, target_partitions is unused in MemTable::scan. cc @houqp

Member

@houqp houqp Sep 13, 2021

I agree with @Dandandan on this one. Even though MemTable::scan is not using target_partitions, the scan method of the passed-in table provider t might actually use it. Given that load is defined as a pub method, I am concerned that there is downstream code in other projects already calling this method. If it were a private method only used in tests, then I think it would be fine to hard code the partition count to the number of CPU cores here.

In short, I recommend adding the partition value as an argument to the load method and letting the caller handle the value resolution.
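
A minimal sketch of that recommendation, using simplified stand-in types rather than the real DataFusion API. The trait shape, `RangeProvider`, and the `load` signature below are assumptions made for illustration only:

```rust
// Simplified stand-ins; these are NOT the real DataFusion types.
trait TableProvider {
    /// Returns one Vec of row values per output partition.
    fn scan(&self, batch_size: usize, target_partitions: usize) -> Vec<Vec<i64>>;
}

/// Hypothetical provider that holds its rows in memory.
struct RangeProvider {
    rows: Vec<i64>,
}

impl TableProvider for RangeProvider {
    fn scan(&self, _batch_size: usize, target_partitions: usize) -> Vec<Vec<i64>> {
        // This provider honours the hint by splitting its rows into at most
        // `target_partitions` chunks of (nearly) equal size.
        let parts = target_partitions.max(1);
        let chunk = ((self.rows.len() + parts - 1) / parts).max(1);
        self.rows.chunks(chunk).map(|c| c.to_vec()).collect()
    }
}

struct MemTable {
    partitions: Vec<Vec<i64>>,
}

impl MemTable {
    /// The caller resolves the partition count (session config, CPU cores, ...)
    /// and passes it in; `load` itself hard-codes nothing.
    fn load(t: &dyn TableProvider, batch_size: usize, target_partitions: usize) -> MemTable {
        MemTable {
            partitions: t.scan(batch_size, target_partitions),
        }
    }
}
```

With this shape, a benchmark could still pass the CPU count, while an engine such as Ballista could pass whatever its own configuration dictates.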

Member Author

got it!

@Dandandan
Contributor

@Dandandan if I read the code correctly, it's only using cpu cores in tests. For actual execution, the partition count is read from the context config in physical planner when creating the table scan node:

let max_partitions = ctx_state.config.target_partitions;

Thanks, you're right. I skimmed the PR too quickly.

Two suggestions I have for the PR:

  • call it target_partitions instead of max_partitions
  • use the target_partitions for MemTable as well (instead of using the number of CPU cores)

@Dandandan Dandandan changed the title from "set max_partitions on table scan in physical planner" to "set target_partitions on table scan in physical planner" Sep 13, 2021
@Dandandan Dandandan changed the title from "set target_partitions on table scan in physical planner" to "Set target_partitions on table scan in physical planner" Sep 13, 2021
@rdettai
Contributor

rdettai commented Sep 13, 2021

Thanks @xudong963 for your work!

@Dandandan for me target_partitions and max_partitions are two different things:

  • More and more distributed engines (especially in the cloud) are able to provision resources on demand according to the request. In that case, the total number of cores is not predefined. We don't want the TableProvider to try to give a defined number of partitions, but instead we want it to have its own partitioning strategy according to the data it has, and the scheduler will adapt the resources it spawns accordingly (up to a given limit that would be the max_partitions).
  • On the other hand, on a single node engine or a preallocated cluster, we have a fixed amount of resources that we want to use as fully as possible, so we want our TableProvider to do everything it can to reach that partitioning.

You can find an interesting reference on datasource partitioning in Spark according to configs here. Here is a summary of some interesting points:

  • different sources have different parameters: some have min_partitions, some pick up the number of cores, some use spark.default.parallelism...
  • personally, I find it kind of counterintuitive that the Spark Parquet reader will pick up spark.default.parallelism, which is a global boot time config, and try to break up the file to have a number of partitions equal to that parallelism...

My conclusion would be the following:

  • keep the partitioning configurations at the TableProvider level (you define them when you instantiate the datasource), to allow different implementations to have different options.
    • For example Parquet can be split by row groups
    • CSV files do not have row groups and we need to specify byte sizes to split the files
    • Some TableProviders might have a partition abstraction that we could use as parallelism.
    • Other TableProviders (like Iceberg) will have well defined row counts for each file, so we could ask them to split directly by row counts instead of byte sizes or numbers of row groups
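
To make the per-source options above concrete, here is a rough sketch in which each source is constructed with its own partitioning strategy. The `Partitioning` enum, `DataStats`, and `partition_count` are hypothetical names and do not exist in DataFusion:

```rust
/// Hypothetical per-source options, chosen when the table provider is constructed.
#[derive(Debug, Clone, PartialEq)]
enum Partitioning {
    /// Parquet-like: put this many row groups into each partition.
    RowGroups { per_partition: usize },
    /// CSV-like: no row groups, so split by byte ranges of roughly this size.
    ByteSize { bytes_per_partition: u64 },
    /// Iceberg-like: files carry row counts, so split by rows.
    RowCount { rows_per_partition: u64 },
}

/// What a source knows about the data it is about to scan (illustrative only).
struct DataStats {
    row_groups: usize,
    total_bytes: u64,
    total_rows: u64,
}

/// Ceiling division that treats a zero divisor as one.
fn ceil_div(a: u64, b: u64) -> u64 {
    let b = b.max(1);
    (a + b - 1) / b
}

/// Number of partitions the source would expose, always at least one.
fn partition_count(strategy: &Partitioning, stats: &DataStats) -> u64 {
    let n = match strategy {
        Partitioning::RowGroups { per_partition } => {
            ceil_div(stats.row_groups as u64, *per_partition as u64)
        }
        Partitioning::ByteSize { bytes_per_partition } => {
            ceil_div(stats.total_bytes, *bytes_per_partition)
        }
        Partitioning::RowCount { rows_per_partition } => {
            ceil_div(stats.total_rows, *rows_per_partition)
        }
    };
    n.max(1)
}
```

The point of the sketch is only that the unit of splitting differs per format, which is the argument for keeping these options with the datasource.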

Whatever decision we make, I have a final remark, mostly about code hygiene: the scan() method is getting bloated as we add more and more parameters. If we decide to follow the direction this PR is currently taking (passing the parameter in the scan() method), I think we should build a ScanConfigs struct to gather options such as batch_size and target_partitions.
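
A possible shape for such a struct is sketched below. Only the names `ScanConfigs`, `batch_size`, and `target_partitions` come from the comment above; the builder methods, the default values, and the simplified `scan` function are assumptions:

```rust
/// Hypothetical options bundle for scans.
#[derive(Debug, Clone, PartialEq)]
struct ScanConfigs {
    batch_size: usize,
    target_partitions: usize,
}

impl Default for ScanConfigs {
    fn default() -> Self {
        // Arbitrary illustrative defaults, not DataFusion's actual values.
        ScanConfigs {
            batch_size: 8192,
            target_partitions: 1,
        }
    }
}

impl ScanConfigs {
    fn with_batch_size(mut self, batch_size: usize) -> Self {
        self.batch_size = batch_size;
        self
    }

    fn with_target_partitions(mut self, target_partitions: usize) -> Self {
        self.target_partitions = target_partitions;
        self
    }
}

/// Simplified stand-in for a scan method: adding a new option later
/// changes `ScanConfigs`, not this signature.
fn scan(projection: Option<&[usize]>, configs: &ScanConfigs) -> String {
    format!(
        "projection={:?} batch_size={} target_partitions={}",
        projection, configs.batch_size, configs.target_partitions
    )
}
```

A caller would then write something like `scan(None, &ScanConfigs::default().with_target_partitions(8))`.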

Contributor

@alamb alamb left a comment

Thank you for the PR @xudong963

I looked at the code and I see that adding target_partitions to scan basically follows the same pattern as options such as batch_size, and thus I think it is a reasonable thing to do.

I think @rdettai also has some good points which I will address as a second comment

             true,
         )?;
         LogicalPlanBuilder::scan(
             &scan.table_name,
             Arc::new(parquet_table),
             projection,
-        )? //TODO remove hard-coded max_partitions
+        )?
Contributor

🎉

@alamb
Contributor

alamb commented Sep 13, 2021

@rdettai

More and more distributed engines (especially in the cloud) are able to provision resources on demand according to the request. In that case, the total number of cores is not predefined. We don't want the TableProvider to try to give a defined number of partitions, but instead we want it to have its own partitioning strategy according to the data it has...

I think of target_partitions as "target concurrency" (i.e., how many cores you could possibly keep busy at any time), and like you I believe that the decision of how to actually split up data across the partitions exposed to DataFusion should be an implementation detail of the TableProvider.

So a system that uses DataFusion would set target_partitions based on how many CPU resources it wants DataFusion to try to consume. The various TableProvider implementations could take that target under advisement as they divide up their data.
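
As a rough illustration of treating the value as a hint rather than a requirement, the hypothetical helper below groups files into partitions and never creates more partitions than it has files. The function name and the round-robin policy are assumptions, not DataFusion behavior:

```rust
/// Group files into partitions, treating `target_partitions` as advice:
/// the provider caps the count at the number of files it actually has.
fn plan_partitions(files: &[&str], target_partitions: usize) -> Vec<Vec<String>> {
    // At least one partition, at most one partition per file.
    let n = target_partitions.max(1).min(files.len().max(1));
    let mut partitions: Vec<Vec<String>> = vec![Vec::new(); n];
    for (i, f) in files.iter().enumerate() {
        // Round-robin assignment keeps partitions roughly balanced by file count.
        partitions[i % n].push(f.to_string());
    }
    partitions
}
```

With three files and a target of eight, this provider would expose three partitions; with five files and a target of two, it would expose two.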

My conclusion would be the following:

keep the partitioning configurations at the TableProvider level (you define them when you instantiate the datasource), to allow different implementations to have different options.

I think I would phrase it differently as "keep the choice of distribution of data across partitions at the TableProvider level"

@alamb
Contributor

alamb commented Sep 13, 2021

Whatever decision we make, I have a final remark, mostly about code hygiene: the scan() method is getting bloated as we add more and more parameters. If we decide to follow the direction this PR is currently taking (passing the parameter in the scan() method), I think we should build a ScanConfigs struct to gather options such as batch_size and target_partitions.

100% agree -- can you file a ticket for this? It is likely a fairly straightforward mechanical change that might be a good project for a new contributor

@houqp
Member

houqp commented Sep 13, 2021

Thanks @rdettai for the detailed write up. Based on how max_partitions is used in the parquet scan, I think target_partitions is indeed a more fitting name.

@xudong963
Member Author

Whatever decision we make, I have a final remark, mostly about code hygiene: the scan() method is getting bloated as we add more and more parameters. If we decide to follow the direction this PR is currently taking (passing the parameter in the scan() method), I think we should build a ScanConfigs struct to gather options such as batch_size and target_partitions.

100% agree -- can you file a ticket for this? It is likely a fairly straightforward mechanical change that might be a good project for a new contributor

Yes, I will do it!

@xudong963
Member Author

Fixed, PTAL~

Member

@houqp houqp left a comment

LGTM!

@rdettai
Contributor

rdettai commented Sep 14, 2021

Thanks @alamb and @houqp for your insights and @xudong963 for quickly reacting to this feedback! But I am still not 100% convinced 😃

  • in the context of an engine like Buzz, where the number of CPUs is meant to be fully elastic, I would prefer to specify a partition size and no target count. I understand that adding target_partition_size could be an evolution, but it bothers me that target_partitions is not optional because I wouldn't know what to specify for it
  • Spark currently accepts that no parallelism is hinted to the datasource, and in that case the datasource comes up with a partition count of its own. I find this behavior intuitive but it might be because I have been educated to do so 😄

I think of target_partitions as "target concurrency"

  • I would say that there isn't a 1 to 1 equivalence between parallelism and partition number. Usually, the partition number can be much larger than the parallelism and tasks for extra partitions will be queued. So if we mean to hint a "target concurrency" to the table providers, I think we should name this configuration as such.
  • this is a personal opinion, but I am usually skeptical of global parameters that are meant to be interpreted differently by different implementations

@yjshen
Member

yjshen commented Sep 14, 2021

Sorry for not participating earlier in the discussion on the relationship among parallelism, number of partitions, target concurrency, target partition size, etc. In my first review, I intended to keep the original term max_concurrency in DataFusion.

Since we are getting more on a generalized design discussion here, I want to share my understandings:

My core idea is to partition the dataset referred to in each query based on required size in bytes, i.e., our users should set the desired partition size in bytes (or tune the partition size parameter step by step). Or in other words, users choose execution granularity based on the amount of work performed by each task. This has several implications:

  • The engine could adaptively change table processing speed by scheduling more or fewer tasks to run concurrently, based on the current load of the system or a more complex operator DAG scheduling policy.
  • It should be CBO's responsibility to decide the partition number instead of at the level of TableProvider. I suppose the partition number for scanning each table should depend highly on the real data size involved in a query. It's meaningless to divide a table with the same partition number for both select count(distinct a) from table1 and select a, b, c .... f from table1.
  • Users could provide more cores when anticipating an execution speed up during a period of time, known as elasticity, I think. Ballista is more likely to get this benefit.

Therefore, I would like to have a SourceConfig that contains target_partition_size and target_partition_num, which the CBO can mutate or replace later in a *ScanExec node. We should always prefer target_partition_size when statistics are available, either stored in the catalog or lightweight enough to compute. target_partition_num could be left out and default to num_cpus, for example. When the input table is small enough, there is no need to worry about the partitioning strategy.
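
One way to read this proposal is sketched below. `SourceConfig`, `target_partition_size`, and `target_partition_num` are taken from the comment; the `resolve_partition_count` function is hypothetical, and `std::thread::available_parallelism` is used here as a stand-in for the `num_cpus` crate mentioned above:

```rust
/// Hypothetical config following the comment: a size target plus an optional count.
struct SourceConfig {
    /// Desired bytes per partition.
    target_partition_size: u64,
    /// Fallback partition count; `None` means "use the CPU count".
    target_partition_num: Option<usize>,
}

/// Prefer the size-based target when statistics (total bytes) are available,
/// otherwise fall back to the count, and finally to the CPU count.
fn resolve_partition_count(cfg: &SourceConfig, total_bytes: Option<u64>) -> usize {
    match total_bytes {
        Some(bytes) if cfg.target_partition_size > 0 => {
            // Ceiling division, with a minimum of one partition.
            let n = (bytes + cfg.target_partition_size - 1) / cfg.target_partition_size;
            (n as usize).max(1)
        }
        _ => cfg.target_partition_num.unwrap_or_else(|| {
            // Stand-in for num_cpus::get(); falls back to 1 if unknown.
            std::thread::available_parallelism()
                .map(|n| n.get())
                .unwrap_or(1)
        }),
    }
}
```

Under these assumptions, a 1 GiB scan with a 128 MiB size target yields 8 partitions, while a 10-byte scan yields a single partition regardless of the configured count.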

Besides, I'm aware that this scope is too much for this PR. Stop wherever is appropriate, and we can move to something more sophisticated later.

@xudong963
Member Author

Learned a lot from this PR.
Thanks!

@alamb
Contributor

alamb commented Sep 14, 2021

@rdettai -- this is a good discussion:

I think one core challenge in our understanding is that the term partition is overloaded with at least two meanings:

  1. A portion of the data (e.g. a file) that has individual statistics and might be read / accessed differently
  2. the partition output of a DataFusion ExecutionPlan which is more like a portion of the operator's output that can be read in parallel

I think the parameter in this PR is referring to the second, even though the first would likely be a better direction to take DataFusion in the longer term. Perhaps related to #64

in the context of an engine like Buzz, where the number of CPUs is meant to be fully elastic, I would prefer to specify a partition size and no target count.

I can see how specifying the size of each output stream may make sense

Spark currently accepts that no parallelism is hinted to the datasource, and in that case the datasource comes up with a partition count of its own. I find this behavior intuitive but it might be because I have been educated to do so 😄

I think it also may be related to Spark's scheduler which I think can control the number of concurrently "active" partitions. DataFusion just starts them all at once.

I would say that there isn't a 1 to 1 equivalence between parallelism and partition number.

I agree

@alamb
Contributor

alamb commented Sep 14, 2021

@yjshen

My core idea is to partition the dataset referred to in each query based on required size in bytes,

I think that is a reasonable idea too. The batch_size hint passed to scan may be a proxy for this (as it is supposed to control how large the individual batches are)

The engine could adaptively change table processing speed by scheduling more or fewer tasks to run concurrently, based on the current load of the system or a more complex operator DAG scheduling policy.

Yes, I am very much on board with this idea (related to #64 I think)

All in all I think improving the abstractions around "partitions" in datafusion and decoupling the execution concurrency from the number of distinct data streams, would be very helpful and a good direction to head. I think this is aligned too with what @rdettai is saying as well

@rdettai
Contributor

rdettai commented Sep 15, 2021

I think it also may be related to Spark's scheduler which I think can control the number of concurrently "active" partitions. DataFusion just starts them all at once.

In my mind, DataFusion already supports any form of scheduling of the partitions. It is just the DefaultPhysicalPlanner and the datafusion::physical_plan::collect(ExecutionPlan) helper that don't allow it. For a library user, the physical plan stage is just as much a public API, and thanks to the ExecutionPlan.execute(partition_id) method being at the partition level, you have a lot of freedom in scheduling. Ballista is a great example of this, with different partitions being packaged into tasks and picked up by different executors. Of course, Ballista required a different planner, or at least a few tweaks to the default plan to handle shuffle boundaries, but it seems to me that this is a "supported" (term to be defined 😄) way to use DataFusion.
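
To illustrate the kind of scheduling freedom described here, the sketch below runs m partitions on at most n worker threads. `Plan` and its `execute` method are simplified stand-ins for an `ExecutionPlan` (the real method is async and returns a record batch stream), and `run_with_workers` is a hypothetical helper:

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Stand-in for a physical plan with a fixed number of output partitions.
struct Plan {
    partitions: usize,
}

impl Plan {
    /// Stand-in for executing one partition: returns that partition's rows.
    fn execute(&self, partition_id: usize) -> Vec<usize> {
        vec![partition_id; 3]
    }
}

/// Run every partition, using at most `workers` threads at a time.
fn run_with_workers(plan: Arc<Plan>, workers: usize) -> Vec<Vec<usize>> {
    // Shared queue of partition ids that still need to run.
    let queue = Arc::new(Mutex::new((0..plan.partitions).collect::<Vec<usize>>()));
    // One result slot per partition, filled in by whichever worker runs it.
    let results = Arc::new(Mutex::new(vec![Vec::<usize>::new(); plan.partitions]));

    let handles: Vec<_> = (0..workers.max(1))
        .map(|_| {
            let plan = Arc::clone(&plan);
            let queue = Arc::clone(&queue);
            let results = Arc::clone(&results);
            thread::spawn(move || loop {
                // Take the lock only long enough to pop the next partition id.
                let next = queue.lock().unwrap().pop();
                match next {
                    Some(id) => {
                        let rows = plan.execute(id);
                        results.lock().unwrap()[id] = rows;
                    }
                    None => break,
                }
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }
    let out = results.lock().unwrap().clone();
    out
}
```

Here the number of partitions (m) and the concurrency (n) are chosen independently, which is the decoupling argued for in this comment.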

one core challenge in our understanding is that the term partition is overloaded with at least two meanings

I do think that we agree that whatever layout we have for the data in the storage, we can map it to any number of partitions in the ExecutionPlan. And we all agree that this mapping should be tunable because for the same dataset, different compute hardware or engines using DataFusion will perform differently according to how this mapping is performed. I think the challenge in our understanding is more about how the ExecutionPlan partitioning is bound to the engine parallelism. If there is a 1 to 1 mapping today because of the ExecutionPlan proposed by the current DefaultPhysicalPlanner, that is not a structural behavior of DataFusion. This is just the current default implementation (with #64 proposing to change that). But here, we are changing the API of the TableProvider/LogicalPlan, and the decisions we take while doing so should at least accommodate the most classical implementations for the planner. We, Ballista (which, by the way, is where the original hardcoded value #708 took place) and #64 all agree that having a parallelism decoupled from the number of partitions is a valid use case and planning strategy.

So what does target_partitions mean:

  • it does not mean "target concurrency", because the execution might use a scheduler that runs only n out of m partitions at a time (and it does in Ballista)
  • does it mean that the table provider should do its best to output that number of partitions? That is a setting that fits the case where the execution maps parallelism to partitions 1 to 1, but in general it will not scale well:
    • if a query scans a tiny amount of data we will be forcing the table provider to split the data, thus add overhead, for nothing
    • if a query scans a huge amount of data, our partitions will be huge which might lead to more memory pressure
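
A small worked calculation of the two cases above, assuming a provider that always produces exactly the requested number of partitions. The function and the figures are illustrative only:

```rust
/// Bytes each partition would hold if the provider always produced
/// exactly `target_partitions` partitions, whatever the scan size.
fn bytes_per_partition(total_bytes: u64, target_partitions: u64) -> u64 {
    total_bytes / target_partitions.max(1)
}
```

With a fixed target of 16 partitions, a 1 MiB scan gives 64 KiB per partition (splitting overhead for little benefit), whereas a 1 TiB scan gives 64 GiB per partition (potential memory pressure).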

Proposed outcome:

If there is too much ambiguity around the target_partitions (or max_partitions) parameter (and I feel there is), my advice would be to leave it configured at the ParquetTable level for now. We could simply serialize it in the ParquetTableScanNode to fix the hardcoding issue in from_proto. This way we avoid an API change in the TableProvider trait, but we still fix #708. We could then migrate these discussions into a design document that would try to come up with the right partitioning tuning configurations.

@xudong963
Member Author

xudong963 commented Sep 17, 2021

@houqp @alamb @rdettai Maybe we should continue the discussion and then make a decision about the PR, to avoid futile fixes. 😊

@alamb
Contributor

alamb commented Sep 18, 2021

@rdettai how opposed are you to this PR's change?

What do you think we should do with this PR?

I agree there are several problems with DataFusion's use of target_partitions but I don't think this PR makes them any worse -- it simply moves the problem around and is more consistent with the batch_size argument.

@houqp
Member

houqp commented Sep 19, 2021

Sorry for the delay, still trying to catch up with all the interesting design discussions across different PRs. It looks like we all agree that it's better to drive the partitioning logic in table scan using target_partition_size (or max_partition_size) config.

For this particular PR, I agree with @alamb that it is an improvement and doesn't make anything worse. I think the newly added ScanConfig also lays out a good foundation for us to add more scan config in the future including target_partition_size and potentially remove target_partitions.

I also agree with @rdettai that we should avoid unnecessary API churn if possible. However, I do think the table scan method is the right place for us to pass in the target_partition_size config when we add it in the future. @rdettai do you envision us having to pass in the target_partition_size config at a different layer? If so, I can see the value of avoiding the API change in the TableProvider trait.

@xudong963
Member Author

conflict fixed

@rdettai
Contributor

rdettai commented Sep 20, 2021

Thanks for your feedback.

  • @alamb My personal feeling is that this is not the right direction to take, my vote would be a -0.9 Apache style.
  • @houqp I would prefer to have all the configurations at the TableProvider constructor level. I really find that different datasources will have specificity in the kind of configuration they support and how they behave with it.

Having specific configurations for each kind of datasource would also have another great benefit: we could provide much better API documentation. Currently, if someone specifies ExecutionConfig.target_partition_size, they would need to check separate documentation (or even check the code) to know how different datasources use this config. If we have something like ExecutionConfig.parquet.target_partition_size, we can directly document how this works in the Parquet TableProvider. For instance, for target_partition_size, we could document what size we are referring to (before/after decompression...).

@houqp
Member

houqp commented Sep 21, 2021

Thanks @rdettai for the clarification. After thinking more on this, I agree with your conclusion that specifying these configs at the TableProvider constructor level would be a more flexible design. But for a different reason ;)

With regards to table-specific configs like ExecutionConfig.parquet.target_partition_size, I think we can still get it to work by passing ExecutionConfig to the scan method and letting each implementation pick which config value to read.

But passing in scan configuration at the TableProvider constructor level would effectively make these configs accessible from a larger portion of the query execution process. If we only pass them in to the scan method, then it will only be accessible starting from the physical planner. So I think it does give us more room for optimization in the future. Although I can't think of any specific optimization we could do with this info during logical planning.

Just to make sure we are on the same page here, the proposed short term design looks like the following:

  • Keep the TableProvider::scan API and ParquetTable::try_new API the same as what they are in the master branch right now.
  • Store target_partitions config in ParquetTable struct so it can be accessed later during ParquetTable::scan call.
  • Serialize target_partitions config in ballista protobuf to remove the hard coded value.
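
The three points above could look roughly like the following sketch. All types here are simplified stand-ins: the real `TableProvider::scan` and `ParquetTable::try_new` signatures differ, and `ParquetScanNode`, `to_proto`, and `from_proto` only mimic the idea of the Ballista protobuf round trip:

```rust
/// Simplified stand-in for the trait: `scan` takes no partition argument.
trait TableProvider {
    fn scan(&self, batch_size: usize) -> ScanPlan;
}

#[derive(Debug, PartialEq)]
struct ScanPlan {
    path: String,
    batch_size: usize,
    partitions: usize,
}

/// The table stores the value at construction time.
struct ParquetTable {
    path: String,
    target_partitions: usize,
}

impl ParquetTable {
    fn try_new(path: &str, target_partitions: usize) -> Result<Self, String> {
        if target_partitions == 0 {
            return Err("target_partitions must be at least 1".to_string());
        }
        Ok(ParquetTable {
            path: path.to_string(),
            target_partitions,
        })
    }
}

impl TableProvider for ParquetTable {
    fn scan(&self, batch_size: usize) -> ScanPlan {
        // The stored value is read here instead of being passed to `scan`.
        ScanPlan {
            path: self.path.clone(),
            batch_size,
            partitions: self.target_partitions,
        }
    }
}

/// Stand-in for the serialized scan node: the value travels with the plan
/// instead of being hard-coded when the plan is decoded.
struct ParquetScanNode {
    path: String,
    target_partitions: u32,
}

fn to_proto(table: &ParquetTable) -> ParquetScanNode {
    ParquetScanNode {
        path: table.path.clone(),
        target_partitions: table.target_partitions as u32,
    }
}

fn from_proto(node: &ParquetScanNode) -> Result<ParquetTable, String> {
    ParquetTable::try_new(&node.path, node.target_partitions as usize)
}
```

The round trip preserves the configured value, so the decoding side no longer needs a constant.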

Then we can work on follow up PRs to migrate the code base over to center the scanning logic around partition size instead of partition count.

NOTE: with regards to file format specific scan config, I think we might need to come up with a more dynamic design instead of using static fields in ExecutionConfig struct, otherwise it would make it very hard for people to add custom formats as plugins.
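
One possible reading of "a more dynamic design" is a string-keyed option map, sketched below. `ScanOptions`, its methods, and the key names are hypothetical and are not part of DataFusion:

```rust
use std::collections::HashMap;

/// Hypothetical string-keyed options, so that a plugin format can define
/// its own keys without adding fields to a core config struct.
#[derive(Default)]
struct ScanOptions {
    values: HashMap<String, String>,
}

impl ScanOptions {
    /// Builder-style setter; keys are namespaced by format, e.g. "parquet.".
    fn set(mut self, key: &str, value: &str) -> Self {
        self.values.insert(key.to_string(), value.to_string());
        self
    }

    /// Each format reads and parses only the keys it understands.
    /// Returns `None` if the key is missing or not a valid unsigned integer.
    fn get_u64(&self, key: &str) -> Option<u64> {
        self.values.get(key)?.parse().ok()
    }
}
```

The trade-off of this approach is that typos in keys are only caught at runtime, whereas static fields are checked by the compiler.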

@xudong963
Member Author

@houqp @alamb @rdettai I will think over the comments and iterate on the PR later. Right now I'm caught in "996" 😭

@houqp
Member

houqp commented Sep 22, 2021

No worries, take your time @xudong963. If you need any help, feel free to ping us here.

@xudong963
Member Author

To avoid troublesome fixes, I started a new PR: #1044
PTAL, thanks @houqp @rdettai @alamb

@rdettai
Contributor

rdettai commented Sep 23, 2021

feel free to close this one 😉

@xudong963 xudong963 closed this Sep 23, 2021
Labels
api change: Changes the API exposed to users of the crate
datafusion: Changes in the datafusion crate
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Ballista: Remove hard-coded concurrency from logical plan serde code
6 participants