Skip to content

Commit

Permalink
feat: glue table creation with some docs on testing
Browse files Browse the repository at this point in the history
  • Loading branch information
wolfeidau committed Feb 20, 2024
1 parent 5db83a0 commit 6cb048b
Show file tree
Hide file tree
Showing 14 changed files with 554 additions and 18 deletions.
44 changes: 43 additions & 1 deletion catalog/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,14 @@ import (
"context"
"crypto/tls"
"errors"
"fmt"
"net/url"
"strings"

"github.com/apache/iceberg-go"
"github.com/apache/iceberg-go/table"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/google/uuid"
)

type CatalogType string
Expand All @@ -52,6 +55,14 @@ func WithAwsConfig(cfg aws.Config) Option[GlueCatalog] {
}
}

// WithDefaultLocation sets the default location for the catalog, this is used
// when a location is not provided in the create table operation.
func WithDefaultLocation(location string) Option[GlueCatalog] {
return func(o *options) {
o.defaultLocation = location
}
}

func WithCredential(cred string) Option[RestCatalog] {
return func(o *options) {
o.credential = cred
Expand Down Expand Up @@ -117,7 +128,8 @@ func WithPrefix(prefix string) Option[RestCatalog] {
type Option[T GlueCatalog | RestCatalog] func(*options)

type options struct {
awsConfig aws.Config
awsConfig aws.Config
defaultLocation string

tlsConfig *tls.Config
credential string
Expand Down Expand Up @@ -185,3 +197,33 @@ func TableNameFromIdent(ident table.Identifier) string {
func NamespaceFromIdent(ident table.Identifier) table.Identifier {
return ident[:len(ident)-1]
}

func getMetadataPath(locationPath string, newVersion int) (string, error) {
if newVersion < 0 {
return "", fmt.Errorf("invalid table version: %d must be a non-negative integer", newVersion)
}

metaDataPath, err := url.JoinPath(strings.TrimLeft(locationPath, "/"), "metadata", fmt.Sprintf("%05d-%s.metadata.json", newVersion, uuid.New().String()))
if err != nil {
return "", fmt.Errorf("failed to build metadata path: %w", err)
}

return metaDataPath, nil
}

func getLocationForTable(location, defaultLocation, database, tableName string) (*url.URL, error) {
if location != "" {
return url.Parse(location)
}

if defaultLocation == "" {
return nil, fmt.Errorf("no default path is set, please specify a location when creating a table")
}

u, err := url.Parse(defaultLocation)
if err != nil {
return nil, fmt.Errorf("failed to parse location URL: %w", err)
}

return u.JoinPath(fmt.Sprintf("%s.db", database), tableName), nil
}
123 changes: 123 additions & 0 deletions catalog/catalog_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package catalog

import (
"regexp"
"testing"

"github.com/stretchr/testify/require"
)

func TestGetLocationForTable(t *testing.T) {
type args struct {
location string
defaultLocation string
database string
tableName string
}

tests := []struct {
name string
args args
want string
wantErr bool
}{
{
name: "should return location if location is provided",
args: args{
location: "s3://new-bucket/test-table",
defaultLocation: "s3://test-bucket",
database: "test-database",
tableName: "test-table",
},
want: "s3://new-bucket/test-table",
wantErr: false,
},
{
name: "should return default location with generated path if location is not provided",
args: args{
location: "",
defaultLocation: "s3://test-bucket/test-prefix/",
database: "test-database",
tableName: "test-table",
},
want: "s3://test-bucket/test-prefix/test-database.db/test-table",
wantErr: false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := getLocationForTable(tt.args.location, tt.args.defaultLocation, tt.args.database, tt.args.tableName)
if (err != nil) != tt.wantErr {
t.Errorf("tableS3Location() error = %v, wantErr %v", err, tt.wantErr)
return
}

if got.String() != tt.want {
t.Errorf("tableS3Location() = %v, want %v", got, tt.want)
}
})
}
}

func TestGetMetadataPath(t *testing.T) {
type args struct {
locationPath string
version int
}

tests := []struct {
name string
args args
want string
wantErr bool
}{
{
name: "should return metadata location with version 0",
args: args{
locationPath: "/test-table/",
version: 1,
},
want: "^test-table/metadata/00001-[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}.metadata.json$",
wantErr: false,
},
{
name: "should return metadata location with version 1",
args: args{
locationPath: "/test-table/",
version: 0,
},
want: "^test-table/metadata/00000-[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}.metadata.json$",
wantErr: false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := getMetadataPath(tt.args.locationPath, tt.args.version)
if (err != nil) != tt.wantErr {
t.Errorf("getMetadataPath() error = %v, wantErr %v", err, tt.wantErr)
return
}

require.Regexp(t, regexp.MustCompile(tt.want), got)
})
}
}
76 changes: 74 additions & 2 deletions catalog/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package catalog

import (
"context"
"encoding/json"
"errors"
"fmt"

Expand All @@ -39,10 +40,12 @@ var (
type glueAPI interface {
GetTable(ctx context.Context, params *glue.GetTableInput, optFns ...func(*glue.Options)) (*glue.GetTableOutput, error)
GetTables(ctx context.Context, params *glue.GetTablesInput, optFns ...func(*glue.Options)) (*glue.GetTablesOutput, error)
CreateTable(ctx context.Context, params *glue.CreateTableInput, optFns ...func(*glue.Options)) (*glue.CreateTableOutput, error)
}

type GlueCatalog struct {
glueSvc glueAPI
glueSvc glueAPI
defaultLocation string
}

func NewGlueCatalog(opts ...Option[GlueCatalog]) *GlueCatalog {
Expand All @@ -53,7 +56,8 @@ func NewGlueCatalog(opts ...Option[GlueCatalog]) *GlueCatalog {
}

return &GlueCatalog{
glueSvc: glue.NewFromConfig(glueOps.awsConfig),
glueSvc: glue.NewFromConfig(glueOps.awsConfig),
defaultLocation: glueOps.defaultLocation,
}
}

Expand Down Expand Up @@ -121,6 +125,55 @@ func (c *GlueCatalog) LoadTable(ctx context.Context, identifier table.Identifier
return icebergTable, nil
}

func (c *GlueCatalog) CreateTable(ctx context.Context, identifier table.Identifier, schema *iceberg.Schema, partitionSpec iceberg.PartitionSpec, sortOrder table.SortOrder, location string, props map[string]string) (*table.Table, error) {
database, tableName, err := identifierToGlueTable(identifier)
if err != nil {
return nil, err
}

locationURL, err := getLocationForTable(location, c.defaultLocation, database, tableName)
if err != nil {
return nil, err
}

// append metadata path to location path/to/database.db/tablename/metadata/00000-UUID.metadata.json
metadataPath, err := getMetadataPath(locationURL.Path, 0)
if err != nil {
return nil, err
}

tbl, err := table.NewTable(identifier, schema, partitionSpec, sortOrder, locationURL.String(), props)
if err != nil {
return nil, err
}

err = c.writeMetaData(locationURL.String(), metadataPath, tbl.Metadata())
if err != nil {
return nil, err
}

params := &glue.CreateTableInput{
DatabaseName: aws.String(database),

TableInput: &types.TableInput{
Name: aws.String(tableName),
TableType: aws.String("EXTERNAL_TABLE"),
Parameters: map[string]string{
"table_type": glueTableTypeIceberg,
"metadata_location": fmt.Sprintf("%s://%s/%s", locationURL.Scheme, locationURL.Host, metadataPath),
},
StorageDescriptor: &types.StorageDescriptor{Location: aws.String(locationURL.String())},
},
}

_, err = c.glueSvc.CreateTable(ctx, params)
if err != nil {
return nil, fmt.Errorf("failed to create table %s.%s: %w", database, tableName, err)
}

return tbl, nil
}

func (c *GlueCatalog) CatalogType() CatalogType {
return Glue
}
Expand Down Expand Up @@ -176,6 +229,25 @@ func (c *GlueCatalog) getTable(ctx context.Context, database, tableName string)
return tblRes.Table.Parameters["metadata_location"], nil
}

func (c *GlueCatalog) writeMetaData(location, metadataPath string, metadata table.Metadata) error {
iofs, err := io.LoadFS(map[string]string{}, location)
if err != nil {
return fmt.Errorf("failed to load fs: %w", err)
}

data, err := json.Marshal(metadata)
if err != nil {
return fmt.Errorf("failed to marshal table metadata: %w", err)
}

err = iofs.WriteFile(metadataPath, data, 0644)
if err != nil {
return fmt.Errorf("failed to write metadata file: %w", err)
}

return nil
}

func identifierToGlueTable(identifier table.Identifier) (string, string, error) {
if len(identifier) != 2 {
return "", "", fmt.Errorf("invalid identifier, missing database name: %v", identifier)
Expand Down
48 changes: 48 additions & 0 deletions catalog/glue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"os"
"testing"

"github.com/apache/iceberg-go"
"github.com/apache/iceberg-go/table"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/glue"
Expand All @@ -44,6 +46,11 @@ func (m *mockGlueClient) GetTables(ctx context.Context, params *glue.GetTablesIn
return args.Get(0).(*glue.GetTablesOutput), args.Error(1)
}

func (m *mockGlueClient) CreateTable(ctx context.Context, params *glue.CreateTableInput, optFns ...func(*glue.Options)) (*glue.CreateTableOutput, error) {
args := m.Called(ctx, params, optFns)
return args.Get(0).(*glue.CreateTableOutput), args.Error(1)
}

func TestGlueGetTable(t *testing.T) {
assert := require.New(t)

Expand Down Expand Up @@ -146,3 +153,44 @@ func TestGlueLoadTableIntegration(t *testing.T) {
assert.NoError(err)
assert.Equal([]string{os.Getenv("TEST_TABLE_NAME")}, table.Identifier())
}

func TestGlueCreateTableIntegration(t *testing.T) {
if os.Getenv("TEST_DATABASE_NAME") == "" {
t.Skip()
}
if os.Getenv("TEST_CREATE_TABLE_NAME") == "" {
t.Skip()
}
if os.Getenv("TEST_CREATE_TABLE_LOCATION") == "" {
t.Skip()
}

assert := require.New(t)

location := os.Getenv("TEST_CREATE_TABLE_LOCATION")

schema := iceberg.NewSchemaWithIdentifiers(1, []int{},
iceberg.NestedField{
ID: 1, Name: "vendor_id", Type: iceberg.PrimitiveTypes.String},
iceberg.NestedField{
ID: 2, Name: "name", Type: iceberg.PrimitiveTypes.String},
iceberg.NestedField{
ID: 3, Name: "datetime", Type: iceberg.PrimitiveTypes.TimestampTz})
partSpec := iceberg.NewPartitionSpec(iceberg.PartitionField{
SourceID: 3, FieldID: 1000, Name: "datetime", Transform: iceberg.DayTransform{}})

props := map[string]string{
"write.target-file-size-bytes": "536870912",
"write.format.default": "parquet",
}

awscfg, err := config.LoadDefaultConfig(context.TODO(), config.WithClientLogMode(aws.LogRequest|aws.LogResponse))
assert.NoError(err)

catalog := NewGlueCatalog(WithAwsConfig(awscfg))

table, err := catalog.CreateTable(context.TODO(),
[]string{os.Getenv("TEST_DATABASE_NAME"), os.Getenv("TEST_CREATE_TABLE_NAME")}, schema, partSpec, table.UnsortedSortOrder, location, props)
assert.NoError(err)
assert.Equal([]string{os.Getenv("TEST_DATABASE_NAME"), os.Getenv("TEST_CREATE_TABLE_NAME")}, table.Identifier())
}
Loading

0 comments on commit 6cb048b

Please sign in to comment.