-
Notifications
You must be signed in to change notification settings - Fork 381
/
Copy pathtree_storage.go
434 lines (386 loc) · 12.8 KB
/
tree_storage.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
// Copyright 2018 Google LLC. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package cloudspanner
import (
"bytes"
"context"
"errors"
"fmt"
"sync"
"time"
"cloud.google.com/go/spanner"
"github.com/google/trillian"
"github.com/google/trillian/storage"
"github.com/google/trillian/storage/cache"
"github.com/google/trillian/storage/cloudspanner/spannerpb"
"github.com/google/trillian/storage/storagepb"
"github.com/google/trillian/storage/tree"
"github.com/transparency-dev/merkle/compact"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
"k8s.io/klog/v2"
)
// Sentinel errors returned by the Cloud Spanner tree storage.
var (
	// ErrNotFound is returned when a read or lookup finds no matching item.
	ErrNotFound = status.Error(codes.NotFound, "not found")

	// ErrNotImplemented is returned by interface methods that have not been
	// implemented yet.
	ErrNotImplemented = errors.New("not implemented")

	// ErrTransactionClosed is returned when an operation is attempted on a
	// transaction after its Commit or Close method has already been called.
	ErrTransactionClosed = errors.New("transaction is closed")

	// ErrWrongTXType is returned when a mutating method is invoked on a
	// read-only transaction. This indicates a programming error and should
	// never happen in practice.
	ErrWrongTXType = errors.New("mutating method called on read-only transaction")
)
// Names of the Spanner table holding subtree data and of its columns.
const (
	subtreeTbl = "SubtreeData"

	colTreeID    = "TreeID"
	colSubtreeID = "SubtreeID"
	colRevision  = "Revision"
	colSubtree   = "Subtree"
)
// treeStorage provides a shared base for the concrete CloudSpanner-backed
// implementation of the Trillian storage.LogStorage and storage.MapStorage
// interfaces.
type treeStorage struct {
// admin is an AdminStorage handle; newTreeStorageWithOpts leaves it nil.
admin storage.AdminStorage
// opts holds the configuration supplied at construction time.
opts TreeStorageOptions
// client is the Spanner client this storage was constructed with.
client *spanner.Client
}
// TreeStorageOptions holds various levers for configuring the tree storage instance.
type TreeStorageOptions struct {
// ReadOnlyStaleness controls how far in the past a read-only snapshot
// transaction will read.
// This is intended to allow Spanner to use local replicas for read requests
// to help with performance.
// See https://cloud.google.com/spanner/docs/timestamp-bounds for more details.
// NOTE(review): this section only stores the value (in treeStorage.opts);
// where it becomes a Spanner timestamp bound, and how a zero value is
// treated, is outside this view — confirm there.
ReadOnlyStaleness time.Duration
}
// newTreeStorageWithOpts builds a treeStorage around the given Spanner client
// and options. No AdminStorage is attached; the admin field keeps its zero
// value (nil).
func newTreeStorageWithOpts(client *spanner.Client, opts TreeStorageOptions) *treeStorage {
	ts := new(treeStorage)
	ts.client = client
	ts.opts = opts
	return ts
}
// spanRead is the set of Spanner read operations this package needs from a
// transaction. Values of both *spanner.ReadOnlyTransaction and
// *spanner.ReadWriteTransaction are held through this interface.
type spanRead interface {
Query(context.Context, spanner.Statement) *spanner.RowIterator
Read(ctx context.Context, table string, keys spanner.KeySet, columns []string) *spanner.RowIterator
ReadUsingIndex(ctx context.Context, table, index string, keys spanner.KeySet, columns []string) *spanner.RowIterator
ReadRow(ctx context.Context, table string, key spanner.Key, columns []string) (*spanner.Row, error)
ReadWithOptions(ctx context.Context, table string, keys spanner.KeySet, columns []string, opts *spanner.ReadOptions) (ri *spanner.RowIterator)
}
// latestSTH fetches the tree head with the highest TreeRevision stored for
// treeID, reading through the supplied transaction.
//
// It returns storage.ErrTreeNeedsInit (after logging a warning) when no tree
// head exists for the tree, and any query or decoding error unchanged.
func (t *treeStorage) latestSTH(ctx context.Context, stx spanRead, treeID int64) (*spannerpb.TreeHead, error) {
	stmt := spanner.NewStatement(
		"SELECT TreeID, TimestampNanos, TreeSize, RootHash, RootSignature, TreeRevision, TreeMetadata FROM TreeHeads" +
			" WHERE TreeID = @tree_id" +
			" ORDER BY TreeRevision DESC " +
			" LIMIT 1")
	stmt.Params["tree_id"] = treeID

	iter := stx.Query(ctx, stmt)
	defer iter.Stop()

	var head *spannerpb.TreeHead
	scan := func(row *spanner.Row) error {
		candidate := new(spannerpb.TreeHead)
		if err := row.Columns(
			&candidate.TreeId,
			&candidate.TsNanos,
			&candidate.TreeSize,
			&candidate.RootHash,
			&candidate.Signature,
			&candidate.TreeRevision,
			&candidate.Metadata,
		); err != nil {
			return err
		}
		head = candidate
		return nil
	}
	if err := iter.Do(scan); err != nil {
		return nil, err
	}

	if head == nil {
		klog.Warningf("no head found for treeID %v", treeID)
		return nil, storage.ErrTreeNeedsInit
	}
	return head, nil
}
// newCacheFn builds the SubtreeCache for a tree; begin calls it once for each
// transaction it creates.
type newCacheFn func(*trillian.Tree) (*cache.SubtreeCache, error)
// getTreeAndConfig decodes the storage settings attached to tree. On success
// it returns the same tree pointer together with the decoded settings; on
// failure every result except the error is nil. ctx is currently unused.
func (t *treeStorage) getTreeAndConfig(ctx context.Context, tree *trillian.Tree) (*trillian.Tree, proto.Message, error) {
	settings, decodeErr := unmarshalSettings(tree)
	if decodeErr != nil {
		return nil, nil, decodeErr
	}
	return tree, settings, nil
}
// begin prepares a new treeTX for tree whose reads go through stx.
//
// The tree's storage settings are decoded and a fresh subtree cache is built
// with newCache. The write revision starts at -1; it is only assigned once the
// latest tree head has been loaded.
func (t *treeStorage) begin(ctx context.Context, tree *trillian.Tree, newCache newCacheFn, stx spanRead) (*treeTX, error) {
	resolved, settings, err := t.getTreeAndConfig(ctx, tree)
	if err != nil {
		return nil, err
	}
	stCache, err := newCache(resolved)
	if err != nil {
		return nil, err
	}
	return &treeTX{
		treeID:    resolved.TreeId,
		treeType:  resolved.TreeType,
		ts:        t,
		stx:       stx,
		cache:     stCache,
		config:    settings,
		_writeRev: -1,
	}, nil
}
// getLatestRoot loads, at most once per transaction, the newest tree head
// visible to this transaction and records it (and any error) on t. If a head
// is found, the write revision becomes its TreeRevision plus one.
//
// Every call returns the error recorded by the first load. Only the first
// caller's ctx is used for the underlying read.
func (t *treeTX) getLatestRoot(ctx context.Context) error {
	load := func() {
		sth, err := t.ts.latestSTH(ctx, t.stx, t.treeID)
		t._currentSTH, t._currentSTHErr = sth, err
		if sth == nil {
			return
		}
		t._writeRev = sth.TreeRevision + 1
	}
	t.getLatestRootOnce.Do(load)
	return t._currentSTHErr
}
// treeTX is a concrete implementation of the part of storage.LogTreeTX
// interface formerly known as storage.TreeTX.
//
// Commit sets stx to nil. Methods that find stx nil return
// ErrTransactionClosed.
type treeTX struct {
// treeID is the ID of the tree this transaction operates on.
treeID int64
// treeType is the tree's type, copied from the trillian.Tree by begin.
treeType trillian.TreeType
// ts is the treeStorage that created this transaction.
ts *treeStorage
// mu guards the nil setting/checking of stx as part of the open checking.
mu sync.RWMutex
// stx is the underlying Spanner transaction in which all operations will be
// performed.
stx spanRead
// config holds the StorageSettings proto acquired from the trillian.Tree.
// Varies according to tree_type (LogStorageConfig vs MapStorageConfig).
config proto.Message
// _currentSTH holds the latest tree head visible to this transaction. It is
// loaded lazily, at most once, by getLatestRoot, and is nil if no tree head
// was found.
_currentSTH *spannerpb.TreeHead
// _currentSTHErr is the error, if any, from that one-time load.
_currentSTHErr error
// _writeRev is the tree revision at which any writes will be made. begin
// sets it to -1. getLatestRoot sets it to the loaded head's revision plus one.
_writeRev int64
// cache is the subtree cache used to read (GetNodes) and update (SetNodes)
// Merkle nodes; flushSubtrees writes out its updated tiles.
cache *cache.SubtreeCache
// getLatestRootOnce ensures the tree head is loaded at most once.
getLatestRootOnce sync.Once
}
// currentSTH returns the tree head visible to this transaction, loading it on
// first use. If the load failed, it returns nil and that error.
func (t *treeTX) currentSTH(ctx context.Context) (*spannerpb.TreeHead, error) {
	loadErr := t.getLatestRoot(ctx)
	if loadErr != nil {
		return nil, loadErr
	}
	return t._currentSTH, nil
}
// writeRev returns the revision at which this transaction writes.
//
// If the tree has no tree head yet (storage.ErrTreeNeedsInit), it returns 0
// and a nil error. Any other failure to load the latest root is returned
// wrapped (so callers can still inspect the cause with errors.Is or
// status.Code) together with -1.
func (t *treeTX) writeRev(ctx context.Context) (int64, error) {
	err := t.getLatestRoot(ctx)
	switch {
	case errors.Is(err, storage.ErrTreeNeedsInit):
		return 0, nil
	case err != nil:
		return -1, fmt.Errorf("writeRev(): %w", err)
	}
	return t._writeRev, nil
}
// storeSubtrees buffers one Insert mutation per non-nil subtree in sts into
// the in-flight read-write transaction. Each row is keyed by the tree ID, the
// subtree's Prefix and the transaction's write revision.
//
// It returns ErrWrongTXType if the transaction is read-only. Every subtree is
// marshalled before anything is buffered, so a marshalling failure leaves no
// partial set of writes in the transaction. ctx is currently unused.
func (t *treeTX) storeSubtrees(ctx context.Context, sts []*storagepb.SubtreeProto) error {
	stx, ok := t.stx.(*spanner.ReadWriteTransaction)
	if !ok {
		return ErrWrongTXType
	}
	cols := []string{colTreeID, colSubtreeID, colRevision, colSubtree}
	muts := make([]*spanner.Mutation, 0, len(sts))
	for _, st := range sts {
		if st == nil {
			continue
		}
		stBytes, err := proto.Marshal(st)
		if err != nil {
			return fmt.Errorf("marshalling subtree %x: %w", st.Prefix, err)
		}
		// NOTE(review): this reads _writeRev directly rather than via writeRev(),
		// so it relies on getLatestRoot having already found a tree head;
		// otherwise the stored revision is the initial -1 set by begin.
		muts = append(muts, spanner.Insert(subtreeTbl, cols, []interface{}{t.treeID, st.Prefix, t._writeRev, stBytes}))
	}
	if len(muts) == 0 {
		return nil
	}
	return stx.BufferWrite(muts)
}
// flushSubtrees passes every tile the subtree cache reports as updated to
// storeSubtrees. It returns the first error from either step.
func (t *treeTX) flushSubtrees(ctx context.Context) error {
	updated, err := t.cache.UpdatedTiles()
	if err == nil {
		err = t.storeSubtrees(ctx, updated)
	}
	return err
}
// Commit finishes the transaction.
//
// For a read-only snapshot it closes the snapshot. For a read-write
// transaction it buffers all updated subtrees into the Spanner transaction.
// This method does not itself commit the Spanner transaction. If Commit
// returns an error, any values READ via this transaction MUST NOT be used.
//
// Whatever the outcome, the transaction is closed on return. Calling Commit
// again yields ErrTransactionClosed.
func (t *treeTX) Commit(ctx context.Context) error {
	t.mu.Lock()
	defer t.mu.Unlock()
	// Registered after the Unlock defer so it runs first: the transaction is
	// marked closed while the lock is still held.
	defer func() { t.stx = nil }()

	if t.stx == nil {
		return ErrTransactionClosed
	}
	switch tx := t.stx.(type) {
	case *spanner.ReadWriteTransaction:
		return t.flushSubtrees(ctx)
	case *spanner.ReadOnlyTransaction:
		klog.V(1).Infof("Closed readonly tx %p", tx)
		tx.Close()
		return nil
	default:
		return fmt.Errorf("internal error: unknown transaction type %T", tx)
	}
}
// Close abandons the transaction.
//
// For a read-only snapshot, the underlying Spanner snapshot is closed. For a
// read-write transaction, no subtree writes are flushed. On return the
// transaction is closed: later calls to Close, Commit, GetMerkleNodes or
// SetMerkleNodes return ErrTransactionClosed. Closing an already-closed
// transaction also returns ErrTransactionClosed.
func (t *treeTX) Close() error {
	t.mu.Lock()
	defer t.mu.Unlock()
	if t.stx == nil {
		return ErrTransactionClosed
	}
	if stx, ok := t.stx.(*spanner.ReadOnlyTransaction); ok {
		klog.V(1).Infof("Closed snapshot %p", stx)
		stx.Close()
	}
	// Mark the transaction closed, as Commit does, so it cannot be reused
	// (e.g. reading from an already-closed snapshot) or closed twice.
	t.stx = nil
	return nil
}
// readRevision reports the TreeRevision of the tree head visible to this
// transaction (taking read-staleness into account). If no head could be
// loaded, it returns -1 together with the load error.
func (t *treeTX) readRevision(ctx context.Context) (int64, error) {
	head, err := t.currentSTH(ctx)
	if err == nil {
		return head.TreeRevision, nil
	}
	return -1, err
}
// getSubtree returns the newest stored version of the subtree with prefix id
// whose revision is at or below rev. If no such subtree exists, it returns
// (nil, nil).
//
// It returns an error if:
//   - the query fails,
//   - the stored bytes cannot be unmarshalled, or
//   - the row breaks the query's guarantees (a revision newer than rev, or a
//     prefix different from id).
func (t *treeTX) getSubtree(ctx context.Context, rev int64, id []byte) (*storagepb.SubtreeProto, error) {
	stmt := spanner.NewStatement(
		"SELECT Revision, Subtree FROM SubtreeData" +
			" WHERE TreeID = @tree_id" +
			" AND SubtreeID = @subtree_id" +
			" AND Revision <= @revision" +
			" ORDER BY Revision DESC" +
			" LIMIT 1")
	stmt.Params["tree_id"] = t.treeID
	stmt.Params["subtree_id"] = id
	stmt.Params["revision"] = rev

	var ret *storagepb.SubtreeProto
	rows := t.stx.Query(ctx, stmt)
	defer rows.Stop()
	err := rows.Do(func(r *spanner.Row) error {
		// The query is LIMIT 1; ignore any further rows defensively.
		if ret != nil {
			return nil
		}
		var (
			rRev int64
			// Columns allocates the decoded bytes itself, so there is no point
			// pre-sizing this slice.
			stBytes []byte
		)
		if err := r.Columns(&rRev, &stBytes); err != nil {
			return err
		}
		if rRev > rev {
			return fmt.Errorf("got subtree with too new a revision %d, want %d", rRev, rev)
		}
		st := &storagepb.SubtreeProto{}
		if err := proto.Unmarshal(stBytes, st); err != nil {
			return err
		}
		if got, want := st.Prefix, id; !bytes.Equal(got, want) {
			return fmt.Errorf("got subtree with prefix %v, wanted %v", got, want)
		}
		// A subtree with a zero-length prefix can come back with a nil Prefix
		// field; normalize it to an empty, non-nil slice.
		if st.Prefix == nil && len(id) == 0 {
			st.Prefix = []byte{}
		}
		ret = st
		return nil
	})
	if err != nil {
		return nil, err
	}
	return ret, nil
}
// GetMerkleNodes returns the requested set of nodes at, or before, the
// transaction's read revision, which is the revision of the tree head visible
// to this transaction. The subtree cache is given a loader that reads
// subtrees from Spanner at that revision.
//
// It returns ErrTransactionClosed if the transaction has been committed. If
// the read revision cannot be determined, the cause is wrapped (%w) so callers
// can still detect, e.g., storage.ErrTreeNeedsInit.
func (t *treeTX) GetMerkleNodes(ctx context.Context, ids []compact.NodeID) ([]tree.Node, error) {
	t.mu.RLock()
	defer t.mu.RUnlock()
	if t.stx == nil {
		return nil, ErrTransactionClosed
	}
	rev, err := t.readRevision(ctx)
	if err != nil {
		return nil, fmt.Errorf("failed to get read revision: %w", err)
	}
	return t.cache.GetNodes(ids, t.getSubtreesAtRev(ctx, rev))
}
// getSubtreesAtRev returns a cache.GetSubtreesFunc that reads the requested
// subtrees as of revision rev. It issues one concurrent getSubtree call per ID.
//
// IDs with no stored subtree are omitted from the result. The found subtrees
// are returned in the same relative order as ids. If any lookup fails, the
// first error is returned and the shared errgroup context is cancelled for the
// remaining lookups.
func (t *treeTX) getSubtreesAtRev(ctx context.Context, rev int64) cache.GetSubtreesFunc {
	return func(ids [][]byte) ([]*storagepb.SubtreeProto, error) {
		// Each goroutine writes only its own slot, so g.Wait is the only
		// synchronization needed before reading the results.
		found := make([]*storagepb.SubtreeProto, len(ids))
		g, gctx := errgroup.WithContext(ctx)
		for i, id := range ids {
			i, id := i, id
			g.Go(func() error {
				st, err := t.getSubtree(gctx, rev, id)
				if err != nil {
					return err
				}
				found[i] = st
				return nil
			})
		}
		if err := g.Wait(); err != nil {
			return nil, err
		}
		ret := make([]*storagepb.SubtreeProto, 0, len(ids))
		for _, st := range found {
			if st != nil {
				ret = append(ret, st)
			}
		}
		return ret, nil
	}
}
// SetMerkleNodes hands the given nodes to the subtree cache for storage at
// the transaction's write revision. The cache's subtree loader reads as of the
// revision immediately before the write revision.
//
// It returns ErrTransactionClosed if the transaction has been committed, or
// the error from determining the write revision.
func (t *treeTX) SetMerkleNodes(ctx context.Context, nodes []tree.Node) error {
	t.mu.RLock()
	defer t.mu.RUnlock()
	if t.stx == nil {
		return ErrTransactionClosed
	}
	rev, err := t.writeRev(ctx)
	if err != nil {
		return err
	}
	prevRev := rev - 1
	return t.cache.SetNodes(nodes, t.getSubtreesAtRev(ctx, prevRev))
}
// checkDatabaseAccessible runs a trivial "SELECT 1" query through
// client.Single() and returns any error encountered. It serves as a cheap
// connectivity check.
func checkDatabaseAccessible(ctx context.Context, client *spanner.Client) error {
	// Freshness does not matter here: being able to read anything is enough.
	iter := client.Single().Query(ctx, spanner.NewStatement("SELECT 1"))
	defer iter.Stop()
	ignoreRow := func(*spanner.Row) error { return nil }
	return iter.Do(ignoreRow)
}