Skip to content

Commit

Permalink
feat(table): Update Snapshot Summaries
Browse files Browse the repository at this point in the history
  • Loading branch information
zeroshade committed Mar 1, 2025
1 parent f791776 commit 5853b6b
Show file tree
Hide file tree
Showing 2 changed files with 457 additions and 4 deletions.
270 changes: 266 additions & 4 deletions table/snapshots.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@ import (
"errors"
"fmt"
"maps"
"slices"
"strconv"
"strings"

"github.com/apache/iceberg-go"
"github.com/apache/iceberg-go/io"
iceio "github.com/apache/iceberg-go/io"
)

type Operation string
Expand All @@ -52,14 +54,127 @@ func ValidOperation(s string) (Operation, error) {
return "", fmt.Errorf("%w: found '%s'", ErrInvalidOperation, s)
}

const operationKey = "operation"
const (
operationKey = "operation"

addedDataFilesKey = "added-data-files"

Check failure on line 60 in table/snapshots.go

View workflow job for this annotation

GitHub Actions / ubuntu-latest go1.23.6

const addedDataFilesKey is unused (U1000)

Check failure on line 60 in table/snapshots.go

View workflow job for this annotation

GitHub Actions / windows-latest go1.23.6

const addedDataFilesKey is unused (U1000)

Check failure on line 60 in table/snapshots.go

View workflow job for this annotation

GitHub Actions / macos-latest go1.23.6

const addedDataFilesKey is unused (U1000)
addedDeleteFilesKey = "added-delete-files"

Check failure on line 61 in table/snapshots.go

View workflow job for this annotation

GitHub Actions / ubuntu-latest go1.23.6

const addedDeleteFilesKey is unused (U1000)

Check failure on line 61 in table/snapshots.go

View workflow job for this annotation

GitHub Actions / windows-latest go1.23.6

const addedDeleteFilesKey is unused (U1000)

Check failure on line 61 in table/snapshots.go

View workflow job for this annotation

GitHub Actions / macos-latest go1.23.6

const addedDeleteFilesKey is unused (U1000)
addedEqDeletesKey = "added-equality-deletes"

Check failure on line 62 in table/snapshots.go

View workflow job for this annotation

GitHub Actions / ubuntu-latest go1.23.6

const addedEqDeletesKey is unused (U1000)

Check failure on line 62 in table/snapshots.go

View workflow job for this annotation

GitHub Actions / windows-latest go1.23.6

const addedEqDeletesKey is unused (U1000)

Check failure on line 62 in table/snapshots.go

View workflow job for this annotation

GitHub Actions / macos-latest go1.23.6

const addedEqDeletesKey is unused (U1000)
addedFileSizeKey = "added-files-size"

Check failure on line 63 in table/snapshots.go

View workflow job for this annotation

GitHub Actions / ubuntu-latest go1.23.6

const addedFileSizeKey is unused (U1000)

Check failure on line 63 in table/snapshots.go

View workflow job for this annotation

GitHub Actions / windows-latest go1.23.6

const addedFileSizeKey is unused (U1000)

Check failure on line 63 in table/snapshots.go

View workflow job for this annotation

GitHub Actions / macos-latest go1.23.6

const addedFileSizeKey is unused (U1000)
addedPosDeletesKey = "added-position-deletes"

Check failure on line 64 in table/snapshots.go

View workflow job for this annotation

GitHub Actions / ubuntu-latest go1.23.6

const addedPosDeletesKey is unused (U1000)

Check failure on line 64 in table/snapshots.go

View workflow job for this annotation

GitHub Actions / windows-latest go1.23.6

const addedPosDeletesKey is unused (U1000)

Check failure on line 64 in table/snapshots.go

View workflow job for this annotation

GitHub Actions / macos-latest go1.23.6

const addedPosDeletesKey is unused (U1000)
addedPosDeleteFilesKey = "added-position-delete-files"
addedRecordsKey = "added-records"
addedEqDeleteFilesKey = "added-equality-delete-files"
deletedDataFilesKey = "deleted-data-files"
deletedRecordsKey = "deleted-records"
removedDeleteFilesKey = "removed-delete-files"
removedEqDeletesKey = "removed-equality-deletes"
removedEqDeleteFilesKey = "removed-equality-delete-files"
removedFileSizeKey = "removed-files-size"
removedPosDeletesKey = "removed-position-deletes"
removedPosDeleteFilesKey = "removed-position-delete-files"
totalEqDeletesKey = "total-equality-deletes"
totalPosDeletesKey = "total-position-deletes"
totalDataFilesKey = "total-data-files"
totalDeleteFilesKey = "total-delete-files"
totalRecordsKey = "total-records"
totalFileSizeKey = "total-files-size"
changedPartitionCountProp = "changed-partition-count"
changedPartitionPrefix = "partitions."
)

// updateMetrics accumulates the counters that describe a set of added and
// removed files. Each counter is paired with its opposite-direction
// counterpart; the zero value is an empty, ready-to-use accumulator.
type updateMetrics struct {
	// Total size in bytes of the files added / removed.
	addedFileSize, removedFileSize int64
	// Number of data files added / removed.
	addedDataFiles, removedDataFiles int64
	// Number of equality-delete files added / removed.
	addedEqDeleteFiles, removedEqDeleteFiles int64
	// Number of position-delete files added / removed.
	addedPosDeleteFiles, removedPosDeleteFiles int64
	// Number of delete files of either kind added / removed.
	addedDeleteFiles, removedDeleteFiles int64
	// Record counts of the data files added / removed.
	addedRecords, deletedRecords int64
	// Record counts of the position-delete files added / removed.
	addedPosDeletes, removedPosDeletes int64
	// Record counts of the equality-delete files added / removed.
	addedEqDeletes, removedEqDeletes int64
}

// addFile folds a newly added file into the metrics.
//
// The file's content type selects which counters are bumped: data files
// increase addedDataFiles and addedRecords, while position-delete and
// equality-delete files increase their own file/record counters as well as
// the combined addedDeleteFiles counter.
//
// An unrecognised content type yields an error and leaves every counter
// untouched, so a failed call never leaves the metrics half-updated.
func (m *updateMetrics) addFile(df iceberg.DataFile) error {
	switch df.ContentType() {
	case iceberg.EntryContentData:
		m.addedDataFiles++
		m.addedRecords += df.Count()
	case iceberg.EntryContentPosDeletes:
		m.addedDeleteFiles++
		m.addedPosDeleteFiles++
		m.addedPosDeletes += df.Count()
	case iceberg.EntryContentEqDeletes:
		m.addedDeleteFiles++
		m.addedEqDeleteFiles++
		m.addedEqDeletes += df.Count()
	default:
		return fmt.Errorf("unknown data file content: %s", df.ContentType())
	}

	// The size is accounted for only once the content type is known to be
	// valid; doing it earlier would skew the totals on the error path.
	m.addedFileSize += df.FileSizeBytes()

	return nil
}

// removeFile folds a removed file into the metrics.
//
// The file's content type selects which counters are bumped: data files
// increase removedDataFiles and deletedRecords, while position-delete and
// equality-delete files increase their own file/record counters as well as
// the combined removedDeleteFiles counter.
//
// An unrecognised content type yields an error and leaves every counter
// untouched, so a failed call never leaves the metrics half-updated.
func (m *updateMetrics) removeFile(df iceberg.DataFile) error {
	switch df.ContentType() {
	case iceberg.EntryContentData:
		m.removedDataFiles++
		m.deletedRecords += df.Count()
	case iceberg.EntryContentPosDeletes:
		m.removedDeleteFiles++
		m.removedPosDeleteFiles++
		m.removedPosDeletes += df.Count()
	case iceberg.EntryContentEqDeletes:
		m.removedDeleteFiles++
		m.removedEqDeleteFiles++
		m.removedEqDeletes += df.Count()
	default:
		return fmt.Errorf("unknown data file content: %s", df.ContentType())
	}

	// The size is accounted for only once the content type is known to be
	// valid; doing it earlier would skew the totals on the error path.
	m.removedFileSize += df.FileSizeBytes()

	return nil
}

// setWhenPositive stores value under key in props as a base-10 string.
// Zero and negative values are skipped, leaving props untouched.
func setWhenPositive(props iceberg.Properties, key string, value int64) {
	if value <= 0 {
		return
	}

	props[key] = strconv.FormatInt(value, 10)
}

// toProps renders the metrics as summary properties. Only counters with a
// positive value produce an entry; everything else is omitted.
func (m *updateMetrics) toProps() iceberg.Properties {
	// Each row ties a summary key to the counter that feeds it.
	rows := [...]struct {
		key   string
		count int64
	}{
		{addedFileSizeKey, m.addedFileSize},
		{removedFileSizeKey, m.removedFileSize},
		{addedDataFilesKey, m.addedDataFiles},
		{deletedDataFilesKey, m.removedDataFiles},
		{addedEqDeleteFilesKey, m.addedEqDeleteFiles},
		{removedEqDeleteFilesKey, m.removedEqDeleteFiles},
		{addedPosDeleteFilesKey, m.addedPosDeleteFiles},
		{removedPosDeleteFilesKey, m.removedPosDeleteFiles},
		{addedDeleteFilesKey, m.addedDeleteFiles},
		{removedDeleteFilesKey, m.removedDeleteFiles},
		{addedRecordsKey, m.addedRecords},
		{deletedRecordsKey, m.deletedRecords},
		{addedPosDeletesKey, m.addedPosDeletes},
		{removedPosDeletesKey, m.removedPosDeletes},
		{addedEqDeletesKey, m.addedEqDeletes},
		{removedEqDeletesKey, m.removedEqDeletes},
	}

	out := iceberg.Properties{}
	for _, row := range rows {
		setWhenPositive(out, row.key, row.count)
	}

	return out
}

// Summary stores the summary information for a snapshot indicating
// the operation that created the snapshot, and various properties
// which might exist in the summary.
type Summary struct {
Operation Operation
Properties map[string]string
Properties iceberg.Properties
}

func (s *Summary) String() string {
Expand Down Expand Up @@ -172,7 +287,7 @@ func (s Snapshot) Equals(other Snapshot) bool {
s.Summary.Equals(other.Summary)
}

func (s Snapshot) Manifests(fio io.IO) ([]iceberg.ManifestFile, error) {
func (s Snapshot) Manifests(fio iceio.IO) ([]iceberg.ManifestFile, error) {
if s.ManifestList != "" {
f, err := fio.Open(s.ManifestList)
if err != nil {
Expand All @@ -194,3 +309,150 @@ type SnapshotLogEntry struct {
SnapshotID int64 `json:"snapshot-id"`
TimestampMs int64 `json:"timestamp-ms"`
}

// snapshotSummaryCollector gathers the metrics for the files added to and
// removed from a snapshot, both overall and per partition, and renders them
// as summary properties via build.
type snapshotSummaryCollector struct {
// metrics holds the counters across every file seen by the collector.
metrics updateMetrics
// partitionMetrics holds per-partition counters keyed by partition path;
// it is allocated lazily the first time a partitioned file is recorded.
partitionMetrics map[string]updateMetrics
// maxChangedPartitionsForSummaries is the largest number of changed
// partitions for which build still emits per-partition summaries.
maxChangedPartitionsForSummaries int
}

// setPartitionSummaryLimit sets the maximum number of changed partitions
// for which build will include per-partition summaries in its output.
func (s *snapshotSummaryCollector) setPartitionSummaryLimit(limit int) {
s.maxChangedPartitionsForSummaries = limit
}

// updatePartitionMetrics records df against the metrics of the partition it
// belongs to. The partition is identified by the path produced by spec for
// the file's partition record. isAddFile selects whether the file counts as
// added (true) or removed (false).
//
// The partition's metrics are updated on a copy and only stored back when
// the update succeeds, so an error leaves the stored metrics unchanged.
func (s *snapshotSummaryCollector) updatePartitionMetrics(spec iceberg.PartitionSpec, df iceberg.DataFile, isAddFile bool, sc *iceberg.Schema) error {
	key := spec.PartitionToPath(getPartitionRecord(df, spec.PartitionType(sc)), sc)
	if s.partitionMetrics == nil {
		s.partitionMetrics = make(map[string]updateMetrics)
	}

	// A missing entry yields the zero value, which is a valid empty
	// accumulator.
	current := s.partitionMetrics[key]

	apply := current.removeFile
	if isAddFile {
		apply = current.addFile
	}

	if err := apply(df); err != nil {
		return err
	}

	s.partitionMetrics[key] = current

	return nil
}

// addFile records df as an added file in the overall metrics and, when the
// file carries partition data, in the metrics of its partition as well.
func (s *snapshotSummaryCollector) addFile(df iceberg.DataFile, sc *iceberg.Schema, spec iceberg.PartitionSpec) error {
	if err := s.metrics.addFile(df); err != nil {
		return err
	}

	// Files without partition data contribute to the overall metrics only.
	if len(df.Partition()) == 0 {
		return nil
	}

	return s.updatePartitionMetrics(spec, df, true, sc)
}

// removeFile records df as a removed file in the overall metrics and, when
// the file carries partition data, in the metrics of its partition as well.
func (s *snapshotSummaryCollector) removeFile(df iceberg.DataFile, sc *iceberg.Schema, spec iceberg.PartitionSpec) error {
	if err := s.metrics.removeFile(df); err != nil {
		return err
	}

	// Files without partition data contribute to the overall metrics only.
	if len(df.Partition()) == 0 {
		return nil
	}

	return s.updatePartitionMetrics(spec, df, false, sc)
}

// partitionSummary renders metrics as a comma-separated list of key=value
// pairs, one per positive counter. It returns the empty string when no
// counter is positive.
//
// Pairs are emitted in ascending key order so that identical metrics always
// produce an identical summary string; ranging over the property map
// directly would yield a different ordering from one run to the next.
func (s *snapshotSummaryCollector) partitionSummary(metrics *updateMetrics) string {
	props := metrics.toProps()

	pairs := make([]string, 0, len(props))
	for _, key := range slices.Sorted(maps.Keys(props)) {
		pairs = append(pairs, fmt.Sprintf("%s=%s", key, props[key]))
	}

	return strings.Join(pairs, ",")
}

// build renders everything the collector has gathered as summary
// properties: the overall metrics, the number of changed partitions (when
// there is at least one) and, provided the number of changed partitions
// does not exceed the configured limit, one summary entry per partition
// stored under changedPartitionPrefix followed by the partition path.
func (s *snapshotSummaryCollector) build() iceberg.Properties {
	out := s.metrics.toProps()

	changed := len(s.partitionMetrics)
	setWhenPositive(out, changedPartitionCountProp, int64(changed))

	// Too many partitions changed: leave out the per-partition breakdown.
	if changed > s.maxChangedPartitionsForSummaries {
		return out
	}

	for path := range s.partitionMetrics {
		partMetrics := s.partitionMetrics[path]
		summary := s.partitionSummary(&partMetrics)
		if summary == "" {
			continue
		}
		out[changedPartitionPrefix+path] = summary
	}

	return out
}

// truncateTableSummary rewrites sum to describe an operation that removed
// the entire previous contents of the table.
//
// Every total-* property is reset to "0", and each positive total found in
// previous is copied into the matching removed/deleted property of sum. The
// updated summary is returned; when sum.Properties is non-nil the same map
// is modified in place.
//
// A nil sum.Properties is replaced with a fresh map rather than causing a
// panic on the first write.
func truncateTableSummary(sum Summary, previous iceberg.Properties) Summary {
	if sum.Properties == nil {
		sum.Properties = make(iceberg.Properties)
	}

	// Each row pairs a total with the property that records its removal.
	rows := [...]struct {
		removed string
		total   string
	}{
		{deletedDataFilesKey, totalDataFilesKey},
		{removedDeleteFilesKey, totalDeleteFilesKey},
		{deletedRecordsKey, totalRecordsKey},
		{removedFileSizeKey, totalFileSizeKey},
		{removedPosDeletesKey, totalPosDeletesKey},
		{removedEqDeletesKey, totalEqDeletesKey},
	}

	for _, row := range rows {
		sum.Properties[row.total] = "0"
		if prev := previous.GetInt(row.total, 0); prev > 0 {
			sum.Properties[row.removed] = strconv.Itoa(prev)
		}
	}

	return sum
}

// updateSnapshotSummaries fills in the total-* properties of sum from the
// totals of the previous snapshot summary combined with the added and
// removed counters already present in sum.
//
// Only append, overwrite and delete operations are supported; any other
// operation returns sum unchanged together with an error wrapping
// iceberg.ErrNotImplemented. When truncateFullTable is set for an overwrite
// and previous is non-nil, the summary is first rewritten to reflect the
// removal of all previous contents. A nil previous is treated as a table
// whose totals are all zero. A total that would come out negative is not
// written.
func updateSnapshotSummaries(sum Summary, previous iceberg.Properties, truncateFullTable bool) (Summary, error) {
	if sum.Operation != OpAppend && sum.Operation != OpOverwrite && sum.Operation != OpDelete {
		return sum, fmt.Errorf("%w: operation: %s", iceberg.ErrNotImplemented, sum.Operation)
	}

	if sum.Properties == nil {
		sum.Properties = make(iceberg.Properties)
	}

	hasPrevious := previous != nil
	if hasPrevious && truncateFullTable && sum.Operation == OpOverwrite {
		sum = truncateTableSummary(sum, previous)
	}

	if !hasPrevious {
		previous = iceberg.Properties{
			totalDataFilesKey:   "0",
			totalDeleteFilesKey: "0",
			totalRecordsKey:     "0",
			totalFileSizeKey:    "0",
			totalPosDeletesKey:  "0",
			totalEqDeletesKey:   "0",
		}
	}

	// Each row names a total and the counters that raise and lower it.
	rows := [...]struct {
		total   string
		added   string
		removed string
	}{
		{totalDataFilesKey, addedDataFilesKey, deletedDataFilesKey},
		{totalDeleteFilesKey, addedDeleteFilesKey, removedDeleteFilesKey},
		{totalRecordsKey, addedRecordsKey, deletedRecordsKey},
		{totalFileSizeKey, addedFileSizeKey, removedFileSizeKey},
		{totalPosDeletesKey, addedPosDeletesKey, removedPosDeletesKey},
		{totalEqDeletesKey, addedEqDeletesKey, removedEqDeletesKey},
	}

	for _, row := range rows {
		next := previous.GetInt(row.total, 0)
		next += sum.Properties.GetInt(row.added, 0)
		next -= sum.Properties.GetInt(row.removed, 0)

		if next < 0 {
			continue
		}
		sum.Properties[row.total] = strconv.Itoa(next)
	}

	return sum, nil
}
Loading

0 comments on commit 5853b6b

Please sign in to comment.