diff --git a/catalog/catalog.go b/catalog/catalog.go index d6d7f1e9..512834ea 100644 --- a/catalog/catalog.go +++ b/catalog/catalog.go @@ -21,11 +21,14 @@ import ( "context" "crypto/tls" "errors" + "fmt" "net/url" + "strings" "github.com/apache/iceberg-go" "github.com/apache/iceberg-go/table" "github.com/aws/aws-sdk-go-v2/aws" + "github.com/google/uuid" ) type CatalogType string @@ -52,6 +55,14 @@ func WithAwsConfig(cfg aws.Config) Option[GlueCatalog] { } } +// WithDefaultLocation sets the default location for the catalog, this is used +// when a location is not provided in the create table operation. +func WithDefaultLocation(location string) Option[GlueCatalog] { + return func(o *options) { + o.defaultLocation = location + } +} + func WithCredential(cred string) Option[RestCatalog] { return func(o *options) { o.credential = cred @@ -117,7 +128,8 @@ func WithPrefix(prefix string) Option[RestCatalog] { type Option[T GlueCatalog | RestCatalog] func(*options) type options struct { - awsConfig aws.Config + awsConfig aws.Config + defaultLocation string tlsConfig *tls.Config credential string @@ -185,3 +197,33 @@ func TableNameFromIdent(ident table.Identifier) string { func NamespaceFromIdent(ident table.Identifier) table.Identifier { return ident[:len(ident)-1] } + +func getMetadataPath(locationPath string, newVersion int) (string, error) { + if newVersion < 0 { + return "", fmt.Errorf("invalid table version: %d must be a non-negative integer", newVersion) + } + + metaDataPath, err := url.JoinPath(strings.TrimLeft(locationPath, "/"), "metadata", fmt.Sprintf("%05d-%s.metadata.json", newVersion, uuid.New().String())) + if err != nil { + return "", fmt.Errorf("failed to build metadata path: %w", err) + } + + return metaDataPath, nil +} + +func getLocationForTable(location, defaultLocation, database, tableName string) (*url.URL, error) { + if location != "" { + return url.Parse(location) + } + + if defaultLocation == "" { + return nil, fmt.Errorf("no default path is set, please specify a location when creating a table") + } + + u, err := url.Parse(defaultLocation) + if err != nil { + return nil, fmt.Errorf("failed to parse location URL: %w", err) + } + + return u.JoinPath(fmt.Sprintf("%s.db", database), tableName), nil +} diff --git a/catalog/catalog_test.go b/catalog/catalog_test.go new file mode 100644 index 00000000..9e19a3f0 --- /dev/null +++ b/catalog/catalog_test.go @@ -0,0 +1,123 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package catalog + +import ( + "regexp" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestGetLocationForTable(t *testing.T) { + type args struct { + location string + defaultLocation string + database string + tableName string + } + + tests := []struct { + name string + args args + want string + wantErr bool + }{ + { + name: "should return location if location is provided", + args: args{ + location: "s3://new-bucket/test-table", + defaultLocation: "s3://test-bucket", + database: "test-database", + tableName: "test-table", + }, + want: "s3://new-bucket/test-table", + wantErr: false, + }, + { + name: "should return default location with generated path if location is not provided", + args: args{ + location: "", + defaultLocation: "s3://test-bucket/test-prefix/", + database: "test-database", + tableName: "test-table", + }, + want: "s3://test-bucket/test-prefix/test-database.db/test-table", + wantErr: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := getLocationForTable(tt.args.location, tt.args.defaultLocation, tt.args.database, tt.args.tableName) + if (err != nil) != tt.wantErr { + t.Errorf("tableS3Location() error = %v, wantErr %v", err, tt.wantErr) + return + } + + if got.String() != tt.want { + t.Errorf("tableS3Location() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestGetMetadataPath(t *testing.T) { + type args struct { + locationPath string + version int + } + + tests := []struct { + name string + args args + want string + wantErr bool + }{ + { + name: "should return metadata location with version 0", + args: args{ + locationPath: "/test-table/", + version: 1, + }, + want: "^test-table/metadata/00001-[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}.metadata.json$", + wantErr: false, + }, + { + name: "should return metadata location with version 1", + args: args{ + locationPath: "/test-table/", + version: 0, + }, + want: "^test-table/metadata/00000-[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}.metadata.json$", + wantErr: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := getMetadataPath(tt.args.locationPath, tt.args.version) + if (err != nil) != tt.wantErr { + t.Errorf("getMetadataPath() error = %v, wantErr %v", err, tt.wantErr) + return + } + + require.Regexp(t, regexp.MustCompile(tt.want), got) + }) + } +} diff --git a/catalog/glue.go b/catalog/glue.go index b3981852..b0a0ece0 100644 --- a/catalog/glue.go +++ b/catalog/glue.go @@ -19,6 +19,7 @@ package catalog import ( "context" + "encoding/json" "errors" "fmt" @@ -39,10 +40,12 @@ var ( type glueAPI interface { GetTable(ctx context.Context, params *glue.GetTableInput, optFns ...func(*glue.Options)) (*glue.GetTableOutput, error) GetTables(ctx context.Context, params *glue.GetTablesInput, optFns ...func(*glue.Options)) (*glue.GetTablesOutput, error) + CreateTable(ctx context.Context, params *glue.CreateTableInput, optFns ...func(*glue.Options)) (*glue.CreateTableOutput, error) } type GlueCatalog struct { - glueSvc glueAPI + glueSvc glueAPI + defaultLocation string } func NewGlueCatalog(opts ...Option[GlueCatalog]) *GlueCatalog { @@ -53,7 +56,8 @@ func NewGlueCatalog(opts ...Option[GlueCatalog]) *GlueCatalog { } return &GlueCatalog{ - glueSvc: glue.NewFromConfig(glueOps.awsConfig), + glueSvc: glue.NewFromConfig(glueOps.awsConfig), + defaultLocation: glueOps.defaultLocation, } } @@ -121,6 +125,55 @@ func (c *GlueCatalog) LoadTable(ctx context.Context, identifier table.Identifier return icebergTable, nil } +func (c *GlueCatalog) CreateTable(ctx context.Context, identifier table.Identifier, schema *iceberg.Schema, partitionSpec iceberg.PartitionSpec, sortOrder table.SortOrder, location string, props map[string]string) (*table.Table, error) { + database, tableName, err := identifierToGlueTable(identifier) + if err != nil { + return nil, err + } + + locationURL, err := getLocationForTable(location, c.defaultLocation, database, tableName) + if err != nil { + return nil, err + } + + // append metadata path to location path/to/database.db/tablename/metadata/00000-UUID.metadata.json + metadataPath, err := getMetadataPath(locationURL.Path, 0) + if err != nil { + return nil, err + } + + tbl, err := table.NewTable(identifier, schema, partitionSpec, sortOrder, locationURL.String(), props) + if err != nil { + return nil, err + } + + err = c.writeMetaData(locationURL.String(), metadataPath, tbl.Metadata()) + if err != nil { + return nil, err + } + + params := &glue.CreateTableInput{ + DatabaseName: aws.String(database), + + TableInput: &types.TableInput{ + Name: aws.String(tableName), + TableType: aws.String("EXTERNAL_TABLE"), + Parameters: map[string]string{ + "table_type": glueTableTypeIceberg, + "metadata_location": fmt.Sprintf("%s://%s/%s", locationURL.Scheme, locationURL.Host, metadataPath), + }, + StorageDescriptor: &types.StorageDescriptor{Location: aws.String(locationURL.String())}, + }, + } + + _, err = c.glueSvc.CreateTable(ctx, params) + if err != nil { + return nil, fmt.Errorf("failed to create table %s.%s: %w", database, tableName, err) + } + + return tbl, nil +} + func (c *GlueCatalog) CatalogType() CatalogType { return Glue } @@ -176,6 +229,25 @@ func (c *GlueCatalog) getTable(ctx context.Context, database, tableName string) return tblRes.Table.Parameters["metadata_location"], nil } +func (c *GlueCatalog) writeMetaData(location, metadataPath string, metadata table.Metadata) error { + iofs, err := io.LoadFS(map[string]string{}, location) + if err != nil { + return fmt.Errorf("failed to load fs: %w", err) + } + + data, err := json.Marshal(metadata) + if err != nil { + return fmt.Errorf("failed to marshal table metadata: %w", err) + } + + err = iofs.WriteFile(metadataPath, data, 0644) + if err != nil { + return fmt.Errorf("failed to write metadata file: %w", err) + } + + return nil +} + func identifierToGlueTable(identifier table.Identifier) (string, string, error) { if len(identifier) != 2 { return "", "", fmt.Errorf("invalid identifier, missing database name: %v", identifier) diff --git a/catalog/glue_test.go b/catalog/glue_test.go index 1d3c42fa..8e873748 100644 --- a/catalog/glue_test.go +++ b/catalog/glue_test.go @@ -22,6 +22,8 @@ import ( "os" "testing" + "github.com/apache/iceberg-go" + "github.com/apache/iceberg-go/table" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/service/glue" @@ -44,6 +46,11 @@ func (m *mockGlueClient) GetTables(ctx context.Context, params *glue.GetTablesIn return args.Get(0).(*glue.GetTablesOutput), args.Error(1) } +func (m *mockGlueClient) CreateTable(ctx context.Context, params *glue.CreateTableInput, optFns ...func(*glue.Options)) (*glue.CreateTableOutput, error) { + args := m.Called(ctx, params, optFns) + return args.Get(0).(*glue.CreateTableOutput), args.Error(1) +} + func TestGlueGetTable(t *testing.T) { assert := require.New(t) @@ -146,3 +153,44 @@ func TestGlueLoadTableIntegration(t *testing.T) { assert.NoError(err) assert.Equal([]string{os.Getenv("TEST_TABLE_NAME")}, table.Identifier()) } + +func TestGlueCreateTableIntegration(t *testing.T) { + if os.Getenv("TEST_DATABASE_NAME") == "" { + t.Skip() + } + if os.Getenv("TEST_CREATE_TABLE_NAME") == "" { + t.Skip() + } + if os.Getenv("TEST_CREATE_TABLE_LOCATION") == "" { + t.Skip() + } + + assert := require.New(t) + + location := os.Getenv("TEST_CREATE_TABLE_LOCATION") + + schema := iceberg.NewSchemaWithIdentifiers(1, []int{}, + iceberg.NestedField{ + ID: 1, Name: "vendor_id", Type: iceberg.PrimitiveTypes.String}, + iceberg.NestedField{ + ID: 2, Name: "name", Type: iceberg.PrimitiveTypes.String}, + iceberg.NestedField{ + ID: 3, Name: "datetime", Type: iceberg.PrimitiveTypes.TimestampTz}) + partSpec := iceberg.NewPartitionSpec(iceberg.PartitionField{ + SourceID: 3, FieldID: 1000, Name: "datetime", Transform: iceberg.DayTransform{}}) + + props := map[string]string{ + "write.target-file-size-bytes": "536870912", + "write.format.default": "parquet", + } + + awscfg, err := config.LoadDefaultConfig(context.TODO(), config.WithClientLogMode(aws.LogRequest|aws.LogResponse)) + assert.NoError(err) + + catalog := NewGlueCatalog(WithAwsConfig(awscfg)) + + table, err := catalog.CreateTable(context.TODO(), + []string{os.Getenv("TEST_DATABASE_NAME"), os.Getenv("TEST_CREATE_TABLE_NAME")}, schema, partSpec, table.UnsortedSortOrder, location, props) + assert.NoError(err) + assert.Equal([]string{os.Getenv("TEST_DATABASE_NAME"), os.Getenv("TEST_CREATE_TABLE_NAME")}, table.Identifier()) +} diff --git a/docs/cfn/AWS_TESTING.md b/docs/cfn/AWS_TESTING.md new file mode 100644 index 00000000..ad71f0a7 --- /dev/null +++ b/docs/cfn/AWS_TESTING.md @@ -0,0 +1,74 @@ + + +# AWS integration testing + +To validate the glue catalog you will need to create some test resources. + +# Prerequisites + +1. An AWS account. +2. [AWS CLI](https://aws.amazon.com/cli/) is installed. +2. Exported environment variables for `AWS_DEFAULT_REGION`, `AWS_REGION` and `AWS_PROFILE`, I use [direnv](https://direnv.net/) to maintain these variables in a `.envrc` file. +3. Your have logged into an AWS account via the AWS CLI. + +The way to deploy this template is using the included cloudformation template is as follows: + +``` +aws cloudformation deploy --stack-name test-iceberg-glue-catalog --template-file docs/cfn/glue-catalog.yaml +``` + +Once deployed you can retrieve the outputs of the stack. + +``` +aws cloudformation describe-stacks --stack-name test-iceberg-glue-catalog --query 'Stacks[0].Outputs' +``` + +This should output JSON as follows: + +``` +[ + { + "OutputKey": "IcebergBucket", + "OutputValue": "test-iceberg-glue-catalog-icebergbucket-abc123abc123" + }, + { + "OutputKey": "GlueDatabase", + "OutputValue": "iceberg_test" + } +] +``` + +Export the required environment variables. + +``` +# the glue database from the outputs of the stack +export TEST_DATABASE_NAME=iceberg_test + +# the s3 bucket name from the outputs of the stack +export TEST_CREATE_TABLE_LOCATION=s3://test-iceberg-glue-catalog-icebergbucket-abc123abc123/testing + +# the name of the table you will create in the glue catalog +export TEST_CREATE_TABLE_NAME=records +``` + +Run the creation integration test to validate the catalog creation, and provide a table which can be used to validate other integration tests. + +``` +go test -v -run TestGlueCreateTableIntegration ./catalog +``` + diff --git a/docs/cfn/glue-catalog.yaml b/docs/cfn/glue-catalog.yaml new file mode 100644 index 00000000..b0585429 --- /dev/null +++ b/docs/cfn/glue-catalog.yaml @@ -0,0 +1,70 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +AWSTemplateFormatVersion: "2010-09-09" +Description: "apache-iceberg: Glue database and S3 bucket for integration testing" + +Parameters: + Stage: + Type: String + Description: The stage where the stack is running in, e.g., dev, test, prod. + Default: test + +Outputs: + GlueDatabase: + Value: !Ref GlueDatabase + IcebergBucket: + Value: !Ref IcebergBucket + +Resources: + GlueDatabase: + Type: AWS::Glue::Database + Properties: + CatalogId: !Ref AWS::AccountId + DatabaseInput: + Name: !Sub iceberg_${Stage} + Description: iceberg database + + IcebergBucket: + Type: AWS::S3::Bucket + Properties: + BucketEncryption: + ServerSideEncryptionConfiguration: + - ServerSideEncryptionByDefault: + SSEAlgorithm: AES256 + PublicAccessBlockConfiguration: + BlockPublicAcls: true + BlockPublicPolicy: true + IgnorePublicAcls: true + RestrictPublicBuckets: true + + IcebergBucketPolicy: + Type: AWS::S3::BucketPolicy + Properties: + Bucket: !Ref IcebergBucket + PolicyDocument: + Statement: + - Sid: AllowSSLRequestsOnly + Effect: Deny + Principal: "*" + Action: + - s3:* + Resource: + - Fn::Sub: arn:aws:s3:::${IcebergBucket}/* + - Fn::Sub: arn:aws:s3:::${IcebergBucket} + Condition: + Bool: + aws:SecureTransport: false diff --git a/go.mod b/go.mod index 88bb9b5a..20ab49ac 100644 --- a/go.mod +++ b/go.mod @@ -24,13 +24,14 @@ require ( github.com/aws/aws-sdk-go-v2/config v1.26.3 github.com/aws/aws-sdk-go-v2/credentials v1.16.14 github.com/aws/aws-sdk-go-v2/service/glue v1.73.1 - github.com/aws/aws-sdk-go-v2/service/s3 v1.48.0 + github.com/aws/aws-sdk-go-v2/service/s3 v1.48.1 + github.com/aws/smithy-go v1.19.0 github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815 github.com/google/uuid v1.3.1 github.com/hamba/avro/v2 v2.16.0 github.com/pterm/pterm v0.12.78 github.com/stretchr/testify v1.8.4 - github.com/wolfeidau/s3iofs v1.5.0 + github.com/wolfeidau/s3iofs v1.5.2 golang.org/x/exp v0.0.0-20231006140011-7918f672742d ) @@ -51,7 +52,6 @@ require ( github.com/aws/aws-sdk-go-v2/service/sso v1.18.6 // indirect github.com/aws/aws-sdk-go-v2/service/ssooidc v1.21.6 // indirect github.com/aws/aws-sdk-go-v2/service/sts v1.26.7 // indirect - github.com/aws/smithy-go v1.19.0 // indirect github.com/containerd/console v1.0.3 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/golang/snappy v0.0.4 // indirect @@ -66,7 +66,7 @@ require ( github.com/rivo/uniseg v0.4.4 // indirect github.com/stretchr/objx v0.5.0 // indirect github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e // indirect - golang.org/x/net v0.18.0 // indirect + golang.org/x/net v0.20.0 // indirect golang.org/x/sys v0.16.0 // indirect golang.org/x/term v0.16.0 // indirect golang.org/x/text v0.14.0 // indirect diff --git a/go.sum b/go.sum index 62ea84af..4cd9309d 100644 --- a/go.sum +++ b/go.sum @@ -44,8 +44,8 @@ github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.10.10 h1:DBYTXwIG github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.10.10/go.mod h1:wohMUQiFdzo0NtxbBg0mSRGZ4vL3n0dKjLTINdcIino= github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.16.10 h1:KOxnQeWy5sXyS37fdKEvAsGHOr9fa/qvwxfJurR/BzE= github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.16.10/go.mod h1:jMx5INQFYFYB3lQD9W0D8Ohgq6Wnl7NYOJ2TQndbulI= -github.com/aws/aws-sdk-go-v2/service/s3 v1.48.0 h1:PJTdBMsyvra6FtED7JZtDpQrIAflYDHFoZAu/sKYkwU= -github.com/aws/aws-sdk-go-v2/service/s3 v1.48.0/go.mod h1:4qXHrG1Ne3VGIMZPCB8OjH/pLFO94sKABIusjh0KWPU= +github.com/aws/aws-sdk-go-v2/service/s3 v1.48.1 h1:5XNlsBsEvBZBMO6p82y+sqpWg8j5aBCe+5C2GBFgqBQ= +github.com/aws/aws-sdk-go-v2/service/s3 v1.48.1/go.mod h1:4qXHrG1Ne3VGIMZPCB8OjH/pLFO94sKABIusjh0KWPU= github.com/aws/aws-sdk-go-v2/service/sso v1.18.6 h1:dGrs+Q/WzhsiUKh82SfTVN66QzyulXuMDTV/G8ZxOac= github.com/aws/aws-sdk-go-v2/service/sso v1.18.6/go.mod h1:+mJNDdF+qiUlNKNC3fxn74WWNN+sOiGOEImje+3ScPM= github.com/aws/aws-sdk-go-v2/service/ssooidc v1.21.6 h1:Yf2MIo9x+0tyv76GljxzqA3WtC5mw7NmazD2chwjxE4= @@ -126,8 +126,8 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= -github.com/wolfeidau/s3iofs v1.5.0 h1:l0GsqWYnHwHYkBxDnMVL0fNA2mhRnnXm4rtRG93HsOw= -github.com/wolfeidau/s3iofs v1.5.0/go.mod h1:fPAKzdWmZ1Z2L9vnqL6d1eb7pVsUgkUstxQUkG1HIHA= +github.com/wolfeidau/s3iofs v1.5.2 h1:2dmzSxdrSY29GsILVheJqBbURVQX3KglggSBtVWCYj4= +github.com/wolfeidau/s3iofs v1.5.2/go.mod h1:fPAKzdWmZ1Z2L9vnqL6d1eb7pVsUgkUstxQUkG1HIHA= github.com/xo/terminfo v0.0.0-20210125001918-ca9a967f8778/go.mod h1:2MuV+tbUrU1zIOPMxZ5EncGwgmMJsa+9ucAQZXxsObs= github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e h1:JVG44RsyaB9T2KIHavMF/ppJZNG9ZpyihvCd0w101no= github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e/go.mod h1:RbqR21r5mrJuqunuUZ/Dhy/avygyECGrLceyNeo4LiM= @@ -142,8 +142,8 @@ golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= -golang.org/x/net v0.18.0 h1:mIYleuAkSbHh0tCv7RvjL3F6ZVbLjq4+R7zbOn3Kokg= -golang.org/x/net v0.18.0/go.mod h1:/czyP5RqHAH4odGYxBJ1qz0+CE5WZ+2j1YgoEo8F2jQ= +golang.org/x/net v0.20.0 h1:aCL9BSgETF1k+blQaYUBx9hJ9LOGP3gAVemcZlf1Kpo= +golang.org/x/net v0.20.0/go.mod h1:z8BVo6PvndSri0LbOE3hAn0apkU+1YvI6E70E9jsnvY= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= diff --git a/internal/mock_fs.go b/internal/mock_fs.go index 95f6c3fa..069d4cee 100644 --- a/internal/mock_fs.go +++ b/internal/mock_fs.go @@ -39,6 +39,10 @@ func (m *MockFS) Remove(name string) error { return m.Called(name).Error(0) } +func (m *MockFS) WriteFile(name string, data []byte, perm fs.FileMode) error { + return m.Called(name, data, perm).Error(0) +} + type MockFSReadFile struct { MockFS } diff --git a/io/io.go b/io/io.go index abe5971f..77005703 100644 --- a/io/io.go +++ b/io/io.go @@ -48,6 +48,9 @@ type IO interface { // // If there is an error, it will be of type *PathError. Remove(name string) error + + // Create creates or truncates the named file. + WriteFile(name string, data []byte, perm fs.FileMode) error } // ReadFileIO is the interface implemented by a file system that @@ -166,12 +169,23 @@ func (f ioFS) Remove(name string) error { return r.Remove(name) } +func (f ioFS) WriteFile(name string, data []byte, perm fs.FileMode) error { + r, ok := f.fsys.(interface { + WriteFile(name string, data []byte, perm fs.FileMode) error + }) + if !ok { + return errMissingWriteFile + } + return r.WriteFile(name, data, perm) +} + var ( - errMissingReadDir = errors.New("fs.File directory missing ReadDir method") - errMissingSeek = errors.New("fs.File missing Seek method") - errMissingReadAt = errors.New("fs.File missing ReadAt") - errMissingRemove = errors.New("fs.FS missing Remove method") - errMissingReadFile = errors.New("fs.FS missing ReadFile method") + errMissingReadDir = errors.New("fs.File directory missing ReadDir method") + errMissingSeek = errors.New("fs.File missing Seek method") + errMissingReadAt = errors.New("fs.File missing ReadAt") + errMissingRemove = errors.New("fs.FS missing Remove method") + errMissingWriteFile = errors.New("fs.FS missing WriteFile method") + errMissingReadFile = errors.New("fs.FS missing ReadFile method") ) type ioFile struct { diff --git a/io/local.go b/io/local.go index befa8319..40d0f4a4 100644 --- a/io/local.go +++ b/io/local.go @@ -30,3 +30,7 @@ func (LocalFS) Open(name string) (File, error) { func (LocalFS) Remove(name string) error { return os.Remove(name) } + +func (LocalFS) WriteFile(name string, data []byte, perm os.FileMode) error { + return os.WriteFile(name, data, perm) +} diff --git a/table/metadata.go b/table/metadata.go index 957e163a..c92b4cc2 100644 --- a/table/metadata.go +++ b/table/metadata.go @@ -22,6 +22,7 @@ import ( "errors" "fmt" "io" + "time" "github.com/apache/iceberg-go" @@ -399,3 +400,32 @@ func (m *MetadataV2) UnmarshalJSON(b []byte) error { m.preValidate() return m.validate() } + +func NewMetadataV2(schema *iceberg.Schema, partitionSpec iceberg.PartitionSpec, sortOrder SortOrder, location string, tableUUID uuid.UUID, properties iceberg.Properties) (*MetadataV2, error) { + metadata := &MetadataV2{ + commonMetadata: commonMetadata{ + FormatVersion: 2, + UUID: tableUUID, + Loc: location, + Specs: []iceberg.PartitionSpec{partitionSpec}, + DefaultSpecID: partitionSpec.ID(), + SortOrderList: []SortOrder{sortOrder}, + DefaultSortOrderID: sortOrder.OrderID, + SchemaList: []*iceberg.Schema{schema}, + CurrentSchemaID: schema.ID, + Props: properties, + LastColumnId: schema.HighestFieldID(), + LastPartitionID: intToPtr(partitionSpec.LastAssignedFieldID()), + LastUpdatedMS: time.Now().UnixMilli(), + }, + } + + metadata.preValidate() + + err := metadata.validate() + if err != nil { + return nil, fmt.Errorf("invalid metadata: %w", err) + } + + return metadata, nil +} diff --git a/table/table.go b/table/table.go index 80b68b39..243e85b7 100644 --- a/table/table.go +++ b/table/table.go @@ -22,6 +22,7 @@ import ( "github.com/apache/iceberg-go" "github.com/apache/iceberg-go/io" + "github.com/google/uuid" "golang.org/x/exp/slices" ) @@ -95,3 +96,29 @@ func NewFromLocation(ident Identifier, metalocation string, fsys io.IO) (*Table, } return New(ident, meta, metalocation, fsys), nil } + +func NewTable(ident Identifier, schema *iceberg.Schema, partitionSpec iceberg.PartitionSpec, sortOrder SortOrder, location string, properties iceberg.Properties) (*Table, error) { + if properties == nil { + properties = make(iceberg.Properties) + } + + tableUUID := uuid.New() + + // TODO: we need to "freshen" the sequences in the schema, partition spec, and sort order + + metadata, err := NewMetadataV2(schema, partitionSpec, sortOrder, location, tableUUID, properties) + if err != nil { + return nil, err + } + + return &Table{ + identifier: ident, + metadata: metadata, + metadataLocation: location, + fs: nil, + }, nil +} + +func intToPtr(i int) *int { + return &i +} diff --git a/table/table_test.go b/table/table_test.go index cde94ab1..80d068be 100644 --- a/table/table_test.go +++ b/table/table_test.go @@ -19,6 +19,8 @@ package table_test import ( "bytes" + "encoding/json" + "fmt" "testing" "github.com/apache/iceberg-go" @@ -128,3 +130,29 @@ func (t *TableTestSuite) TestSnapshotByName() { t.True(testSnapshot.Equals(*t.tbl.SnapshotByName("test"))) } + +func (t *TableTestSuite) TestNewTable() { + + identifier := table.Identifier{"records"} + + schema := iceberg.NewSchemaWithIdentifiers(1, []int{1}, + iceberg.NestedField{ + ID: 1, Name: "vendor_id", Type: iceberg.PrimitiveTypes.Int64, Required: true}, + iceberg.NestedField{ + ID: 2, Name: "name", Type: iceberg.PrimitiveTypes.String}, + iceberg.NestedField{ + ID: 3, Name: "datetime", Type: iceberg.PrimitiveTypes.TimestampTz}) + + partSpec := iceberg.NewPartitionSpec(iceberg.PartitionField{ + SourceID: 3, FieldID: 1000, Name: "datetime", Transform: iceberg.DayTransform{}}) + + location := "s3://bucket/test/location" + + tbl, err := table.NewTable(identifier, schema, partSpec, table.UnsortedSortOrder, location, nil) + t.Require().NoError(err) + t.Require().Equal(identifier, tbl.Identifier()) + + data, err := json.Marshal(tbl.Metadata()) + t.Require().NoError(err) + fmt.Println(string(data)) +}