Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: rewrite operations #852

Merged
merged 22 commits into from
Nov 16, 2022
Merged

feat: rewrite operations #852

merged 22 commits into from
Nov 16, 2022

Conversation

roeap
Copy link
Collaborator

@roeap roeap commented Sep 27, 2022

Description

This PR incorporates some of the learnings with regards to how datafusion "should" be used (or what I think I understood so far how it should be used) and how this applies to our operations module. It also embraces the IntoFuture trait stabilized in rust 1.64.

More specifically:

  • Model operations as builders that implement IntoFuture
  • While we figure out how to deal with state sharing - just consume the table as it needs to be updated anyhow (right?)
  • push datafusion dependencies into command implementations, so we can more easily extend to commands that do not require datafuison.
  • have command-spcecific errors that map to DeltaTableError (the top level error variants likely have to be refined, for now, many command errors map to a GenericError, which at least shows meaningful error messages.)
  • an updated PartitionWriter implementation that allows more fine graunlar control on how data is written

Some of choices or trials here mainly (hopefully) make sense when viewed as preparation for what's to come.

  • For the conflict resolution (feat: optimistic transaction protocol #632), we want to leverage some information on especially the scanned files during an operation. The plan is to leverage the execution metrics from the datafusion plans for this. We already do this in some of the datafusion tests, to make sure we properly use the file metrics in scans.
  • The new partition writer implementation is kept separate from the existing one for now, to be able to more easily iterate.
  • I replicated a minimal implementation of the transactions, but the main update for this will follow with conflict resolution.

To keep it somewhat simpler to review I tried to keep the major changes contained in the operations module. If we adopt this the idea is to migrate all existing operations (create, vacuum, optimize) to the builder pattern and into the operations module, and have the methods on DeltaTable just return a pre-populated builder. I think this is where IntoFuture shines, as we can await them just like before.

Implementing this, one of the things I found most convenient, is that we now have the memory:// store available. Coupled with the sync_stores helper, it makes setting up and validating test cases that mutate an existing table very convenient - yay object_store 😆.

There are some things I want to clean up, but it would be great to get some feedback on if this is where we want to go - @houqp @wjones127 @fvaleye.

Related Issue(s)

Documentation

)
};

// TODO configure more permissive versions based on configuration. Also how should this ideally be handled?
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should write a function that looks at a list of actions and detects which features are used, then determine the protocol versions. It will be much easier if that logic is the responsibility of a single function than spread out.

I think for updates we only need to run that function on new actions and merge the result with the existing protocol versions on the table. There's probably also some concurrency resolution that needs to happen too, but I haven't yet thought about that part much..

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I might even say we should let clients set the protocol; we should consider that the responsibility of the library. What do you think? Are there use cases where they should set it?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you are right - having the user choose the protocol and defaulting to "what spark uses" seems like the way to go. WE may in that case have to check option compatibility though?

if table.object_store().is_delta_table_location().await? {
match mode {
SaveMode::ErrorIfExists => return Err(CreateError::TableAlreadyExists.into()),
SaveMode::Append => return Err(CreateError::AppendNotAllowed.into()),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is append not allowed?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's what spark does :)

SaveMode::Overwrite => {
let curr_files =
flatten_list_stream(table.object_store().as_ref(), None).await?;
table.object_store().delete_batch(&curr_files).await?;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why delete the current files?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well .. in this case I was not sure, but conceptually I felt we are not overwriting the data in a table and creating a new version, but creating an entirely new table at version 0. If we were to support also updating table metadata, schema etc via this route, I guess there is more work to be done here validating all changes.

But this one I was definitely unsure about.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Had a look at the spark implementation - seems they agree with you and updating the metadata and evoling the table is the way to go

https://github.com/delta-io/delta/blob/1f6ab824e14794c17202b5e4e5df6a95357a799c/core/src/main/scala/org/apache/spark/sql/delta/commands/CreateDeltaTableCommand.scala#L196-L208

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the metadata update is a larger operation, I opted for raising not implemented for now in this PR. I opened #917 to track this.

Copy link
Collaborator

@wjones127 wjones127 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I really like these new Builder APIs so far.

I do still like the idea of a having a three-tier API: (action-based, engine-agnostic, DataFusion-based). I think for clarity, it would be best to have those in separate modules of the crate. So, for example, I don't think the Create command and the Load/Write commands belong in the same module, since the first you pass in actions (low-level) while the others are more high-level and deal with data. I'll think about this some more though, since I'm not 100% sure this makes sense.

@wjones127
Copy link
Collaborator

So, for example, I don't think the Create command and the Load/Write commands belong in the same module, since the first you pass in actions (low-level) while the others are more high-level and deal with data. I'll think about this some more though, since I'm not 100% sure this makes sense.

Okay I think I mis-read the purpose of Create: it's not a low-level API, it's just to create a table, so there's isn't any data writing interaction involved. So we should just think of operations module as the DataFusion-based high-level API. 👍

Copy link
Collaborator

@wjones127 wjones127 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm only halfway through and will try to wrap up reviewing tomorrow. Have some initial comments / suggestions.

rust/src/operations/create.rs Outdated Show resolved Hide resolved
rust/src/operations/create.rs Outdated Show resolved Hide resolved
rust/src/operations/create.rs Outdated Show resolved Hide resolved
rust/src/operations/create.rs Outdated Show resolved Hide resolved
rust/src/operations/create.rs Show resolved Hide resolved
rust/src/operations/load.rs Outdated Show resolved Hide resolved
/// Create a new [`DeltaOps`] instance, backed by an un-initialized in memory table
///
/// Using this will not persist any changes beyond the lifetime of the table object.
/// THe main purpose of in-memory tables is for use in testing.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// THe main purpose of in-memory tables is for use in testing.
/// The main purpose of in-memory tables is for use in testing.

/// let ops = DeltaOps::new_in_memory();
/// ```
#[must_use]
pub fn new_in_memory() -> Self {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is very cool!

///
/// let ops = DeltaOps::new_in_memory();
/// ```
#[must_use]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is new to me. Why #[must_use]?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also only fairly recently learned about must use. The idea is, if you do not consume the result of this call, clippy (or even the compiler) will complain. which to me makes sense, since if not using the return it literally does nothing and should be removed.

I think futures for instance may also be must_use since if not consumed they also so nothing ...

#[error("Tried committing existing table version: {0}")]
VersionAlreadyExists(DeltaDataTypeVersion),

/// Error returned when reading the delta log object failed.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this description accurate? It's duplicated below, and reading and serializing seem like opposites.

Copy link
Collaborator

@wjones127 wjones127 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Finished looking through, and few more comments mostly around clean up and follow up issues.

/// Low-level transaction API. Creates a temporary commit file. Once created,
/// the transaction object could be dropped and the actual commit could be executed
/// with `DeltaTable.try_commit_transaction`.
async fn prepare_commit(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this meant to replace DeltaTransaction::PrepareCommit? Or is it different?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is correct. I wanted to iterate a bit more on commits (as well as the writer) before adopting it in the main code paths. Next I wanted to finally look into the conflict resolution again, where I expect more changes to the commit behavior.

rust/src/operations/write.rs Outdated Show resolved Hide resolved
}
}

impl ExecutionPlan for WriteCommand {
fn as_any(&self) -> &dyn Any {
impl WriteBuilder {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we also allow passing the Parquet WriterProperties into this builder? I think users would want to be able to control the max_row_group_size and other options from there.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I went back and forth a bit on this. If we eventually want to have support for both parquet implementations, maybe we should not expose the specific option structs directly, on the other hand there is more options to consider...

rust/src/operations/write.rs Show resolved Hide resolved
Comment on lines +271 to +272
if batches.is_empty() {
Err(WriteError::MissingData)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we instead early return if there is no data?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess ... :). if we do we need to either make it a no-op or add some logic to handle the case when we do not have an explicitly defined table schema, in case we need to create the table.

rust/src/operations/writer.rs Outdated Show resolved Hide resolved
}?;

let plan = if let Some(plan) = this.input {
Ok(plan)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is the plan guaranteed to be partitioned correctly?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no, it is now ... or rather I did think about wrapping that, but this moved to the writer now ... I'll check more explicitly that we handle this correctly.

rust/src/operations/writer.rs Outdated Show resolved Hide resolved
rust/src/operations/writer.rs Outdated Show resolved Hide resolved
Ok(_) => Ok(true),
Err(ObjectStoreError::NotFound { .. }) => Ok(false),
Err(err) => Err(err),
// TODO We should really be using HEAD here, but this fails in windows tests
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have you created a ticket for this in object-store? I can look into this.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No I have not yet - Windows gives a permission denied error, and I wanted to investigate if that is a general / expected behavior. At least in principle there should not be any permission issues, since the tests run in a generated temp folder...

Comment on lines -12 to -13
[profile.dev]
split-debuginfo = "unpacked"
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not entirely sure why why had this option. however it seems it is only stable on macOS, and was causing build issues for Windows after 1,65 was released, so I removed it.

@roeap roeap mentioned this pull request Nov 10, 2022
wjones127
wjones127 previously approved these changes Nov 11, 2022
@roeap
Copy link
Collaborator Author

roeap commented Nov 15, 2022

@wjones127 - sorry for letting this sit for so long. Did some minor tweaks, mainly related to so deprecations in latest chrono and resolved some conflicts with main.

could you re-approve? :)

Copy link
Collaborator

@wjones127 wjones127 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No worries. Thanks for coming back to it. I'm excited to get this merged :)

@roeap roeap merged commit e72cdfe into delta-io:main Nov 16, 2022
@roeap roeap deleted the operations branch November 16, 2022 05:07
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants