[SPARK-35801][SQL] Add connector APIs for row-level operations (apache#1228)
### What changes were proposed in this pull request?

This PR adds connector APIs for DELETE/UPDATE/MERGE operations. It is similar to what we had in 3.1 but with changes to support merge-on-read per [this](https://docs.google.com/document/d/12Ywmc47j3l2WF4anG5vL4qlrhT2OKigb7_EbIKhxg60/) design doc.

### Why are the changes needed?

These changes are needed to support DELETE/UPDATE/MERGE operations.

### Does this PR introduce _any_ user-facing change?

This PR adds isolated connector APIs that will be used only by Iceberg.

### How was this patch tested?

Local testing.
aokolnychyi authored and GitHub Enterprise committed Oct 8, 2021
1 parent 553544a commit 51037e2
Showing 13 changed files with 477 additions and 1 deletion.
SupportsRowLevelOperations.java
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.connector.catalog;

import org.apache.spark.annotation.Experimental;
import org.apache.spark.sql.connector.write.RowLevelOperationBuilder;
import org.apache.spark.sql.connector.write.RowLevelOperation;
import org.apache.spark.sql.connector.write.RowLevelOperationInfo;

/**
* A mix-in interface for {@link Table} row-level operations support. Data sources can implement
* this interface to indicate they support rewriting data for DELETE, UPDATE, MERGE operations.
*
* @since 3.3.0
*/
@Experimental
public interface SupportsRowLevelOperations extends Table {
  /**
   * Returns a {@link RowLevelOperationBuilder} to build a {@link RowLevelOperation}.
   * Spark will call this method while planning DELETE, UPDATE and MERGE operations.
   *
   * @param info the row-level operation info such as the command (e.g. DELETE) and options
   * @return the row-level operation builder
   */
  RowLevelOperationBuilder newRowLevelOperationBuilder(RowLevelOperationInfo info);
}
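To make the contract concrete, here is a minimal, hypothetical sketch of a table that opts into row-level operations. The `ExampleTable` and `ExampleRowLevelOperationBuilder` names are illustrative only (the builder is sketched further below); only the interfaces added by this PR are real.

```java
import java.util.Collections;
import java.util.Set;

import org.apache.spark.sql.connector.catalog.SupportsRowLevelOperations;
import org.apache.spark.sql.connector.catalog.TableCapability;
import org.apache.spark.sql.connector.write.RowLevelOperationBuilder;
import org.apache.spark.sql.connector.write.RowLevelOperationInfo;
import org.apache.spark.sql.types.StructType;

// Hypothetical table that supports rewriting data for DELETE/UPDATE/MERGE.
public class ExampleTable implements SupportsRowLevelOperations {

  private final String name;
  private final StructType schema;

  public ExampleTable(String name, StructType schema) {
    this.name = name;
    this.schema = schema;
  }

  @Override
  public String name() {
    return name;
  }

  @Override
  public StructType schema() {
    return schema;
  }

  @Override
  public Set<TableCapability> capabilities() {
    // a real source would also declare its read/write capabilities here
    return Collections.emptySet();
  }

  @Override
  public RowLevelOperationBuilder newRowLevelOperationBuilder(RowLevelOperationInfo info) {
    // info carries the SQL command (DELETE, UPDATE or MERGE) and options
    return new ExampleRowLevelOperationBuilder(info);
  }
}
```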
DeltaBatchWrite.java
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.connector.write;

import org.apache.spark.annotation.Experimental;

/**
* An interface that defines how to write a delta of rows during batch processing.
*
* @since 3.3.0
*/
@Experimental
public interface DeltaBatchWrite extends BatchWrite {
  @Override
  DeltaWriterFactory createBatchWriterFactory(PhysicalWriteInfo info);
}
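As a rough illustration, a merge-on-read source might implement `DeltaBatchWrite` along these lines. `ExampleDeltaWriterFactory` is a hypothetical class (a sketch appears after `DeltaWriterFactory` below), and the commit/abort methods come from the parent `BatchWrite` interface.

```java
import org.apache.spark.sql.connector.write.DeltaBatchWrite;
import org.apache.spark.sql.connector.write.DeltaWriterFactory;
import org.apache.spark.sql.connector.write.PhysicalWriteInfo;
import org.apache.spark.sql.connector.write.WriterCommitMessage;

// Hypothetical batch write that produces a delta of rows.
public class ExampleDeltaBatchWrite implements DeltaBatchWrite {

  @Override
  public DeltaWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) {
    // the factory is serialized and sent to executors
    return new ExampleDeltaWriterFactory();
  }

  @Override
  public void commit(WriterCommitMessage[] messages) {
    // atomically publish the data and delete files reported by the tasks
  }

  @Override
  public void abort(WriterCommitMessage[] messages) {
    // clean up files written by failed or speculative tasks
  }
}
```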
DeltaWrite.java
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.connector.write;

import org.apache.spark.annotation.Experimental;

/**
* A logical representation of a data source write that handles a delta of rows.
*
* @since 3.3.0
*/
@Experimental
public interface DeltaWrite extends Write {
  DeltaBatchWrite toBatch();
}
DeltaWriteBuilder.java
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.connector.write;

import org.apache.spark.annotation.Experimental;

/**
* An interface for building a {@link DeltaWrite}.
*
* @since 3.3.0
*/
@Experimental
public interface DeltaWriteBuilder extends WriteBuilder {

  /**
   * Returns a logical {@link DeltaWrite}.
   */
  @Override
  default DeltaWrite build() {
    throw new UnsupportedOperationException("Not implemented: build");
  }
}
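Put together, a hypothetical builder and write for a delta-based source could look like the sketch below. The `Example*` classes are illustrative (`ExampleDeltaBatchWrite` is the sketch shown after `DeltaBatchWrite` above); only the `schema()`, `rowIdSchema()` and `metadataSchema()` calls on `LogicalWriteInfo` are part of the actual API.

```java
import org.apache.spark.sql.connector.write.DeltaBatchWrite;
import org.apache.spark.sql.connector.write.DeltaWrite;
import org.apache.spark.sql.connector.write.DeltaWriteBuilder;
import org.apache.spark.sql.connector.write.LogicalWriteInfo;
import org.apache.spark.sql.types.StructType;

// Hypothetical builder that produces a DeltaWrite.
public class ExampleDeltaWriteBuilder implements DeltaWriteBuilder {

  private final LogicalWriteInfo info;

  public ExampleDeltaWriteBuilder(LogicalWriteInfo info) {
    this.info = info;
  }

  @Override
  public DeltaWrite build() {
    // the row, row ID and metadata schemas are all exposed by LogicalWriteInfo
    return new ExampleDeltaWrite(info.schema(), info.rowIdSchema(), info.metadataSchema());
  }
}

// Hypothetical logical write that only supports the batch path.
class ExampleDeltaWrite implements DeltaWrite {

  // schemas kept so a real implementation could pass them on to its writer factory
  private final StructType rowSchema;
  private final StructType rowIdSchema;
  private final StructType metadataSchema;

  ExampleDeltaWrite(StructType rowSchema, StructType rowIdSchema, StructType metadataSchema) {
    this.rowSchema = rowSchema;
    this.rowIdSchema = rowIdSchema;
    this.metadataSchema = metadataSchema;
  }

  @Override
  public DeltaBatchWrite toBatch() {
    return new ExampleDeltaBatchWrite();
  }
}
```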
DeltaWriter.java
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.connector.write;

import java.io.IOException;
import org.apache.spark.annotation.Experimental;

/**
* A data writer returned by {@link DeltaWriterFactory#createWriter(int, long)} that is
* responsible for writing a delta of rows.
*
* @since 3.3.0
*/
@Experimental
public interface DeltaWriter<T> extends DataWriter<T> {
  /**
   * Passes information for a row that must be deleted.
   *
   * @param metadata values for metadata columns that were projected but are not part of the row ID
   * @param id a row ID to delete
   * @throws IOException if the write process encounters an error
   */
  void delete(T metadata, T id) throws IOException;

  /**
   * Passes information for a row that must be updated together with the updated row.
   *
   * @param metadata values for metadata columns that were projected but are not part of the row ID
   * @param id a row ID to update
   * @param row a row with updated values
   * @throws IOException if the write process encounters an error
   */
  void update(T metadata, T id, T row) throws IOException;

  /**
   * Passes a row to insert.
   *
   * @param row a row to insert
   * @throws IOException if the write process encounters an error
   */
  void insert(T row) throws IOException;

  @Override
  default void write(T row) throws IOException {
    insert(row);
  }
}
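A minimal sketch of a delta writer, assuming a merge-on-read layout where deletes are recorded in delete files and inserts go to data files; everything prefixed with `Example` is hypothetical.

```java
import java.io.IOException;

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.write.DeltaWriter;
import org.apache.spark.sql.connector.write.WriterCommitMessage;

// Hypothetical writer that records deletes, updates and inserts for one task.
public class ExampleDeltaWriter implements DeltaWriter<InternalRow> {

  @Override
  public void delete(InternalRow metadata, InternalRow id) throws IOException {
    // record the row ID (and any projected metadata) in a delete file
  }

  @Override
  public void update(InternalRow metadata, InternalRow id, InternalRow row) throws IOException {
    // a merge-on-read source can model an update as a delete plus an insert
    delete(metadata, id);
    insert(row);
  }

  @Override
  public void insert(InternalRow row) throws IOException {
    // append the new row to a data file
  }

  @Override
  public WriterCommitMessage commit() throws IOException {
    // report the files produced by this task back to the driver
    return new ExampleCommitMessage();
  }

  @Override
  public void abort() throws IOException {
    // remove any files this task has written so far
  }

  @Override
  public void close() throws IOException {
    // release open file handles
  }

  // WriterCommitMessage is a serializable marker; a real message would list written files
  private static class ExampleCommitMessage implements WriterCommitMessage {
  }
}
```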
DeltaWriterFactory.java
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.connector.write;

import org.apache.spark.annotation.Experimental;
import org.apache.spark.sql.catalyst.InternalRow;

/**
* A factory for creating {@link DeltaWriter}s. It is returned by
* {@link DeltaBatchWrite#createBatchWriterFactory(PhysicalWriteInfo)} and is responsible for
* creating and initializing writers on executors.
*
* @since 3.3.0
*/
@Experimental
public interface DeltaWriterFactory extends DataWriterFactory {
  DeltaWriter<InternalRow> createWriter(int partitionId, long taskId);
}
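For completeness, a hypothetical factory matching the writer sketched above; the factory is shipped to executors, so any state it holds must be serializable.

```java
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.write.DeltaWriter;
import org.apache.spark.sql.connector.write.DeltaWriterFactory;

// Hypothetical factory; DeltaWriterFactory extends DataWriterFactory, which is serializable.
public class ExampleDeltaWriterFactory implements DeltaWriterFactory {

  @Override
  public DeltaWriter<InternalRow> createWriter(int partitionId, long taskId) {
    // one writer per task; partitionId and taskId can be used to name output files
    return new ExampleDeltaWriter();
  }
}
```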
LogicalWriteInfo.java
@@ -45,4 +45,14 @@ public interface LogicalWriteInfo {
   * the schema of the input data from Spark to data source.
   */
  StructType schema();

  /**
   * the schema of the input metadata from Spark to data source.
   */
  StructType metadataSchema();

  /**
   * the schema of the ID columns from Spark to data source.
   */
  StructType rowIdSchema();
}
RowLevelOperation.java
@@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.connector.write;

import org.apache.spark.annotation.Experimental;
import org.apache.spark.sql.connector.expressions.NamedReference;
import org.apache.spark.sql.connector.read.Scan;
import org.apache.spark.sql.connector.read.ScanBuilder;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

/**
* A logical representation of a data source DELETE, UPDATE, or MERGE operation that requires
* rewriting data.
*
* @since 3.3.0
*/
@Experimental
public interface RowLevelOperation {

  /**
   * The actual SQL operation being performed.
   */
  enum Command {
    DELETE, UPDATE, MERGE
  }

  /**
   * Returns the description associated with this row-level operation.
   */
  default String description() {
    return this.getClass().toString();
  }

  /**
   * Returns the actual SQL operation being performed.
   */
  Command command();

  /**
   * Returns a {@link ScanBuilder} to configure a {@link Scan} for this row-level operation.
   * <p>
   * Sources fall into two categories: those that can handle a delta of rows and those that need
   * to replace groups (e.g. partitions, files). Sources that handle deltas allow Spark to quickly
   * discard unchanged rows and have no requirements for input scans. Sources that replace groups
   * of rows can discard deleted rows but need to keep unchanged rows to be passed back into
   * the source. This means that scans for such data sources must produce all rows in a group
   * if any are returned. Some sources will avoid pushing filters into files (file granularity),
   * while others will avoid pruning files within a partition (partition granularity).
   * <p>
   * For example, if a source can only replace partitions, all rows from a partition must
   * be returned by the scan, even if a filter can narrow the set of changes to a single file
   * in the partition. Similarly, a source that can swap individual files must produce all rows
   * of files where at least one record must be changed, not just the rows that must be changed.
   */
  ScanBuilder newScanBuilder(CaseInsensitiveStringMap options);

  /**
   * Returns a {@link WriteBuilder} to configure a {@link Write} for this row-level operation.
   * <p>
   * Note that Spark will first configure the scan and then the write, allowing data sources
   * to pass information from the scan to the write. For example, the scan can report
   * which condition was used to read the data, which may be needed by the write under certain
   * isolation levels.
   */
  WriteBuilder newWriteBuilder(LogicalWriteInfo info);

  /**
   * Returns metadata attributes that are required to perform this row-level operation.
   * <p>
   * Data sources can use this method to project metadata columns needed for writing
   * the data back (e.g. metadata columns for grouping data).
   */
  default NamedReference[] requiredMetadataAttributes() {
    return new NamedReference[0];
  }
}
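A rough sketch of how a delta-based (merge-on-read) source might implement this interface for DELETE. `ExampleScanBuilder` is hypothetical and not shown, `ExampleDeltaWriteBuilder` is the sketch above, and the `_partition` metadata column plus the use of the `Expressions.column` helper to build a `NamedReference` are assumptions for illustration.

```java
import org.apache.spark.sql.connector.expressions.Expressions;
import org.apache.spark.sql.connector.expressions.NamedReference;
import org.apache.spark.sql.connector.read.ScanBuilder;
import org.apache.spark.sql.connector.write.LogicalWriteInfo;
import org.apache.spark.sql.connector.write.RowLevelOperation;
import org.apache.spark.sql.connector.write.WriteBuilder;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

// Hypothetical DELETE operation for a source that can write a delta of rows.
public class ExampleDeleteOperation implements RowLevelOperation {

  @Override
  public Command command() {
    return Command.DELETE;
  }

  @Override
  public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
    // a delta-based source has no grouping requirements, so the scan can
    // push filters and prune files as aggressively as a normal read
    return new ExampleScanBuilder(options);
  }

  @Override
  public WriteBuilder newWriteBuilder(LogicalWriteInfo info) {
    // configured after the scan, so state discovered while scanning can be reused here
    return new ExampleDeltaWriteBuilder(info);
  }

  @Override
  public NamedReference[] requiredMetadataAttributes() {
    // ask Spark to project a (hypothetical) partition metadata column for grouping
    return new NamedReference[] { Expressions.column("_partition") };
  }
}
```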
RowLevelOperationBuilder.java
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.connector.write;

import org.apache.spark.annotation.Experimental;

/**
* An interface for building a {@link RowLevelOperation}.
*
* @since 3.3.0
*/
@Experimental
public interface RowLevelOperationBuilder {
  /**
   * Returns a {@link RowLevelOperation} that controls how Spark handles operations that require
   * rewriting data such as DELETE, UPDATE, MERGE.
   */
  RowLevelOperation build();
}
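Finally, a hypothetical builder that ties the pieces together. A real source would typically inspect the command and options carried by `RowLevelOperationInfo` and return a matching operation; only DELETE is sketched here, reusing `ExampleDeleteOperation` from above.

```java
import org.apache.spark.sql.connector.write.RowLevelOperation;
import org.apache.spark.sql.connector.write.RowLevelOperationBuilder;
import org.apache.spark.sql.connector.write.RowLevelOperationInfo;

// Hypothetical builder returned by ExampleTable#newRowLevelOperationBuilder.
public class ExampleRowLevelOperationBuilder implements RowLevelOperationBuilder {

  private final RowLevelOperationInfo info;

  public ExampleRowLevelOperationBuilder(RowLevelOperationInfo info) {
    this.info = info;
  }

  @Override
  public RowLevelOperation build() {
    // a real implementation would dispatch on the command in info (DELETE/UPDATE/MERGE)
    return new ExampleDeleteOperation();
  }
}
```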