Skip to content

Commit

Permalink
feat(catalog): add initial rest catalog impl (#58)
Browse files Browse the repository at this point in the history
  • Loading branch information
zeroshade authored Feb 14, 2024
1 parent abd4fb0 commit d209a3f
Show file tree
Hide file tree
Showing 15 changed files with 2,788 additions and 15 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/go-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ jobs:
strategy:
fail-fast: false
matrix:
go: [ '1.20', '1.21' ]
go: [ '1.21' ]
os: [ 'ubuntu-latest', 'windows-latest', 'macos-latest' ]
steps:
- uses: actions/checkout@v3
Expand Down
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,7 @@ lib/
*.iml

.envrc*

# local catalog environment via docker
dev/notebooks
dev/warehouse
123 changes: 123 additions & 0 deletions catalog/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
-->

# Catalog Implementations

## Integration Testing

The Catalog implementations can be manually tested using the CLI implemented
in the `cmd/iceberg` folder.

### REST Catalog

To test the REST catalog implementation, we have a docker configuration
for a MinIO container and a tabulario/iceberg-rest container.

You can spin up the local catalog by going to the `dev/` folder and running
`docker-compose up`. You can then follow the steps of the Iceberg [Quickstart](https://iceberg.apache.org/spark-quickstart/#creating-a-table)
tutorial, which we've summarized below.

#### Set up your Iceberg catalog

First launch a pyspark console by running:

```bash
docker exec -it spark-iceberg pyspark
```

Once in the pyspark shell, we create a simple table with a namespace of
"demo.nyc" called "taxis":

```python
from pyspark.sql.types import DoubleType, FloatType, LongType, StructType,StructField, StringType
schema = StructType([
StructField("vendor_id", LongType(), True),
StructField("trip_id", LongType(), True),
StructField("trip_distance", FloatType(), True),
StructField("fare_amount", DoubleType(), True),
StructField("store_and_fwd_flag", StringType(), True)
])

df = spark.createDataFrame([], schema)
df.writeTo("demo.nyc.taxis").create()
```

Finally, we write another data-frame to the table to add new files:

```python
schema = spark.table("demo.nyc.taxis").schema
data = [
(1, 1000371, 1.8, 15.32, "N"),
(2, 1000372, 2.5, 22.15, "N"),
(2, 1000373, 0.9, 9.01, "N"),
(1, 1000374, 8.4, 42.13, "Y")
]
df = spark.createDataFrame(data, schema)
df.writeTo("demo.nyc.taxis").append()
```

#### Testing with the CLI

Now that we have a table in the running catalog, you can use the
CLI, which is implemented in the `cmd/iceberg` folder. You will need to set
the following environment variables (which can also be found in the
docker-compose.yml):

```
AWS_S3_ENDPOINT=http://localhost:9000
AWS_REGION=us-east-1
AWS_ACCESS_KEY_ID=admin
AWS_SECRET_ACCESS_KEY=password
```

With those environment variables set you can now run the CLI:

```bash
$ go run ./cmd/iceberg list --catalog rest --uri http://localhost:8181
┌──────┐
| IDs |
| ---- |
| demo |
└──────┘
```

You can retrieve the schema of the table:

```bash
$ go run ./cmd/iceberg schema --catalog rest --uri http://localhost:8181 demo.nyc.taxis
Current Schema, id=0
├──1: vendor_id: optional long
├──2: trip_id: optional long
├──3: trip_distance: optional float
├──4: fare_amount: optional double
└──5: store_and_fwd_flag: optional string
```

You can get the file list:

```bash
$ go run ./cmd/iceberg files --catalog rest --uri http://localhost:8181 demo.nyc.taxis
Snapshots: rest.demo.nyc.taxis
└─┬Snapshot 7004656639550124164, schema 0: s3://warehouse/demo/nyc/taxis/metadata/snap-7004656639550124164-1-0d533cd4-f0c1-45a6-a691-f2be3abe5491.avro
└─┬Manifest: s3://warehouse/demo/nyc/taxis/metadata/0d533cd4-f0c1-45a6-a691-f2be3abe5491-m0.avro
├──Datafile: s3://warehouse/demo/nyc/taxis/data/00004-24-244255d4-8bf6-41bd-8885-bf7d2136fddf-00001.parquet
├──Datafile: s3://warehouse/demo/nyc/taxis/data/00009-29-244255d4-8bf6-41bd-8885-bf7d2136fddf-00001.parquet
├──Datafile: s3://warehouse/demo/nyc/taxis/data/00014-34-244255d4-8bf6-41bd-8885-bf7d2136fddf-00001.parquet
└──Datafile: s3://warehouse/demo/nyc/taxis/data/00019-39-244255d4-8bf6-41bd-8885-bf7d2136fddf-00001.parquet
```

and so on, for the various options available in the CLI.
134 changes: 128 additions & 6 deletions catalog/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@ package catalog

import (
"context"
"crypto/tls"
"errors"
"net/url"

"github.com/apache/iceberg-go"
"github.com/apache/iceberg-go/table"
"github.com/aws/aws-sdk-go-v2/aws"
)
Expand All @@ -37,29 +40,148 @@ const (

var (
// ErrNoSuchTable is returned when a table does not exist in the catalog.
ErrNoSuchTable = errors.New("table does not exist")
ErrNoSuchTable = errors.New("table does not exist")
ErrNoSuchNamespace = errors.New("namespace does not exist")
ErrNamespaceAlreadyExists = errors.New("namespace already exists")
)

// WithAwsConfig sets the AWS configuration for the catalog.
func WithAwsConfig(cfg aws.Config) Option {
func WithAwsConfig(cfg aws.Config) Option[GlueCatalog] {
return func(o *options) {
o.awsConfig = cfg
}
}

type Option func(*options)
// WithCredential returns a REST catalog option that records cred as the
// credential in the catalog options.
//
// NOTE(review): presumably this is the OAuth client credential used to obtain
// a token — confirm against the REST catalog implementation.
func WithCredential(cred string) Option[RestCatalog] {
	setCredential := func(opts *options) {
		opts.credential = cred
	}

	return setCredential
}

// WithOAuthToken returns a REST catalog option that records token as the
// OAuth token in the catalog options.
func WithOAuthToken(token string) Option[RestCatalog] {
	setToken := func(opts *options) {
		opts.oauthToken = token
	}

	return setToken
}

// WithTLSConfig returns a REST catalog option that records config as the TLS
// configuration in the catalog options. The pointer is stored as given; the
// configuration is not copied.
func WithTLSConfig(config *tls.Config) Option[RestCatalog] {
	setTLS := func(opts *options) {
		opts.tlsConfig = config
	}

	return setTLS
}

// WithWarehouseLocation returns a REST catalog option that records loc as the
// warehouse location in the catalog options.
func WithWarehouseLocation(loc string) Option[RestCatalog] {
	setWarehouse := func(opts *options) {
		opts.warehouseLocation = loc
	}

	return setWarehouse
}

// WithMetadataLocation returns a REST catalog option that records loc as the
// metadata location in the catalog options.
func WithMetadataLocation(loc string) Option[RestCatalog] {
	setMetadata := func(opts *options) {
		opts.metadataLocation = loc
	}

	return setMetadata
}

// WithSigV4 returns a REST catalog option that turns on SigV4 in the catalog
// options and sets the SigV4 service name to "execute-api". The SigV4 region
// is left untouched; use WithSigV4RegionSvc to set it.
func WithSigV4() Option[RestCatalog] {
	const defaultService = "execute-api"

	return func(opts *options) {
		opts.sigv4Service = defaultService
		opts.enableSigv4 = true
	}
}

// WithSigV4RegionSvc returns a REST catalog option that turns on SigV4 in the
// catalog options and records the given region and service name. An empty
// service falls back to "execute-api".
func WithSigV4RegionSvc(region, service string) Option[RestCatalog] {
	// Resolve the effective service name once; the inputs are immutable
	// strings, so the result is the same as deciding inside the option.
	svc := "execute-api"
	if service != "" {
		svc = service
	}

	return func(opts *options) {
		opts.enableSigv4 = true
		opts.sigv4Region = region
		opts.sigv4Service = svc
	}
}

// WithAuthURI returns a REST catalog option that records uri as the auth URI
// in the catalog options. The pointer is stored as given; the URL is not
// copied.
func WithAuthURI(uri *url.URL) Option[RestCatalog] {
	setAuthURI := func(opts *options) {
		opts.authUri = uri
	}

	return setAuthURI
}

// WithPrefix returns a REST catalog option that records prefix in the catalog
// options.
//
// NOTE(review): presumably this is the path prefix applied to REST catalog
// routes — confirm against the REST catalog implementation.
func WithPrefix(prefix string) Option[RestCatalog] {
	setPrefix := func(opts *options) {
		opts.prefix = prefix
	}

	return setPrefix
}

// Option is a functional option that mutates the package's options struct.
//
// The type parameter T does not appear in the function signature; it only tags
// the option with the catalog type (GlueCatalog or RestCatalog) it is meant
// for, so that a constructor such as NewGlueCatalog can accept
// Option[GlueCatalog] values exclusively.
type Option[T GlueCatalog | RestCatalog] func(*options)

// options accumulates the settings applied by the Option functions in this
// package. One struct backs both Option[GlueCatalog] and Option[RestCatalog].
type options struct {
// awsConfig is set by WithAwsConfig.
awsConfig aws.Config

// tlsConfig is set by WithTLSConfig.
tlsConfig *tls.Config
// credential is set by WithCredential.
credential string
// oauthToken is set by WithOAuthToken.
oauthToken string
// warehouseLocation is set by WithWarehouseLocation.
warehouseLocation string
// metadataLocation is set by WithMetadataLocation.
metadataLocation string
// enableSigv4 is switched on by WithSigV4 and WithSigV4RegionSvc.
enableSigv4 bool
// sigv4Region is set by WithSigV4RegionSvc.
sigv4Region string
// sigv4Service is set by WithSigV4 and WithSigV4RegionSvc; both use
// "execute-api" when no explicit service name is given.
sigv4Service string
// prefix is set by WithPrefix.
prefix string
// authUri is set by WithAuthURI.
authUri *url.URL
}

// PropertiesUpdateSummary is the result type of
// Catalog.UpdateNamespaceProperties. Its fields are encoded to and decoded
// from JSON under the lowercase names "removed", "updated" and "missing".
//
// NOTE(review): presumably each list holds namespace property keys (removed,
// updated, and requested-for-removal-but-absent respectively) — confirm
// against the Iceberg REST catalog specification.
type PropertiesUpdateSummary struct {
Removed []string `json:"removed"`
Updated []string `json:"updated"`
Missing []string `json:"missing"`
}

// Catalog for iceberg table operations like create, drop, load, list and others.
type Catalog interface {
// CatalogType returns the type of the catalog.
CatalogType() CatalogType

// ListTables returns a list of table identifiers in the catalog, with the returned
// identifiers containing the information required to load the table via that catalog.
ListTables(ctx context.Context, namespace table.Identifier) ([]table.Identifier, error)
// LoadTable loads a table from the catalog and returns a Table with the metadata.
LoadTable(ctx context.Context, identifier table.Identifier, props map[string]string) (*table.Table, error)
// CatalogType returns the type of the catalog.
CatalogType() CatalogType
LoadTable(ctx context.Context, identifier table.Identifier, props iceberg.Properties) (*table.Table, error)
// DropTable tells the catalog to drop the table entirely
DropTable(ctx context.Context, identifier table.Identifier) error
// RenameTable tells the catalog to rename a given table by the identifiers
// provided, and then loads and returns the destination table
RenameTable(ctx context.Context, from, to table.Identifier) (*table.Table, error)
// ListNamespaces returns the list of available namespaces, optionally filtering by a
// parent namespace
ListNamespaces(ctx context.Context, parent table.Identifier) ([]table.Identifier, error)
// CreateNamespace tells the catalog to create a new namespace with the given properties
CreateNamespace(ctx context.Context, namespace table.Identifier, props iceberg.Properties) error
// DropNamespace tells the catalog to drop the namespace and all tables in that namespace
DropNamespace(ctx context.Context, namespace table.Identifier) error
// LoadNamespaceProperties returns the current properties in the catalog for
// a given namespace
LoadNamespaceProperties(ctx context.Context, namespace table.Identifier) (iceberg.Properties, error)
// UpdateNamespaceProperties allows removing, adding, and/or updating properties of a namespace
UpdateNamespaceProperties(ctx context.Context, namespace table.Identifier,
removals []string, updates iceberg.Properties) (PropertiesUpdateSummary, error)
}

// String keys naming individual catalog settings.
//
// NOTE(review): nothing in this file reads these constants; presumably they
// are the property keys used by the REST catalog implementation when
// exchanging configuration — confirm against their call sites.
const (
keyOauthToken = "token"
keyWarehouseLocation = "warehouse"
keyMetadataLocation = "metadata_location"
keyOauthCredential = "credential"
)

// TableNameFromIdent returns the final element of ident, which is the table
// name. An empty identifier yields the empty string.
func TableNameFromIdent(ident table.Identifier) string {
	if n := len(ident); n > 0 {
		return ident[n-1]
	}

	return ""
}

// NamespaceFromIdent returns every element of ident except the last one,
// i.e. the part that TableNameFromIdent does not return. The result is a
// sub-slice of ident and shares its backing array.
//
// An empty identifier has no namespace, so nil is returned rather than
// panicking on an out-of-range slice expression.
func NamespaceFromIdent(ident table.Identifier) table.Identifier {
	if len(ident) == 0 {
		return nil
	}

	return ident[:len(ident)-1]
}
34 changes: 32 additions & 2 deletions catalog/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"errors"
"fmt"

"github.com/apache/iceberg-go"
"github.com/apache/iceberg-go/io"
"github.com/apache/iceberg-go/table"
"github.com/aws/aws-sdk-go-v2/aws"
Expand All @@ -44,7 +45,7 @@ type GlueCatalog struct {
glueSvc glueAPI
}

func NewGlueCatalog(opts ...Option) *GlueCatalog {
func NewGlueCatalog(opts ...Option[GlueCatalog]) *GlueCatalog {
glueOps := &options{}

for _, o := range opts {
Expand Down Expand Up @@ -91,7 +92,7 @@ func (c *GlueCatalog) ListTables(ctx context.Context, namespace table.Identifier
// LoadTable loads a table from the catalog table details.
//
// The identifier should contain the Glue database name, then glue table name.
func (c *GlueCatalog) LoadTable(ctx context.Context, identifier table.Identifier, props map[string]string) (*table.Table, error) {
func (c *GlueCatalog) LoadTable(ctx context.Context, identifier table.Identifier, props iceberg.Properties) (*table.Table, error) {
database, tableName, err := identifierToGlueTable(identifier)
if err != nil {
return nil, err
Expand Down Expand Up @@ -124,6 +125,35 @@ func (c *GlueCatalog) CatalogType() CatalogType {
return Glue
}

// DropTable is not yet supported by the Glue catalog. It always returns an
// error wrapping iceberg.ErrNotImplemented.
func (c *GlueCatalog) DropTable(ctx context.Context, identifier table.Identifier) error {
	err := fmt.Errorf("%w: [Glue Catalog] drop table", iceberg.ErrNotImplemented)

	return err
}

// RenameTable is not yet supported by the Glue catalog. It always returns a
// nil table and an error wrapping iceberg.ErrNotImplemented.
func (c *GlueCatalog) RenameTable(ctx context.Context, from, to table.Identifier) (*table.Table, error) {
	err := fmt.Errorf("%w: [Glue Catalog] rename table", iceberg.ErrNotImplemented)

	return nil, err
}

// CreateNamespace is not yet supported by the Glue catalog. It always returns
// an error wrapping iceberg.ErrNotImplemented.
func (c *GlueCatalog) CreateNamespace(ctx context.Context, namespace table.Identifier, props iceberg.Properties) error {
	err := fmt.Errorf("%w: [Glue Catalog] create namespace", iceberg.ErrNotImplemented)

	return err
}

// DropNamespace is not yet supported by the Glue catalog. It always returns
// an error wrapping iceberg.ErrNotImplemented.
func (c *GlueCatalog) DropNamespace(ctx context.Context, namespace table.Identifier) error {
	err := fmt.Errorf("%w: [Glue Catalog] drop namespace", iceberg.ErrNotImplemented)

	return err
}

// LoadNamespaceProperties is not yet supported by the Glue catalog. It always
// returns nil properties and an error wrapping iceberg.ErrNotImplemented.
func (c *GlueCatalog) LoadNamespaceProperties(ctx context.Context, namespace table.Identifier) (iceberg.Properties, error) {
	err := fmt.Errorf("%w: [Glue Catalog] load namespace properties", iceberg.ErrNotImplemented)

	return nil, err
}

// UpdateNamespaceProperties is not yet supported by the Glue catalog. It
// always returns an empty summary and an error wrapping
// iceberg.ErrNotImplemented.
func (c *GlueCatalog) UpdateNamespaceProperties(ctx context.Context, namespace table.Identifier,
	removals []string, updates iceberg.Properties) (PropertiesUpdateSummary, error) {
	var summary PropertiesUpdateSummary
	err := fmt.Errorf("%w: [Glue Catalog] update namespace properties", iceberg.ErrNotImplemented)

	return summary, err
}

// ListNamespaces is not yet supported by the Glue catalog. It always returns
// a nil list and an error wrapping iceberg.ErrNotImplemented.
func (c *GlueCatalog) ListNamespaces(ctx context.Context, parent table.Identifier) ([]table.Identifier, error) {
	err := fmt.Errorf("%w: [Glue Catalog] list namespaces", iceberg.ErrNotImplemented)

	return nil, err
}

// getTable loads a table from the Glue Catalog using the given database and table name.
func (c *GlueCatalog) getTable(ctx context.Context, database, tableName string) (string, error) {
tblRes, err := c.glueSvc.GetTable(ctx,
Expand Down
Loading

0 comments on commit d209a3f

Please sign in to comment.