Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Experimental migration support for POSIX #441

Merged
merged 16 commits into from
Jan 14, 2025
66 changes: 66 additions & 0 deletions api/layout/paths.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package layout

import (
"fmt"
"iter"
"math"
"strconv"
"strings"
Expand All @@ -40,6 +41,71 @@ func EntriesPathForLogIndex(seq, logSize uint64) string {
return EntriesPath(seq/EntryBundleWidth, PartialTileSize(0, seq, logSize))
}

// Range returns an iterator over a list of RangeInfo structs which describe the bundles/tiles
// necessary to cover the specified range of individual entries/hashes `[from, min(from+N, treeSize) )`.
//
// Each yielded RangeInfo names one bundle/tile (Index) together with the sub-range of its
// entries (First, N) that falls inside the requested range. Elements are yielded in ascending
// Index order.
//
// If from >= treeSize or N == 0, the returned iterator will yield no elements.
func Range(from, N, treeSize uint64) iter.Seq[RangeInfo] {
	return func(yield func(RangeInfo) bool) {
		// Range is empty if we're entirely beyond the extent of the tree, or we've been asked for zero items.
		if from >= treeSize || N == 0 {
			return
		}
		// Truncate range at size of tree if necessary.
		//
		// The remaining space is compared against N directly, rather than computing from+N,
		// because from+N can wrap around for very large N. A local is used so that the captured
		// parameter is left untouched if the iterator is run more than once.
		n := min(N, treeSize-from)

		endInc := from + n - 1
		sIndex := from / EntryBundleWidth
		eIndex := endInc / EntryBundleWidth

		for idx := sIndex; idx <= eIndex; idx++ {
			// Default: a bundle strictly inside the range is covered in full, starting at offset zero.
			ri := RangeInfo{
				Index: idx,
				N:     EntryBundleWidth,
			}

			switch ri.Index {
			case sIndex:
				// First bundle: the range may start part-way into it.
				ri.Partial = PartialTileSize(0, sIndex, treeSize)
				ri.First = uint(from % EntryBundleWidth)
				ri.N = uint(EntryBundleWidth) - ri.First

				// Handle corner-case where the range is entirely contained in first bundle, if applicable:
				if ri.Index == eIndex {
					ri.N = uint((endInc)%EntryBundleWidth) - ri.First + 1
				}
			case eIndex:
				// Last bundle (distinct from the first): the range may stop part-way through it.
				ri.Partial = PartialTileSize(0, eIndex, treeSize)
				ri.N = uint((endInc)%EntryBundleWidth) + 1
			}

			// Stop as soon as the consumer asks us to.
			if !yield(ri) {
				return
			}
		}
	}
}

// RangeInfo describes a specific range of elements within a particular bundle/tile.
//
// Usage:
//
//	bundleRaw, ... := fetchBundle(..., ri.Index, ri.Partial)
//	bundle, ... := parseBundle(bundleRaw)
//	elements := bundle.Entries[ri.First : ri.First+ri.N]
type RangeInfo struct {
	// Index is the index of the entry bundle/tile in the tree.
	Index uint64
	// Partial is the partial size of the bundle/tile, or zero if a full bundle/tile is expected.
	Partial uint8
	// First is the offset into the entries contained by the bundle/tile at which the range starts.
	First uint
	// N is the number of entries, starting at First, which are covered by the range.
	N uint
}

// NWithSuffix returns a tiles-spec "N" path, with a partial suffix if p > 0.
func NWithSuffix(l, n uint64, p uint8) string {
suffix := ""
Expand Down
88 changes: 88 additions & 0 deletions api/layout/paths_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ package layout
import (
"fmt"
"testing"

"github.com/google/go-cmp/cmp"
)

func TestEntriesPathForLogIndex(t *testing.T) {
Expand Down Expand Up @@ -306,3 +308,89 @@ func TestParseTileLevelIndexPartial(t *testing.T) {
})
}
}

// TestRange checks that Range yields exactly the expected sequence of RangeInfo
// structs - both their contents and their number - for a variety of ranges.
func TestRange(t *testing.T) {
	for _, test := range []struct {
		from, N, treeSize uint64
		desc              string
		want              []RangeInfo
	}{
		{
			desc:     "from beyond extent",
			from:     10,
			N:        1,
			treeSize: 5,
			want:     []RangeInfo{},
		}, {
			desc:     "range end beyond extent",
			from:     3,
			N:        100,
			treeSize: 5,
			want:     []RangeInfo{{Index: 0, First: 3, N: 5 - 3, Partial: 5}},
		}, {
			desc:     "empty range",
			from:     1,
			N:        0,
			treeSize: 2,
			want:     []RangeInfo{},
		}, {
			desc:     "ok: full first bundle",
			from:     0,
			N:        256,
			treeSize: 257,
			want:     []RangeInfo{{N: 256}},
		}, {
			desc:     "ok: entire single (partial) bundle",
			from:     20,
			N:        90,
			treeSize: 111,
			want:     []RangeInfo{{Index: 0, Partial: 111, First: 20, N: 90}},
		}, {
			desc:     "ok: slice from single bundle with initial offset",
			from:     20,
			N:        90,
			treeSize: 1 << 20,
			want:     []RangeInfo{{Index: 0, Partial: 0, First: 20, N: 90}},
		}, {
			desc:     "ok: multiple bundles, first is full, last is truncated",
			from:     0,
			N:        4*256 + 42,
			treeSize: 1 << 20,
			want: []RangeInfo{
				{Index: 0, Partial: 0, First: 0, N: 256},
				{Index: 1, Partial: 0, First: 0, N: 256},
				{Index: 2, Partial: 0, First: 0, N: 256},
				{Index: 3, Partial: 0, First: 0, N: 256},
				{Index: 4, Partial: 0, First: 0, N: 42},
			},
		}, {
			desc:     "ok: multiple bundles, first is offset, last is truncated",
			from:     2,
			N:        4*256 + 4,
			treeSize: 1 << 20,
			want: []RangeInfo{
				{Index: 0, Partial: 0, First: 2, N: 256 - 2},
				{Index: 1, Partial: 0, First: 0, N: 256},
				{Index: 2, Partial: 0, First: 0, N: 256},
				{Index: 3, Partial: 0, First: 0, N: 256},
				{Index: 4, Partial: 0, First: 0, N: 6},
			},
		}, {
			desc:     "ok: offset and trucated from single bundle in middle of tree",
			from:     8*256 + 66,
			N:        4,
			treeSize: 1 << 20,
			want:     []RangeInfo{{Index: 8, Partial: 0, First: 66, N: 4}},
		},
	} {
		t.Run(test.desc, func(t *testing.T) {
			i := 0
			for gotInfo := range Range(test.from, test.N, test.treeSize) {
				// Guard the index so that a surplus element is reported as a test
				// failure rather than as an index-out-of-range panic.
				if i >= len(test.want) {
					t.Fatalf("got unexpected extra results[%d]: %+v (want only %d results)", i, gotInfo, len(test.want))
				}
				if d := cmp.Diff(test.want[i], gotInfo); d != "" {
					t.Fatalf("got results[%d] with diff:\n%s", i, d)
				}
				i++
			}
			// Without this check an iterator which yields too few (or no) elements would pass.
			if i != len(test.want) {
				t.Fatalf("got %d results, want %d", i, len(test.want))
			}
		})
	}
}
200 changes: 200 additions & 0 deletions cmd/experimental/migrate/internal/migrate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
// Copyright 2024 The Tessera authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package internal

import (
"bytes"
"context"
"fmt"
"sync/atomic"
"time"

"github.com/avast/retry-go/v4"
"github.com/transparency-dev/merkle/rfc6962"
"github.com/transparency-dev/trillian-tessera/api"
"github.com/transparency-dev/trillian-tessera/api/layout"
"github.com/transparency-dev/trillian-tessera/client"
"golang.org/x/sync/errgroup"
"k8s.io/klog/v2"
)

// copier controls the migration work.
//
// It holds the state shared between the goroutine which produces work items
// (populateWork) and the goroutines which consume them (migrateWorker).
type copier struct {
	// storage is the target we're migrating to.
	storage MigrationStorage
	// getEntries is called to fetch the raw bytes of an entry bundle, which are
	// then handed to storage.
	getEntries client.EntryBundleFetcherFunc

	// sourceSize is the size of the source log.
	sourceSize uint64
	// sourceRoot is the root hash of the source log at sourceSize.
	sourceRoot []byte

	// todo contains work items to be completed.
	// It is filled, and then closed, by populateWork and drained by migrateWorker.
	todo chan bundle

	// bundlesMigrated is the number of entry bundles migrated so far.
	bundlesMigrated atomic.Uint64
}

// bundle represents the address of an individual entry bundle.
//
// Values are created from the Index and Partial fields of a layout.RangeInfo.
type bundle struct {
	// Index is the index of the entry bundle.
	Index uint64
	// Partial is the partial size of the entry bundle, as reported by layout.RangeInfo.
	Partial uint8
}

// MigrationStorage describes the required functionality from the target storage driver.
//
// It's expected that the implementation of this interface will attempt to integrate the entry bundles
// being set as soon as is reasonably possible. It's up to the implementation whether that's done as a
// background task, or is in-line with the call to AwaitIntegration.
type MigrationStorage interface {
	// SetEntryBundle is called to store the provided entry bundle bytes at the given coordinates.
	//
	// Implementations SHOULD treat calls to this function as being idempotent - i.e. attempts to set a previously
	// set entry bundle should succeed if the bundle data are identical.
	//
	// This will be called as many times as necessary to set all entry bundles being migrated, quite likely in parallel.
	SetEntryBundle(ctx context.Context, index uint64, partial uint8, bundle []byte) error
	// AwaitIntegration should block until the storage driver has received and integrated all outstanding entry bundles implied by sourceSize,
	// and return the locally calculated root hash.
	//
	// An error should be returned if there is a problem integrating.
	AwaitIntegration(ctx context.Context, sourceSize uint64) ([]byte, error)
	// State returns the current integrated size and root hash of the local tree.
	State(ctx context.Context) (uint64, []byte, error)
}

// Migrate starts the work of copying sourceSize entries from the source to the target log.
//
// Only the entry bundles are copied as the target storage is expected to integrate them and recalculate the root.
// This is done to ensure the correctness of both the source log as well as the copy process itself.
//
// A call to this function will block until either the copying is done, or an error has occurred.
// It is an error if the resource copying completes ok but the resulting root hash does not match the provided sourceRoot.
func Migrate(ctx context.Context, numWorkers int, sourceSize uint64, sourceRoot []byte, getEntries client.EntryBundleFetcherFunc, storage MigrationStorage) error {
klog.Infof("Starting migration; source size %d root %x", sourceSize, sourceRoot)

m := &copier{
storage: storage,
sourceSize: sourceSize,
sourceRoot: sourceRoot,
getEntries: getEntries,
todo: make(chan bundle, numWorkers),
}

// init
targetSize, targetRoot, err := m.storage.State(ctx)
if err != nil {
return fmt.Errorf("Size: %v", err)
}
if targetSize > sourceSize {
return fmt.Errorf("Target size %d > source size %d", targetSize, sourceSize)
}
if targetSize == sourceSize {
if !bytes.Equal(targetRoot, sourceRoot) {
return fmt.Errorf("migration completed, but local root hash %x != source root hash %x", targetRoot, sourceRoot)
}
return nil
AlCutter marked this conversation as resolved.
Show resolved Hide resolved
}

bundlesToMigrate := (sourceSize / layout.EntryBundleWidth) - (targetSize / layout.EntryBundleWidth) + 1
go m.populateWork(targetSize, sourceSize)

// Print stats
go func() {
for {
time.Sleep(time.Second)
bn := m.bundlesMigrated.Load()
bnp := float64(bn*100) / float64(bundlesToMigrate)
s, _, err := m.storage.State(ctx)
if err != nil {
klog.Warningf("Size: %v", err)
}
intp := float64(s*100) / float64(sourceSize)
klog.Infof("integration: %d (%.2f%%) bundles: %d (%.2f%%)", s, intp, bn, bnp)
}
}()

// Do the copying
eg := errgroup.Group{}
for i := 0; i < numWorkers; i++ {
eg.Go(func() error {
return m.migrateWorker(ctx)
})
}
if err := eg.Wait(); err != nil {
return fmt.Errorf("migrate failed to copy resources: %v", err)
}

root, err := m.storage.AwaitIntegration(ctx, sourceSize)
if err != nil {
return fmt.Errorf("Migration failed: %v", err)
}
if !bytes.Equal(root, sourceRoot) {
return fmt.Errorf("migration completed, but local root hash %x != source root hash %x", targetRoot, sourceRoot)
}
klog.Infof("Migration successful.")
return nil
}

// populateWork feeds the `todo` work channel with one item for every entry bundle
// covering the entries in [from, treeSize), and closes the channel once all items
// have been sent.
//
// Each send blocks until there is room in the channel.
func (m *copier) populateWork(from, treeSize uint64) {
	klog.Infof("Spans for entry range [%d, %d)", from, treeSize)
	defer close(m.todo)

	count := treeSize - from
	enqueue := func(info layout.RangeInfo) bool {
		m.todo <- bundle{Index: info.Index, Partial: info.Partial}
		// Always ask for the next element; every bundle in the range is wanted.
		return true
	}
	layout.Range(from, count, treeSize)(enqueue)
}

// migrateWorker undertakes work items from the `todo` channel.
//
// For each item it fetches the entry bundle via getEntries and stores it in the target
// storage, incrementing bundlesMigrated on success.
//
// It will attempt to retry failed operations several times before giving up, this should help
// deal with any transient errors which may occur. Retrying is abandoned once ctx is done.
//
// It returns nil once the `todo` channel has been closed and drained, or the error from the
// first work item which could not be completed.
func (m *copier) migrateWorker(ctx context.Context) error {
	for b := range m.todo {
		err := retry.Do(func() error {
			d, err := m.getEntries(ctx, b.Index, b.Partial)
			if err != nil {
				return fmt.Errorf("failed to fetch entrybundle %d (p=%d): %w", b.Index, b.Partial, err)
			}
			if err := m.storage.SetEntryBundle(ctx, b.Index, b.Partial, d); err != nil {
				return fmt.Errorf("failed to store entrybundle %d (p=%d): %w", b.Index, b.Partial, err)
			}
			m.bundlesMigrated.Add(1)
			return nil
		},
			// Don't keep backing off and retrying after the caller has given up.
			retry.Context(ctx),
			retry.Attempts(10),
			retry.DelayType(retry.BackOffDelay))
		if err != nil {
			return err
		}
	}
	return nil
}

// BundleHasher parses a C2SP tlog-tile bundle and returns the leaf hashes of each entry it contains.
//
// The returned slice holds one RFC 6962 leaf hash per entry, in the same order as the
// entries appear in the bundle. An error is returned if the bundle cannot be unmarshalled.
func BundleHasher(bundle []byte) ([][]byte, error) {
	var parsed api.EntryBundle
	if err := parsed.UnmarshalText(bundle); err != nil {
		return nil, fmt.Errorf("unmarshal: %v", err)
	}

	hashes := make([][]byte, len(parsed.Entries))
	for i, entry := range parsed.Entries {
		hashes[i] = rfc6962.DefaultHasher.HashLeaf(entry)
	}
	return hashes, nil
}
Loading
Loading