Skip to content

Commit

Permalink
Rust API documentation
Browse files Browse the repository at this point in the history
Signed-off-by: Abdullahsab3 <[email protected]>
  • Loading branch information
Abdullahsab3 authored and rtyler committed Jan 4, 2025
1 parent 6430151 commit d851959
Show file tree
Hide file tree
Showing 13 changed files with 966 additions and 314 deletions.
10 changes: 6 additions & 4 deletions docs/src/rust/check_constraints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,19 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// --8<-- [start:add_constraint]
let table = deltalake::open_table("../rust/tests/data/simple_table").await?;
let ops = DeltaOps(table);
ops.with_constraint("id_gt_0", "id > 0").await?;
ops.add_constraint().with_constraint("id_gt_0", "id > 0").await?;
// --8<-- [end:add_constraint]

// --8<-- [start:add_data]
let table = deltalake::open_table("../rust/tests/data/simple_table").await?;
let schema = table.get_state().arrow_schema()?;
let mut table = deltalake::open_table("../rust/tests/data/simple_table").await?;
let schema = table.snapshot()?.arrow_schema()?;
let invalid_values: Vec<Arc<dyn Array>> = vec![
Arc::new(Int32Array::from(vec![-10]))
];
let batch = RecordBatch::try_new(schema, invalid_values)?;
table.write(vec![batch]).await?;
let mut writer = RecordBatchWriter::for_table(&table)?;
writer.write(batch).await?;
writer.flush_and_commit(&mut table).await?;
// --8<-- [end:add_data]

Ok(())
Expand Down
30 changes: 26 additions & 4 deletions docs/src/rust/read_cdf.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,37 @@
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {

let table = deltalake::open_table("../rust/tests/data/cdf-table").await?;
let table = deltalake::open_table("tmp/some-table").await?;
let ctx = SessionContext::new();
let ops = DeltaOps(table);
let cdf = ops.load_cdf()
let cdf = ops
.load_cdf()
.with_starting_version(0)
.with_ending_version(4)
.build()
.await?;

arrow_cast::pretty::print_batches(&cdf)?;
let batches = collect_batches(
cdf.properties().output_partitioning().partition_count(),
&cdf,
ctx,
).await?;
arrow_cast::pretty::print_batches(&batches)?;


Ok(())
}
}

async fn collect_batches(
num_partitions: usize,
stream: &impl ExecutionPlan,
ctx: SessionContext,
) -> Result<Vec<RecordBatch>, Box<dyn std::error::Error>> {
let mut batches = vec![];
for p in 0..num_partitions {
let data: Vec<RecordBatch> =
collect_sendable_stream(stream.execute(p, ctx.task_ctx())?).await?;
batches.extend_from_slice(&data);
}
Ok(batches)
}
72 changes: 58 additions & 14 deletions docs/usage/appending-overwriting-delta-lake-table.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,30 @@ Suppose you have a Delta table with the following contents:

Append two additional rows of data to the table:

```python
from deltalake import write_deltalake, DeltaTable

df = pd.DataFrame({"num": [8, 9], "letter": ["dd", "ee"]})
write_deltalake("tmp/some-table", df, mode="append")
```
=== "Python"

```python
from deltalake import write_deltalake, DeltaTable

df = pd.DataFrame({"num": [8, 9], "letter": ["dd", "ee"]})
write_deltalake("tmp/some-table", df, mode="append")
```

=== "Rust"
```rust
let table = open_table("tmp/some-table").await?;
DeltaOps(table).write(RecordBatch::try_new(
Arc::new(Schema::new(vec![
Field::new("num", DataType::Int32, false),
Field::new("letter", DataType::Utf8, false),
])),
vec![
Arc::new(Int32Array::from(vec![8, 9])),
Arc::new(StringArray::from(vec![
"dd", "ee"
])),
])).with_save_mode(SaveMode::Append).await?;
```

Here are the updated contents of the Delta table:

Expand All @@ -44,12 +62,27 @@ Now let's see how to perform an overwrite transaction.
## Delta Lake overwrite transactions

Now let's see how to overwrite the exisitng Delta table.

```python
df = pd.DataFrame({"num": [11, 22], "letter": ["aa", "bb"]})
write_deltalake("tmp/some-table", df, mode="overwrite")
```

=== "Python"
```python
df = pd.DataFrame({"num": [11, 22], "letter": ["aa", "bb"]})
write_deltalake("tmp/some-table", df, mode="overwrite")
```

=== "Rust"
```rust
let table = open_table("tmp/some-table").await?;
DeltaOps(table).write(RecordBatch::try_new(
Arc::new(Schema::new(vec![
Field::new("num", DataType::Int32, false),
Field::new("letter", DataType::Utf8, false),
])),
vec![
Arc::new(Int32Array::from(vec![1, 2, 3])),
Arc::new(StringArray::from(vec![
"a", "b", "c",
])),
])).with_save_mode(SaveMode::Overwrite).await?;
```
Here are the contents of the Delta table after the overwrite operation:

```
Expand All @@ -63,9 +96,20 @@ Here are the contents of the Delta table after the overwrite operation:

Overwriting just performs a logical delete. It doesn't physically remove the previous data from storage. Time travel back to the previous version to confirm that the old version of the table is still accessable.

```python
dt = DeltaTable("tmp/some-table", version=1)
=== "Python"

```python
dt = DeltaTable("tmp/some-table", version=1)
```

=== "Rust"
```rust
let mut table = open_table("tmp/some-table").await?;
table.load_version(1).await?;
```


```
+-------+----------+
| num | letter |
|-------+----------|
Expand Down
47 changes: 47 additions & 0 deletions docs/usage/create-delta-lake-table.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,53 @@ You can easily write a DataFrame to a Delta table.
df.write_delta("tmp/some-table")
```

=== "Rust"

```rust
let delta_ops = DeltaOps::try_from_uri("tmp/some-table").await?;
let mut table = delta_ops
.create()
.with_table_name("some-table")
.with_save_mode(SaveMode::Overwrite)
.with_columns(
StructType::new(vec![
StructField::new(
"num".to_string(),
DataType::Primitive(PrimitiveType::Integer),
true,
),
StructField::new(
"letter".to_string(),
DataType::Primitive(PrimitiveType::String),
true,
),
])
.fields()
.cloned(),
)
.await?;

let mut record_batch_writer =
deltalake::writer::RecordBatchWriter::for_table(&mut table)?;
record_batch_writer
.write(
RecordBatch::try_new(
Arc::new(Schema::new(vec![
Field::new("num", DataType::Int32, true),
Field::new("letter", Utf8, true),
])),
vec![
Arc::new(Int32Array::from(vec![1, 2, 3])),
Arc::new(StringArray::from(vec![
"a", "b", "c",
])),
],
)?,
)
.await?;
record_batch_writer.flush_and_commit(&mut table).await?;
```

Here are the contents of the Delta table in storage:

```
Expand Down
28 changes: 23 additions & 5 deletions docs/usage/deleting-rows-from-delta-lake-table.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,29 @@ Suppose you have the following Delta table with four rows:

Here's how to delete all the rows where the `num` is greater than 2:

```python
dt = DeltaTable("tmp/my-table")
dt.delete("num > 2")
```

=== "Python"

```python
dt = DeltaTable("tmp/my-table")
dt.delete("num > 2")
```

=== "Rust"
```rust
let table = deltalake::open_table("./data/simple_table").await?;
let (table, delete_metrics) = DeltaOps(table)
.delete()
.with_predicate(col("num").gt(lit(2)))
.await?;
```
`with_predicate` expects an argument that can be translated to a Datafusion `Expression`. This can be either using the Dataframe API, or using a `SQL where` clause:
```rust
let table = deltalake::open_table("./data/simple_table").await?;
let (table, delete_metrics) = DeltaOps(table)
.delete()
.with_predicate("num > 2")
.await?;
```
Here are the contents of the Delta table after the delete operation has been performed:

```
Expand Down
Loading

0 comments on commit d851959

Please sign in to comment.