feat(catalog): add initial rest catalog impl #58

Merged
merged 20 commits into from
Feb 14, 2024
2 changes: 1 addition & 1 deletion .github/workflows/go-ci.yml
@@ -39,7 +39,7 @@ jobs:
strategy:
fail-fast: false
matrix:
go: [ '1.20', '1.21' ]
go: [ '1.21' ]
os: [ 'ubuntu-latest', 'windows-latest', 'macos-latest' ]
steps:
- uses: actions/checkout@v3
4 changes: 4 additions & 0 deletions .gitignore
@@ -46,3 +46,7 @@ lib/
*.iml

.envrc*

# local catalog environment via docker
dev/notebooks
dev/warehouse
123 changes: 123 additions & 0 deletions catalog/README.md
@@ -0,0 +1,123 @@
<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
-->

# Catalog Implementations

## Integration Testing

The Catalog implementations can be manually tested using the CLI implemented
in the `cmd/iceberg` folder.

### REST Catalog

To test the REST catalog implementation, we provide a Docker configuration
for a MinIO container and a tabulario/iceberg-rest container.

You can spin up the local catalog by going to the `dev/` folder and running
`docker-compose up`. You can then follow the steps of the Iceberg [Quickstart](https://iceberg.apache.org/spark-quickstart/#creating-a-table)
tutorial, which we've summarized below.

#### Set up your Iceberg catalog

First launch a pyspark console by running:

```bash
docker exec -it spark-iceberg pyspark
```

Once in the pyspark shell, we create a simple table with a namespace of
"demo.nyc" called "taxis":

```python
from pyspark.sql.types import DoubleType, FloatType, LongType, StructType,StructField, StringType
schema = StructType([
StructField("vendor_id", LongType(), True),
StructField("trip_id", LongType(), True),
StructField("trip_distance", FloatType(), True),
StructField("fare_amount", DoubleType(), True),
StructField("store_and_fwd_flag", StringType(), True)
])

df = spark.createDataFrame([], schema)
df.writeTo("demo.nyc.taxis").create()
```

Finally, we write another data-frame to the table to add new files:

```python
schema = spark.table("demo.nyc.taxis").schema
data = [
(1, 1000371, 1.8, 15.32, "N"),
(2, 1000372, 2.5, 22.15, "N"),
(2, 1000373, 0.9, 9.01, "N"),
(1, 1000374, 8.4, 42.13, "Y")
]
df = spark.createDataFrame(data, schema)
df.writeTo("demo.nyc.taxis").append()
```

#### Testing with the CLI

Now that the catalog is running and contains a table, you can use the
CLI implemented in the `cmd/iceberg` folder. You will need to set
the following environment variables (which can also be found in the
`docker-compose.yml`):

```
AWS_S3_ENDPOINT=http://localhost:9000
AWS_REGION=us-east-1
AWS_ACCESS_KEY_ID=admin
AWS_SECRET_ACCESS_KEY=password
```

With those environment variables set you can now run the CLI:

```bash
$ go run ./cmd/iceberg list --catalog rest --uri http://localhost:8181
┌──────┐
| IDs |
| ---- |
| demo |
└──────┘
```

You can retrieve the schema of the table:

```bash
$ go run ./cmd/iceberg schema --catalog rest --uri http://localhost:8181 demo.nyc.taxis
Current Schema, id=0
├──1: vendor_id: optional long
├──2: trip_id: optional long
├──3: trip_distance: optional float
├──4: fare_amount: optional double
└──5: store_and_fwd_flag: optional string
```

You can get the file list:

```bash
$ go run ./cmd/iceberg files --catalog rest --uri http://localhost:8181 demo.nyc.taxis
Snapshots: rest.demo.nyc.taxis
└─┬Snapshot 7004656639550124164, schema 0: s3://warehouse/demo/nyc/taxis/metadata/snap-7004656639550124164-1-0d533cd4-f0c1-45a6-a691-f2be3abe5491.avro
└─┬Manifest: s3://warehouse/demo/nyc/taxis/metadata/0d533cd4-f0c1-45a6-a691-f2be3abe5491-m0.avro
├──Datafile: s3://warehouse/demo/nyc/taxis/data/00004-24-244255d4-8bf6-41bd-8885-bf7d2136fddf-00001.parquet
├──Datafile: s3://warehouse/demo/nyc/taxis/data/00009-29-244255d4-8bf6-41bd-8885-bf7d2136fddf-00001.parquet
├──Datafile: s3://warehouse/demo/nyc/taxis/data/00014-34-244255d4-8bf6-41bd-8885-bf7d2136fddf-00001.parquet
└──Datafile: s3://warehouse/demo/nyc/taxis/data/00019-39-244255d4-8bf6-41bd-8885-bf7d2136fddf-00001.parquet
```

and so on, for the various options available in the CLI.
134 changes: 128 additions & 6 deletions catalog/catalog.go
@@ -19,8 +19,11 @@ package catalog

import (
"context"
"crypto/tls"
"errors"
"net/url"

"github.com/apache/iceberg-go"
"github.com/apache/iceberg-go/table"
"github.com/aws/aws-sdk-go-v2/aws"
)
@@ -37,29 +40,148 @@ const (

var (
// ErrNoSuchTable is returned when a table does not exist in the catalog.
ErrNoSuchTable = errors.New("table does not exist")
ErrNoSuchTable = errors.New("table does not exist")
ErrNoSuchNamespace = errors.New("namespace does not exist")
ErrNamespaceAlreadyExists = errors.New("namespace already exists")
)

// WithAwsConfig sets the AWS configuration for the catalog.
func WithAwsConfig(cfg aws.Config) Option {
func WithAwsConfig(cfg aws.Config) Option[GlueCatalog] {
return func(o *options) {
o.awsConfig = cfg
}
}

type Option func(*options)
func WithCredential(cred string) Option[RestCatalog] {
return func(o *options) {
o.credential = cred
}
}

func WithOAuthToken(token string) Option[RestCatalog] {
return func(o *options) {
o.oauthToken = token
}
}

func WithTLSConfig(config *tls.Config) Option[RestCatalog] {
return func(o *options) {
o.tlsConfig = config
}
}

func WithWarehouseLocation(loc string) Option[RestCatalog] {
return func(o *options) {
o.warehouseLocation = loc
}
}

func WithMetadataLocation(loc string) Option[RestCatalog] {
return func(o *options) {
o.metadataLocation = loc
}
}

func WithSigV4() Option[RestCatalog] {
return func(o *options) {
o.enableSigv4 = true
o.sigv4Service = "execute-api"
}
}

func WithSigV4RegionSvc(region, service string) Option[RestCatalog] {
return func(o *options) {
o.enableSigv4 = true
o.sigv4Region = region

if service == "" {
o.sigv4Service = "execute-api"
} else {
o.sigv4Service = service
}
}
}

func WithAuthURI(uri *url.URL) Option[RestCatalog] {
return func(o *options) {
o.authUri = uri
}
}

func WithPrefix(prefix string) Option[RestCatalog] {
return func(o *options) {
o.prefix = prefix
}
}

type Option[T GlueCatalog | RestCatalog] func(*options)

type options struct {
awsConfig aws.Config

tlsConfig *tls.Config
credential string
oauthToken string
warehouseLocation string
Contributor
how can these be configured from the cmd line? I would assume we'd have a config file similar to what pyiceberg has?

Member Author
currently only credential is configurable on the CLI.

I can add the warehouse and token easily as options. Unless we think that the config file would be better in general

Contributor

I think a config file makes more sense, because then configs can be specific to catalogs. Otherwise you'd have to maintain all the available config options as CLI options

metadataLocation string
enableSigv4 bool
sigv4Region string
sigv4Service string
prefix string
authUri *url.URL
}
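
A note on the `Option[T GlueCatalog | RestCatalog]` change above: the type parameter appears to act as a phantom tag, since every option still mutates the same `options` struct. The sketch below is a minimal, self-contained illustration of that pattern and is not the PR's code. The empty stand-in catalog types, the trimmed `options` fields, `WithRegion`, and `newRestOptions` are all hypothetical names introduced for the example.

```go
package main

import "fmt"

// Stand-ins for the two catalog types; only their identity matters here.
type GlueCatalog struct{}
type RestCatalog struct{}

// options is a trimmed, hypothetical version of the shared settings struct.
type options struct {
	region     string
	credential string
}

// Option carries a phantom type parameter: T is never used in the function
// body, it only tags which catalog an option is meant for.
type Option[T GlueCatalog | RestCatalog] func(*options)

// WithRegion is a hypothetical Glue-only option (not part of the PR).
func WithRegion(r string) Option[GlueCatalog] {
	return func(o *options) { o.region = r }
}

// WithCredential mirrors the shape of the REST-only option in the diff.
func WithCredential(c string) Option[RestCatalog] {
	return func(o *options) { o.credential = c }
}

// newRestOptions accepts only REST-tagged options and applies them in order.
func newRestOptions(opts ...Option[RestCatalog]) options {
	var o options
	for _, opt := range opts {
		opt(&o)
	}
	return o
}

func main() {
	o := newRestOptions(WithCredential("client:secret"))
	fmt.Println(o.credential) // prints "client:secret"

	// newRestOptions(WithRegion("us-east-1")) would be rejected at compile
	// time: Option[GlueCatalog] and Option[RestCatalog] are distinct types.
}
```

The benefit of this design is that passing a Glue-only option to a REST constructor becomes a compile error rather than a silently ignored setting, while both catalogs keep a single `options` struct.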

type PropertiesUpdateSummary struct {
Removed []string `json:"removed"`
Updated []string `json:"updated"`
Missing []string `json:"missing"`
}

// Catalog for iceberg table operations like create, drop, load, list and others.
type Catalog interface {
// CatalogType returns the type of the catalog.
CatalogType() CatalogType

// ListTables returns a list of table identifiers in the catalog, with the returned
// identifiers containing the information required to load the table via that catalog.
ListTables(ctx context.Context, namespace table.Identifier) ([]table.Identifier, error)
// LoadTable loads a table from the catalog and returns a Table with the metadata.
LoadTable(ctx context.Context, identifier table.Identifier, props map[string]string) (*table.Table, error)
// CatalogType returns the type of the catalog.
CatalogType() CatalogType
LoadTable(ctx context.Context, identifier table.Identifier, props iceberg.Properties) (*table.Table, error)
// DropTable tells the catalog to drop the table entirely
DropTable(ctx context.Context, identifier table.Identifier) error
// RenameTable tells the catalog to rename a given table by the identifiers
// provided, and then loads and returns the destination table
RenameTable(ctx context.Context, from, to table.Identifier) (*table.Table, error)
// ListNamespaces returns the list of available namespaces, optionally filtering by a
// parent namespace
ListNamespaces(ctx context.Context, parent table.Identifier) ([]table.Identifier, error)
// CreateNamespace tells the catalog to create a new namespace with the given properties
CreateNamespace(ctx context.Context, namespace table.Identifier, props iceberg.Properties) error
// DropNamespace tells the catalog to drop the namespace and all tables in that namespace
DropNamespace(ctx context.Context, namespace table.Identifier) error
// LoadNamespaceProperties returns the current properties in the catalog for
// a given namespace
LoadNamespaceProperties(ctx context.Context, namespace table.Identifier) (iceberg.Properties, error)
// UpdateNamespaceProperties allows removing, adding, and/or updating properties of a namespace
UpdateNamespaceProperties(ctx context.Context, namespace table.Identifier,
removals []string, updates iceberg.Properties) (PropertiesUpdateSummary, error)
}

const (
keyOauthToken = "token"
keyWarehouseLocation = "warehouse"
keyMetadataLocation = "metadata_location"
keyOauthCredential = "credential"
)

func TableNameFromIdent(ident table.Identifier) string {
if len(ident) == 0 {
return ""
}

return ident[len(ident)-1]
}

func NamespaceFromIdent(ident table.Identifier) table.Identifier {
return ident[:len(ident)-1]
}
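
A note on the two helpers above: `TableNameFromIdent` guards against an empty identifier, but `NamespaceFromIdent` as written slices with `len(ident)-1`, which panics when `ident` is empty. The sketch below shows one possible guarded variant. It assumes an identifier is a slice of name parts and uses a local `Identifier` alias and lower-case function names as hypothetical stand-ins, so it should be read as an illustration rather than the package's implementation.

```go
package main

import "fmt"

// Identifier is a local stand-in for table.Identifier, assumed here to be
// a slice of name parts such as {"demo", "nyc", "taxis"}.
type Identifier = []string

// tableNameFromIdent returns the last part, or "" for an empty identifier.
func tableNameFromIdent(ident Identifier) string {
	if len(ident) == 0 {
		return ""
	}
	return ident[len(ident)-1]
}

// namespaceFromIdent returns every part except the last. Unlike the version
// in the diff, it returns nil for an empty identifier instead of panicking.
func namespaceFromIdent(ident Identifier) Identifier {
	if len(ident) == 0 {
		return nil
	}
	return ident[:len(ident)-1]
}

func main() {
	ident := Identifier{"demo", "nyc", "taxis"}
	fmt.Println(namespaceFromIdent(ident), tableNameFromIdent(ident))
	// prints "[demo nyc] taxis"

	fmt.Println(len(namespaceFromIdent(nil))) // prints "0"
}
```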
34 changes: 32 additions & 2 deletions catalog/glue.go
@@ -22,6 +22,7 @@ import (
"errors"
"fmt"

"github.com/apache/iceberg-go"
"github.com/apache/iceberg-go/io"
"github.com/apache/iceberg-go/table"
"github.com/aws/aws-sdk-go-v2/aws"
@@ -44,7 +45,7 @@ type GlueCatalog struct {
glueSvc glueAPI
}

func NewGlueCatalog(opts ...Option) *GlueCatalog {
func NewGlueCatalog(opts ...Option[GlueCatalog]) *GlueCatalog {
glueOps := &options{}

for _, o := range opts {
@@ -91,7 +92,7 @@ func (c *GlueCatalog) ListTables(ctx context.Context, namespace table.Identifier
// LoadTable loads a table from the catalog table details.
//
// The identifier should contain the Glue database name, then glue table name.
func (c *GlueCatalog) LoadTable(ctx context.Context, identifier table.Identifier, props map[string]string) (*table.Table, error) {
func (c *GlueCatalog) LoadTable(ctx context.Context, identifier table.Identifier, props iceberg.Properties) (*table.Table, error) {
database, tableName, err := identifierToGlueTable(identifier)
Contributor
we need a better name than identifierToGlueTable eh.

if err != nil {
return nil, err
@@ -124,6 +125,35 @@ func (c *GlueCatalog) CatalogType() CatalogType {
return Glue
}

func (c *GlueCatalog) DropTable(ctx context.Context, identifier table.Identifier) error {
return fmt.Errorf("%w: [Glue Catalog] drop table", iceberg.ErrNotImplemented)
}

func (c *GlueCatalog) RenameTable(ctx context.Context, from, to table.Identifier) (*table.Table, error) {
return nil, fmt.Errorf("%w: [Glue Catalog] rename table", iceberg.ErrNotImplemented)
}

func (c *GlueCatalog) CreateNamespace(ctx context.Context, namespace table.Identifier, props iceberg.Properties) error {
return fmt.Errorf("%w: [Glue Catalog] create namespace", iceberg.ErrNotImplemented)
}

func (c *GlueCatalog) DropNamespace(ctx context.Context, namespace table.Identifier) error {
return fmt.Errorf("%w: [Glue Catalog] drop namespace", iceberg.ErrNotImplemented)
}

func (c *GlueCatalog) LoadNamespaceProperties(ctx context.Context, namespace table.Identifier) (iceberg.Properties, error) {
return nil, fmt.Errorf("%w: [Glue Catalog] load namespace properties", iceberg.ErrNotImplemented)
}

func (c *GlueCatalog) UpdateNamespaceProperties(ctx context.Context, namespace table.Identifier,
removals []string, updates iceberg.Properties) (PropertiesUpdateSummary, error) {
return PropertiesUpdateSummary{}, fmt.Errorf("%w: [Glue Catalog] update namespace properties", iceberg.ErrNotImplemented)
}

func (c *GlueCatalog) ListNamespaces(ctx context.Context, parent table.Identifier) ([]table.Identifier, error) {
return nil, fmt.Errorf("%w: [Glue Catalog] list namespaces", iceberg.ErrNotImplemented)
}

// GetTable loads a table from the Glue Catalog using the given database and table name.
func (c *GlueCatalog) getTable(ctx context.Context, database, tableName string) (string, error) {
tblRes, err := c.glueSvc.GetTable(ctx,