Skip to content

Commit

Permalink
Merge pull request #586 from splitgraph/increase-sync-len-threshold
Browse files Browse the repository at this point in the history
Patch DataFusion (DF) to circumvent an analyzer rule
  • Loading branch information
gruuya authored Aug 2, 2024
2 parents 82a9ca4 + 5128975 commit ccacfe0
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 37 deletions.
50 changes: 25 additions & 25 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 10 additions & 10 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,16 @@ url = "2.5"


[patch.crates-io]
# Pick up fix for https://github.com/apache/arrow-datafusion/pull/11386
datafusion = { git = "https://github.com/apache/datafusion", rev = "d314ced8090cb599fd7808d7df41699e46ac956e" }
datafusion-common = { git = "https://github.com/apache/datafusion", rev = "d314ced8090cb599fd7808d7df41699e46ac956e" }
datafusion-execution = { git = "https://github.com/apache/datafusion", rev = "d314ced8090cb599fd7808d7df41699e46ac956e" }
datafusion-expr = { git = "https://github.com/apache/datafusion", rev = "d314ced8090cb599fd7808d7df41699e46ac956e" }
datafusion-optimizer = { git = "https://github.com/apache/datafusion", rev = "d314ced8090cb599fd7808d7df41699e46ac956e" }
datafusion-physical-expr = { git = "https://github.com/apache/datafusion", rev = "d314ced8090cb599fd7808d7df41699e46ac956e" }
datafusion-physical-plan = { git = "https://github.com/apache/datafusion", rev = "d314ced8090cb599fd7808d7df41699e46ac956e" }
datafusion-proto = { git = "https://github.com/apache/datafusion", rev = "d314ced8090cb599fd7808d7df41699e46ac956e" }
datafusion-sql = { git = "https://github.com/apache/datafusion", rev = "d314ced8090cb599fd7808d7df41699e46ac956e" }
# Pick up the fix from https://github.com/apache/arrow-datafusion/pull/11386, plus a backport of https://github.com/apache/datafusion/pull/11765
datafusion = { git = "https://github.com/splitgraph/arrow-datafusion", branch = "backport-pr11765" }
datafusion-common = { git = "https://github.com/splitgraph/arrow-datafusion", branch = "backport-pr11765" }
datafusion-execution = { git = "https://github.com/splitgraph/arrow-datafusion", branch = "backport-pr11765" }
datafusion-expr = { git = "https://github.com/splitgraph/arrow-datafusion", branch = "backport-pr11765" }
datafusion-optimizer = { git = "https://github.com/splitgraph/arrow-datafusion", branch = "backport-pr11765" }
datafusion-physical-expr = { git = "https://github.com/splitgraph/arrow-datafusion", branch = "backport-pr11765" }
datafusion-physical-plan = { git = "https://github.com/splitgraph/arrow-datafusion", branch = "backport-pr11765" }
datafusion-proto = { git = "https://github.com/splitgraph/arrow-datafusion", branch = "backport-pr11765" }
datafusion-sql = { git = "https://github.com/splitgraph/arrow-datafusion", branch = "backport-pr11765" }

[package]
name = "seafowl"
Expand Down
2 changes: 1 addition & 1 deletion src/config/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ impl Default for DataSyncConfig {
Self {
max_in_memory_bytes: 3 * 1024 * 1024 * 1024,
max_replication_lag_s: 600,
max_syncs_per_url: 50,
max_syncs_per_url: 100,
write_lock_timeout_s: 3,
flush_task_interval_s: 900,
}
Expand Down
11 changes: 10 additions & 1 deletion src/frontend/flight/sync/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,8 @@ impl SeafowlDataSyncWriter {
}
};

info!("Flushing {} syncs for url {url}", entry.syncs.len());

let start = Instant::now();
let insertion_time = entry.insertion_time;
let rows = entry.rows;
Expand Down Expand Up @@ -384,7 +386,14 @@ impl SeafowlDataSyncWriter {
vec![qualifier.clone()],
)?
.build()?;
let mut sync_df = DataFrame::new(self.context.inner.state(), base_plan);

let state = self
.context
.inner
.state()
.with_analyzer_rules(vec![])
.with_optimizer_rules(vec![]);
let mut sync_df = DataFrame::new(state, base_plan);

for sync in &entry.syncs {
sync_df = self.apply_sync(
Expand Down

0 comments on commit ccacfe0

Please sign in to comment.