Skip to content

Commit

Permalink
rename hive to hdfs
Browse files Browse the repository at this point in the history
  • Loading branch information
thorfour committed Apr 15, 2024
1 parent aab9c3b commit ce10fd5
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 28 deletions.
3 changes: 2 additions & 1 deletion catalog/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ import (
"errors"
"net/url"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/polarsignals/iceberg-go"
"github.com/polarsignals/iceberg-go/table"
"github.com/aws/aws-sdk-go-v2/aws"
)

type CatalogType string
Expand All @@ -36,6 +36,7 @@ const (
Glue CatalogType = "glue"
DynamoDB CatalogType = "dynamodb"
SQL CatalogType = "sql"
Hadoop CatalogType = "hadoop"
)

var (
Expand Down
54 changes: 27 additions & 27 deletions catalog/hive.go → catalog/hdfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,29 +17,29 @@ var (
)

// Path components used to locate a table's metadata inside the bucket.
// This rename replaces the hive-prefixed names with hdfs-prefixed ones; the
// string values themselves are unchanged.
const (
hiveTableMetadataDir = "metadata"
hiveVersionHintFile = "version-hint.text"
// hdfsTableMetadataDir is the directory joined under <namespace>/<table>
// when building metadata file paths.
hdfsTableMetadataDir = "metadata"
// hdfsVersionHintFile is the file inside the metadata directory that is
// fetched to determine the latest table version.
hdfsVersionHintFile = "version-hint.text"
)

// hdfsMetadataFileName (hiveMetadataFileName before this rename) returns the
// metadata file name for the given table version, formatted as
// "v<version>.metadata.json" (for example "v3.metadata.json").
func hiveMetadataFileName(version int) string {
func hdfsMetadataFileName(version int) string {
return fmt.Sprintf("v%d.metadata.json", version)
}

// hdfs (hive before this rename) is the catalog type whose methods are defined
// in this file. The methods visible here operate on the wrapped bucket.
type hive struct {
type hdfs struct {
// bucket is the object store backing the catalog.
bucket objstore.Bucket
}

// NewHDFS (NewHive before this rename) returns a Catalog backed by the given
// bucket.
func NewHive(bucket objstore.Bucket) Catalog {
return &hive{bucket: bucket}
func NewHDFS(bucket objstore.Bucket) Catalog {
return &hdfs{bucket: bucket}
}

// CatalogType reports which kind of catalog this is. After this rename it
// returns Hadoop; previously it returned Hive.
func (h *hive) CatalogType() CatalogType {
return Hive
func (h *hdfs) CatalogType() CatalogType {
return Hadoop
}

func (h *hive) ListTables(ctx context.Context, namespace table.Identifier) ([]table.Identifier, error) {
func (h *hdfs) ListTables(ctx context.Context, namespace table.Identifier) ([]table.Identifier, error) {
if len(namespace) != 1 {
return nil, fmt.Errorf("hive catalog only supports listing tables in a single namespace")
return nil, fmt.Errorf("hdfs catalog only supports listing tables in a single namespace")
}

ns := namespace[0]
Expand All @@ -52,7 +52,7 @@ func (h *hive) ListTables(ctx context.Context, namespace table.Identifier) ([]ta
return tables, nil
}

func (h *hive) DropTable(ctx context.Context, identifier table.Identifier) error {
func (h *hdfs) DropTable(ctx context.Context, identifier table.Identifier) error {
ns, tbl, err := splitIdentForPath(identifier)
if err != nil {
return err
Expand All @@ -62,35 +62,35 @@ func (h *hive) DropTable(ctx context.Context, identifier table.Identifier) error
return h.bucket.Delete(ctx, filepath.Join(ns, tbl))
}

// RenameTable is not supported by this catalog. It ignores its arguments and
// always returns a nil table together with an error. The rename only changes
// the catalog name in the error text from "hive" to "hdfs".
func (h *hive) RenameTable(ctx context.Context, from, to table.Identifier) (*table.Table, error) {
return nil, fmt.Errorf("hive catalog does not support renaming tables")
func (h *hdfs) RenameTable(ctx context.Context, from, to table.Identifier) (*table.Table, error) {
return nil, fmt.Errorf("hdfs catalog does not support renaming tables")
}

// ListNamespaces iterates the bucket under the path formed by joining the
// parts of parent and returns one single-element identifier per name the
// iteration yields. The error returned is whatever bucket.Iter returns.
//
// NOTE(review): the return statement reads `namespaces` in the same expression
// list as the Iter call that appends to it. The Go spec does not fix the order
// of a variable read relative to a function call in one statement, so the
// result relies on the compiler reading the variable after the call. Consider
// assigning the Iter error to a local first and returning afterwards.
func (h *hive) ListNamespaces(ctx context.Context, parent table.Identifier) ([]table.Identifier, error) {
func (h *hdfs) ListNamespaces(ctx context.Context, parent table.Identifier) ([]table.Identifier, error) {
// Start from an empty, non-nil slice so callers get [] rather than nil when
// the iteration yields nothing.
namespaces := []table.Identifier{}
return namespaces, h.bucket.Iter(ctx, filepath.Join(parent...), func(name string) error {
// Each yielded name becomes its own identifier; parent is not prepended.
namespaces = append(namespaces, table.Identifier{name})
return nil
})
}

// CreateNamespace is not supported by this catalog. It ignores its arguments
// and always returns an error. The rename only changes the catalog name in
// the error text from "hive" to "hdfs".
func (h *hive) CreateNamespace(ctx context.Context, namespace table.Identifier, _ iceberg.Properties) error {
return fmt.Errorf("hive catalog does not support creating namespaces")
func (h *hdfs) CreateNamespace(ctx context.Context, namespace table.Identifier, _ iceberg.Properties) error {
return fmt.Errorf("hdfs catalog does not support creating namespaces")
}

// DropNamespace is not supported by this catalog. It ignores its arguments
// and always returns an error. The rename only changes the catalog name in
// the error text from "hive" to "hdfs".
func (h *hive) DropNamespace(ctx context.Context, namespace table.Identifier) error {
return fmt.Errorf("hive catalog does not support dropping namespaces")
func (h *hdfs) DropNamespace(ctx context.Context, namespace table.Identifier) error {
return fmt.Errorf("hdfs catalog does not support dropping namespaces")
}

// LoadNamespaceProperties is not supported by this catalog. It ignores its
// arguments and always returns nil properties together with an error. The
// rename only changes the catalog name in the error text from "hive" to "hdfs".
func (h *hive) LoadNamespaceProperties(ctx context.Context, namespace table.Identifier) (iceberg.Properties, error) {
return nil, fmt.Errorf("hive catalog does not support loading namespace properties")
func (h *hdfs) LoadNamespaceProperties(ctx context.Context, namespace table.Identifier) (iceberg.Properties, error) {
return nil, fmt.Errorf("hdfs catalog does not support loading namespace properties")
}

// UpdateNamespaceProperties is not supported by this catalog. It ignores its
// arguments and always returns a zero-value PropertiesUpdateSummary together
// with an error. The rename only changes the catalog name in the error text
// from "hive" to "hdfs".
func (h *hive) UpdateNamespaceProperties(ctx context.Context, namespace table.Identifier, removals []string, updates iceberg.Properties) (PropertiesUpdateSummary, error) {
return PropertiesUpdateSummary{}, fmt.Errorf("hive catalog does not support updating namespace properties")
func (h *hdfs) UpdateNamespaceProperties(ctx context.Context, namespace table.Identifier, removals []string, updates iceberg.Properties) (PropertiesUpdateSummary, error) {
return PropertiesUpdateSummary{}, fmt.Errorf("hdfs catalog does not support updating namespace properties")
}

func (h *hive) LoadTable(ctx context.Context, identifier table.Identifier, _ iceberg.Properties) (*table.Table, error) {
func (h *hdfs) LoadTable(ctx context.Context, identifier table.Identifier, _ iceberg.Properties) (*table.Table, error) {
ns, tbl, err := splitIdentForPath(identifier)
if err != nil {
return nil, err
Expand All @@ -105,7 +105,7 @@ func (h *hive) LoadTable(ctx context.Context, identifier table.Identifier, _ ice
return t, nil
}

func (h *hive) loadLatestTable(ctx context.Context, identifier table.Identifier, ns, tbl string) (*table.Table, error) {
func (h *hdfs) loadLatestTable(ctx context.Context, identifier table.Identifier, ns, tbl string) (*table.Table, error) {
v, err := getTableVersion(ctx, h.bucket, ns, tbl)
if err != nil {
return nil, err
Expand All @@ -116,12 +116,12 @@ func (h *hive) loadLatestTable(ctx context.Context, identifier table.Identifier,
return nil, err
}

return table.New(identifier, md, filepath.Join(ns, tbl, hiveTableMetadataDir, hiveMetadataFileName(md.Version())), h.bucket), nil
return table.New(identifier, md, filepath.Join(ns, tbl, hdfsTableMetadataDir, hdfsMetadataFileName(md.Version())), h.bucket), nil
}

// getTableMetadata returns the metadata of the table at the specified version.
func getTableMetadata(ctx context.Context, bucket objstore.Bucket, ns, tbl string, version int) (table.Metadata, error) {
r, err := bucket.Get(ctx, filepath.Join(ns, tbl, hiveTableMetadataDir, hiveMetadataFileName(version)))
r, err := bucket.Get(ctx, filepath.Join(ns, tbl, hdfsTableMetadataDir, hdfsMetadataFileName(version)))
if err != nil {
return nil, fmt.Errorf("failed to get metadata file: %w", err)
}
Expand All @@ -138,7 +138,7 @@ func getTableMetadata(ctx context.Context, bucket objstore.Bucket, ns, tbl strin
// getTableVersion returns the latest version of the table.
// FIXME: this could fall back to a version file scan instead of returning an error
func getTableVersion(ctx context.Context, bucket objstore.Bucket, ns, tbl string) (int, error) {
r, err := bucket.Get(ctx, filepath.Join(ns, tbl, hiveTableMetadataDir, hiveVersionHintFile))
r, err := bucket.Get(ctx, filepath.Join(ns, tbl, hdfsTableMetadataDir, hdfsVersionHintFile))
if err != nil {
if bucket.IsObjNotFoundErr(err) { // Table does not exist.
return -1, ErrorTableNotFound
Expand Down

0 comments on commit ce10fd5

Please sign in to comment.