[doc] Update Python API for pypaimon 0.2
yuzelin committed Dec 2, 2024
1 parent e72c06c commit 503fa69
Showing 1 changed file with 179 additions and 77 deletions: docs/content/program-api/python-api.md
The Java-based implementation will launch a JVM and use `py4j` to execute Java code.

### SDK Installing

SDK is published at [pypaimon](https://pypi.org/project/pypaimon/). You can install it with:
```shell
pip install pypaimon
```

### Java Runtime Environment
You can set the Java classpath via one of the following ways:

```python
import os
from pypaimon.py4j import constants

os.environ[constants.PYPAIMON_JAVA_CLASSPATH] = '/path/to/jars/*'
```
You can set JVM args via one of the following ways:

```python
import os
from pypaimon.py4j import constants

os.environ[constants.PYPAIMON_JVM_ARGS] = 'arg1 arg2 ...'
```
Otherwise, you should set the Hadoop classpath via one of the following ways:

```python
import os
from pypaimon.py4j import constants

os.environ[constants.PYPAIMON_HADOOP_CLASSPATH] = '/path/to/jars/*'
```
If you just want to test code locally, we recommend using Flink Pre-bundled Hadoop.

### Create Catalog

Before interacting with a Table, you need to create a Catalog.

```python
from pypaimon.py4j import Catalog

# Note that keys and values are all strings
catalog_options = {
    'metastore': 'filesystem',                  # example options; adjust to your environment
    'warehouse': 'file:///path/to/warehouse'
}
catalog = Catalog.create(catalog_options)

# Create a database to hold tables (argument names are assumed here)
catalog.create_database(name='database_name', ignore_if_exists=True)
```

### Create Schema

Table schema contains field definitions, partition keys, primary keys, table options and a comment.
The field definitions are described by `pyarrow.Schema`. All arguments except the field definitions are optional.

Generally, there are two ways to build `pyarrow.Schema`.

First, you can use the `pyarrow.schema` method directly, for example:

```python
import pyarrow as pa

from pypaimon import Schema

pa_schema = pa.schema([
    ('dt', pa.string()),
    ('hh', pa.string()),
    ('pk', pa.int64()),
    ('value', pa.string())
])

# Field names and types here mirror the DataFrame example below;
# partition keys and primary keys must refer to fields in pa_schema.
schema = Schema(
    pa_schema=pa_schema,
    partition_keys=['dt', 'hh'],
    primary_keys=['dt', 'hh', 'pk'],
    options={'bucket': '2'},
    comment='my test table'
)
```

Second, if you have some Pandas data, the `pa_schema` can be extracted from `DataFrame`:

```python
import pandas as pd
import pyarrow as pa

from pypaimon import Schema

# Example DataFrame data
data = {
    'dt': ['2024-01-01', '2024-01-01', '2024-01-02'],  # illustrative values
    'hh': ['12', '15', '20'],
    'pk': [1, 2, 3],
    'value': ['a', 'b', 'c'],
}
dataframe = pd.DataFrame(data)

# Extract the pyarrow schema from the DataFrame via a RecordBatch
record_batch = pa.RecordBatch.from_pandas(dataframe)

schema = Schema(
    pa_schema=record_batch.schema,
    partition_keys=['dt', 'hh'],
    primary_keys=['dt', 'hh', 'pk'],
    options={'bucket': '2'},
    comment='my test table'
)
```

### Create Table

After building the table schema, you can create the corresponding table:

```python
schema = ...
catalog.create_table(
    identifier='database_name.table_name',  # argument names are assumed here
    schema=schema,
    ignore_if_exists=True
)
```

## Batch Read

`TableRead` supports reading multiple splits in parallel. You can set `'max-workers': 'N'` in `catalog_options` to control the number of reading threads. `max-workers` defaults to 1, which means splits are read sequentially unless you set it.

The reading is divided into the Scan Plan and Read Splits stages. A `ReadBuilder` is used to create the utilities for these stages.
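For example, a minimal sketch of enabling parallel reading through `catalog_options` (the `warehouse` value is just a placeholder):

```python
# Sketch: 'max-workers' is passed together with the other catalog options (all strings).
catalog_options = {
    'warehouse': 'file:///path/to/warehouse',  # placeholder path
    'max-workers': '4',                        # read splits with 4 threads
}
```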

```python
table = catalog.get_table('database_name.table_name')

# Create a ReadBuilder for scanning and reading the table
read_builder = table.new_read_builder()
```

You can use `PredicateBuilder` to build filters and push them down through `ReadBuilder`:

```python
# Example filter: ('f0' < 3 OR 'f1' > 6) AND 'f3' = 'A'

predicate_builder = read_builder.new_predicate_builder()

predicate1 = predicate_builder.less_than('f0', 3)
predicate2 = predicate_builder.greater_than('f1', 6)
predicate3 = predicate_builder.or_predicates([predicate1, predicate2])

predicate4 = predicate_builder.equal('f3', 'A')
predicate5 = predicate_builder.and_predicates([predicate3, predicate4])

read_builder = read_builder.with_filter(predicate5)
```


| Predicate kind | Predicate method |
|:----------------------|:----------------------------------------------|
| p1 and p2 | PredicateBuilder.and_predicates([p1, p2]) |
| p1 or p2 | PredicateBuilder.or_predicates([p1, p2]) |
| f = literal | PredicateBuilder.equal(f, literal) |
| f != literal | PredicateBuilder.not_equal(f, literal) |
| f < literal | PredicateBuilder.less_than(f, literal) |
| f <= literal | PredicateBuilder.less_or_equal(f, literal) |
| f > literal | PredicateBuilder.greater_than(f, literal) |
| f >= literal | PredicateBuilder.greater_or_equal(f, literal) |
| f is null | PredicateBuilder.is_null(f) |
| f is not null | PredicateBuilder.is_not_null(f) |
| f.startswith(literal) | PredicateBuilder.startswith(f, literal) |
| f.endswith(literal) | PredicateBuilder.endswith(f, literal) |
| f.contains(literal) | PredicateBuilder.contains(f, literal) |
| f is in [l1, l2] | PredicateBuilder.is_in(f, [l1, l2]) |
| f is not in [l1, l2] | PredicateBuilder.is_not_in(f, [l1, l2]) |
| lower <= f <= upper | PredicateBuilder.between(f, lower, upper) |

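For instance, a small sketch combining a few of the other methods from the table above (the column names are illustrative):

```python
# Keep rows where 1 <= f0 <= 10 and f2 is one of the listed values
range_predicate = predicate_builder.between('f0', 1, 10)
set_predicate = predicate_builder.is_in('f2', ['A', 'B'])
read_builder = read_builder.with_filter(
    predicate_builder.and_predicates([range_predicate, set_predicate])
)
```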
You can also push down a projection through `ReadBuilder`:

```python
# select f3 and f2 columns
read_builder = read_builder.with_projection(['f3', 'f2'])
```

Then you can create the scan plan and get the splits:

```python
table_scan = read_builder.new_scan()
table_read = read_builder.new_read()
splits = table_scan.splits()
```

Finally, you can read data from the `splits` into various data formats.

### Apache Arrow

This requires `pyarrow` to be installed.

You can read all the data into a `pyarrow.Table`:

```python
table_read = read_builder.new_read()
pa_table = table_read.to_arrow(splits)
print(pa_table)

# pyarrow.Table
# f0: int32
# f1: string
# ----
# f0: [[1,2,3],[4,5,6],...]
# f1: [["a","b","c"],["d","e","f"],...]
```

You can also read data into a `pyarrow.RecordBatchReader` and iterate record batches:

```python
table_read = read_builder.new_read()
for batch in table_read.to_arrow_batch_reader(splits):
    print(batch)

# pyarrow.RecordBatch
# f0: int32
# f1: string
# ----
# f0: [1,2,3]
# f1: ["a","b","c"]
```

### Pandas

This requires `pandas` to be installed.

You can read all the data into a `pandas.DataFrame`:

```python
table_read = read_builder.new_read()
df = table_read.to_pandas(splits)
print(df)

# f0 f1
# 0 1 a
# 1 2 b
# 2 3 c
# 3 4 d
# ...
```

### DuckDB

This requires `duckdb` to be installed.

You can convert the splits into an in-memory DuckDB table and query it:

```python
table_read = read_builder.new_read()
duckdb_con = table_read.to_duckdb(splits, 'duckdb_table')

print(duckdb_con.query("SELECT * FROM duckdb_table").fetchdf())
# f0 f1
# 0 1 a
# 1 2 b
# 2 3 c
# 3 4 d
# ...

print(duckdb_con.query("SELECT * FROM duckdb_table WHERE f0 = 1").fetchdf())
# f0 f1
# 0 1 a
```

### Ray

This requires `ray` to be installed.

You can convert the splits into a Ray dataset and work with it through the Ray API:

```python
table_read = read_builder.new_read()
ray_dataset = table_read.to_ray(splits)

print(ray_dataset)
# MaterializedDataset(num_blocks=1, num_rows=9, schema={f0: int32, f1: string})

print(ray_dataset.take(3))
# [{'f0': 1, 'f1': 'a'}, {'f0': 2, 'f1': 'b'}, {'f0': 3, 'f1': 'c'}]

print(ray_dataset.to_pandas())
# f0 f1
# 0 1 a
# 1 2 b
# 2 3 c
# 3 4 d
# ...
```

## Batch Write
```python
table = catalog.get_table('database_name.table_name')

# 1. Create table write and commit
write_builder = table.new_batch_write_builder()

table_write = write_builder.new_write()
table_commit = write_builder.new_commit()

# 2. Write data (write_pandas / write_arrow / write_arrow_batch accept data matching the table schema)
dataframe = ...
table_write.write_pandas(dataframe)

# 3. Commit data
commit_messages = table_write.prepare_commit()
table_commit.commit(commit_messages)
# 4. Close resources
table_write.close()
table_commit.close()
```

By default, the data will be appended to the table. If you want to overwrite the table, use the write builder's `overwrite` API:

```python
# overwrite whole table
write_builder.overwrite()

# overwrite partition 'dt=2024-01-01'
write_builder.overwrite({'dt': '2024-01-01'})
```

## Data Types
| pyarrow data type | Paimon data type |
|:------------------|:-----------------|
| pyarrow.float64() | DOUBLE |
| pyarrow.string() | STRING |
| pyarrow.boolean() | BOOLEAN |
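As a sketch, a `pyarrow.Schema` built from these types maps onto the corresponding Paimon column types (the field names are illustrative):

```python
import pyarrow as pa

# DOUBLE, STRING and BOOLEAN columns per the mapping above
pa_schema = pa.schema([
    ('price', pa.float64()),    # -> DOUBLE
    ('name', pa.string()),      # -> STRING
    ('in_stock', pa.boolean())  # -> BOOLEAN
])
```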
