[GCP] Dedup storage experiment (#363)
AlCutter authored Dec 5, 2024
1 parent c6816de commit afd61ea
Showing 4 changed files with 208 additions and 16 deletions.
14 changes: 13 additions & 1 deletion cmd/conformance/gcp/main.go
@@ -37,6 +37,7 @@ var (
listen = flag.String("listen", ":2024", "Address:port to listen on")
spanner = flag.String("spanner", "", "Spanner resource URI ('projects/.../...')")
signer = flag.String("signer", "", "Note signer to use to sign checkpoints")
persistentDedup = flag.Bool("gcp_dedup", false, "EXPERIMENTAL: Set to true to enable persistent dedupe storage")
additionalSigners = []string{}
)

@@ -65,7 +66,18 @@ func main() {
if err != nil {
klog.Exitf("Failed to create new GCP storage: %v", err)
}
dedupeAdd := tessera.InMemoryDedupe(storage.Add, 256)

// Handle dedup configuration
addDelegate := storage.Add

// PersistentDedup is currently experimental, so there's no terraform or documentation yet!
if *persistentDedup {
addDelegate, err = gcp.NewDedupe(ctx, fmt.Sprintf("%s_dedup", *spanner), addDelegate)
if err != nil {
klog.Exitf("Failed to create new GCP dedupe: %v", err)
}
}
dedupeAdd := tessera.InMemoryDedupe(addDelegate, 256)

// Expose an HTTP handler for the conformance test writes.
// This should accept arbitrary bytes POSTed to /add, and return an ascii
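For context, here is a minimal sketch of how a caller inside main.go would use the layered dedupeAdd built above. tessera.NewEntry and the IndexFuture call pattern are taken from this repository; the payload, variable names and error handling are illustrative only and not part of this commit.

	// Sketch: submitting an entry through the wrapped add function.
	entry := tessera.NewEntry([]byte("example leaf"))
	future := dedupeAdd(ctx, entry) // returns quickly; the index is resolved later
	idx, err := future()            // blocks until an index has been assigned (or an error occurs)
	if err != nil {
		klog.Exitf("Failed to add entry: %v", err)
	}
	klog.Infof("Entry assigned index %d", idx)
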
31 changes: 20 additions & 11 deletions dedupe.go
@@ -16,9 +16,10 @@ package tessera

import (
"context"
"fmt"
"sync"

"github.com/hashicorp/golang-lru/v2/expirable"
lru "github.com/hashicorp/golang-lru/v2"
)

// InMemoryDedupe wraps an Add function to prevent duplicate entries being written to the underlying
@@ -35,30 +36,38 @@ import (
// InMemoryDedupe. This allows recent duplicates to be deduplicated in memory, reducing the need to
// make calls to a persistent storage.
func InMemoryDedupe(delegate func(ctx context.Context, e *Entry) IndexFuture, size uint) func(context.Context, *Entry) IndexFuture {
c, err := lru.New[string, func() IndexFuture](int(size))
if err != nil {
panic(fmt.Errorf("lru.New(%d): %v", size, err))
}
dedupe := &inMemoryDedupe{
delegate: delegate,
cache: expirable.NewLRU[string, IndexFuture](int(size), nil, 0),
cache: c,
}
return dedupe.add
}

type inMemoryDedupe struct {
delegate func(ctx context.Context, e *Entry) IndexFuture
mu sync.Mutex // cache is thread safe, but this mutex allows us to do conditional writes
cache *expirable.LRU[string, IndexFuture]
cache *lru.Cache[string, func() IndexFuture]
}

// Add adds the entry to the underlying delegate only if e hasn't been recently seen. In either case,
// an IndexFuture will be returned that the client can use to get the sequence number of this entry.
func (d *inMemoryDedupe) add(ctx context.Context, e *Entry) IndexFuture {
id := string(e.Identity())
d.mu.Lock()
defer d.mu.Unlock()

f, ok := d.cache.Get(id)
if !ok {
f = d.delegate(ctx, e)
d.cache.Add(id, f)
// However many calls with the same entry come in and are deduped, we should only call delegate
// once for each unique entry:
f := sync.OnceValue(func() IndexFuture {
return d.delegate(ctx, e)
})

// if we've seen this entry before, discard our f and replace
// with the one we created last time, otherwise store f against id.
if prev, ok, _ := d.cache.PeekOrAdd(id, f); ok {
f = prev
}
return f

return f()
}
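
The rewritten in-memory dedupe relies on two properties: sync.OnceValue guarantees the delegate is invoked at most once per cached entry, and the cache's PeekOrAdd makes the check-then-insert step atomic, which is why the previous mutex could be dropped. Resolving the returned future then happens outside the cache operation, so concurrent callers presenting the same entry all wait on the same single delegate call. Below is a small, self-contained sketch of the same pattern using toy types (string keys, int results); it is illustrative only and not code from this commit.

	package main

	import (
		"fmt"
		"sync"

		lru "github.com/hashicorp/golang-lru/v2"
	)

	func main() {
		cache, err := lru.New[string, func() int](16)
		if err != nil {
			panic(err)
		}

		delegateCalls := 0
		add := func(key string) func() int {
			// Wrap the (notionally expensive) delegate so it runs at most once per unique key.
			f := sync.OnceValue(func() int {
				delegateCalls++
				return len(key)
			})
			// Keep whichever wrapper was stored first for this key.
			if prev, ok, _ := cache.PeekOrAdd(key, f); ok {
				f = prev
			}
			return f
		}

		fmt.Println(add("foo")(), add("foo")(), delegateCalls) // 3 3 1: the delegate ran once
	}
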
10 changes: 6 additions & 4 deletions dedupe_test.go
@@ -60,13 +60,15 @@ func TestDedupe(t *testing.T) {
dedupeAdd := tessera.InMemoryDedupe(delegate, 256)

// Add foo, bar, baz to prime the cache to make things interesting
dedupeAdd(ctx, tessera.NewEntry([]byte("foo")))
dedupeAdd(ctx, tessera.NewEntry([]byte("bar")))
dedupeAdd(ctx, tessera.NewEntry([]byte("baz")))
for _, s := range []string{"foo", "bar", "baz"} {
if _, err := dedupeAdd(ctx, tessera.NewEntry([]byte(s)))(); err != nil {
t.Fatalf("dedupeAdd(%q): %v", s, err)
}
}

idx, err := dedupeAdd(ctx, tessera.NewEntry([]byte(tC.newValue)))()
if err != nil {
t.Fatal(err)
t.Fatalf("dedupeAdd(%q): %v", tC.newValue, err)
}
if idx != tC.wantIdx {
t.Errorf("got != want (%d != %d)", idx, tC.wantIdx)
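The test drives InMemoryDedupe through a fake delegate (declared earlier in the file and not shown in this diff) which hands out sequential indices. A hypothetical stand-in for such a delegate might look like the sketch below; the names and the lack of locking are illustrative only.

	var next uint64
	delegate := func(_ context.Context, _ *tessera.Entry) tessera.IndexFuture {
		i := next // not safe for concurrent use; fine for a single-goroutine sketch
		next++
		return func() (uint64, error) { return i, nil }
	}
	dedupeAdd := tessera.InMemoryDedupe(delegate, 256)
	// After priming with "foo", "bar", "baz" (indices 0, 1, 2), re-adding one of
	// those values resolves to its original index, while a new value gets index 3.
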
169 changes: 169 additions & 0 deletions storage/gcp/gcp.go
@@ -37,11 +37,13 @@ import (
"io"
"net/http"
"os"
"sync/atomic"
"time"

"cloud.google.com/go/spanner"
"cloud.google.com/go/spanner/apiv1/spannerpb"
gcs "cloud.google.com/go/storage"
"github.com/globocom/go-buffer"
"github.com/google/go-cmp/cmp"
"github.com/transparency-dev/merkle/rfc6962"
tessera "github.com/transparency-dev/trillian-tessera"
@@ -783,3 +785,170 @@ func (s *gcsStorage) lastModified(ctx context.Context, obj string) (time.Time, e
}
return r.Attrs.LastModified, r.Close()
}

// NewDedupe returns a wrapped Add func which uses Spanner to maintain a mapping of
// previously seen entries and their assigned indices. Future calls with the same entry
// will return the previously assigned index; as-yet-unseen entries will be passed to the provided
// delegate function to have an index assigned.
//
// For performance reasons, the ID -> index associations returned by the delegate are buffered before
// being flushed to Spanner. This can result in duplicates occurring in some circumstances, but in
// general this should not be a problem.
//
// Note that the storage for this mapping is entirely separate and unconnected to the storage used for
// maintaining the Merkle tree.
//
// This functionality is experimental!
func NewDedupe(ctx context.Context, spannerDB string, delegate func(ctx context.Context, e *tessera.Entry) tessera.IndexFuture) (func(ctx context.Context, e *tessera.Entry) tessera.IndexFuture, error) {
/*
Schema for reference:
CREATE TABLE IDSeq (
id INT64 NOT NULL,
h BYTES(MAX) NOT NULL,
idx INT64 NOT NULL,
) PRIMARY KEY (id, h);
*/
dedupDB, err := spanner.NewClient(ctx, spannerDB)
if err != nil {
return nil, fmt.Errorf("failed to connect to Spanner: %v", err)
}

r := &dedupStorage{
ctx: ctx,
dbPool: dedupDB,
delegate: delegate,
}

// TODO(al): Make these configurable
r.buf = buffer.New(
buffer.WithSize(64),
buffer.WithFlushInterval(200*time.Millisecond),
buffer.WithFlusher(buffer.FlusherFunc(r.flush)),
buffer.WithPushTimeout(15*time.Second),
)
go func(ctx context.Context) {
t := time.NewTicker(time.Second)
for {
select {
case <-ctx.Done():
return
case <-t.C:
klog.V(1).Infof("DEDUP: # Writes %d, # Lookups %d, # DB hits %v, # buffer Push discards %d", r.numWrites.Load(), r.numLookups.Load(), r.numDBDedups.Load(), r.numPushErrs.Load())
}
}
}(ctx)
return r.add, nil
}

type dedupStorage struct {
ctx context.Context
dbPool *spanner.Client
delegate func(ctx context.Context, e *tessera.Entry) tessera.IndexFuture

numLookups atomic.Uint64
numWrites atomic.Uint64
numDBDedups atomic.Uint64
numPushErrs atomic.Uint64

buf *buffer.Buffer
}

// index returns the index (if any) previously associated with the provided hash
func (d *dedupStorage) index(ctx context.Context, h []byte) (*uint64, error) {
d.numLookups.Add(1)
var idx int64
if row, err := d.dbPool.Single().ReadRow(ctx, "IDSeq", spanner.Key{0, h}, []string{"idx"}); err != nil {
if c := spanner.ErrCode(err); c == codes.NotFound {
return nil, nil
}
return nil, err
} else {
if err := row.Column(0, &idx); err != nil {
return nil, fmt.Errorf("failed to read dedup index: %v", err)
}
idx := uint64(idx)
d.numDBDedups.Add(1)
return &idx, nil
}
}

// storeMappings stores the provided ID -> index associations in a non-atomic fashion
// (i.e. it does not store all or none in a transactional sense).
//
// Returns an error if one or more mappings cannot be stored.
func (d *dedupStorage) storeMappings(ctx context.Context, entries []dedupeMapping) error {
m := make([]*spanner.MutationGroup, 0, len(entries))
for _, e := range entries {
m = append(m, &spanner.MutationGroup{
Mutations: []*spanner.Mutation{spanner.Insert("IDSeq", []string{"id", "h", "idx"}, []interface{}{0, e.ID, int64(e.Idx)})},
})
}

i := d.dbPool.BatchWrite(ctx, m)
return i.Do(func(r *spannerpb.BatchWriteResponse) error {
s := r.GetStatus()
if c := codes.Code(s.Code); c != codes.OK && c != codes.AlreadyExists {
return fmt.Errorf("failed to write dedup record: %v (%v)", s.GetMessage(), c)
}
return nil
})
}

// dedupeMapping represents an ID -> index mapping.
type dedupeMapping struct {
ID []byte
Idx uint64
}

// add adds the entry to the underlying delegate only if e isn't already known. In either case,
// an IndexFuture will be returned that the client can use to get the sequence number of this entry.
func (d *dedupStorage) add(ctx context.Context, e *tessera.Entry) tessera.IndexFuture {
idx, err := d.index(ctx, e.Identity())
if err != nil {
return func() (uint64, error) { return 0, err }
}
if idx != nil {
return func() (uint64, error) { return *idx, nil }
}

i, err := d.delegate(ctx, e)()
if err != nil {
return func() (uint64, error) { return 0, err }
}

err = d.enqueueMapping(ctx, e.Identity(), i)
return func() (uint64, error) {
return i, err
}
}

// enqueueMapping buffers the provided ID -> index mapping ready to be flushed to storage.
func (d *dedupStorage) enqueueMapping(_ context.Context, h []byte, idx uint64) error {
err := d.buf.Push(dedupeMapping{ID: h, Idx: idx})
if err != nil {
d.numPushErrs.Add(1)
// This means there's pressure flushing dedup writes out, so discard this write.
if err != buffer.ErrTimeout {
return err
}
}
return nil
}

// flush writes enqueued mappings to storage.
func (d *dedupStorage) flush(items []interface{}) {
entries := make([]dedupeMapping, len(items))
for i := range items {
entries[i] = items[i].(dedupeMapping)
}

ctx, c := context.WithTimeout(d.ctx, 15*time.Second)
defer c()

if err := d.storeMappings(ctx, entries); err != nil {
klog.Infof("Failed to flush dedup entries: %v", err)
return
}
d.numWrites.Add(uint64(len(entries)))
}
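
NewDedupe deliberately trades durability for throughput: ID -> index mappings are buffered and flushed to Spanner in batches, and a Push that times out is simply dropped and counted (this is the "duplicates can occur" caveat in the doc comment above, and presumably why storeMappings tolerates AlreadyExists). The standalone sketch below shows the same go-buffer batching pattern with toy values; the option values, the Close call and the final sleep are assumptions for illustration, not taken from this commit.

	package main

	import (
		"fmt"
		"time"

		"github.com/globocom/go-buffer"
	)

	func main() {
		buf := buffer.New(
			buffer.WithSize(8),
			buffer.WithFlushInterval(500*time.Millisecond),
			buffer.WithFlusher(buffer.FlusherFunc(func(items []interface{}) {
				fmt.Printf("flushing %d buffered items\n", len(items))
			})),
			buffer.WithPushTimeout(time.Second),
		)
		defer buf.Close() // assumed: go-buffer exposes Close to stop the background flusher

		for i := 0; i < 20; i++ {
			if err := buf.Push(i); err != nil {
				// Under sustained pressure Push can time out; the dedupe code above
				// drops the mapping in that case and only counts the discard.
				fmt.Printf("push %d dropped: %v\n", i, err)
			}
		}
		time.Sleep(time.Second) // give the background flusher a chance to run
	}
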
