Skip to content

Commit

Permalink
Move the context within the TX. (#341)
Browse files Browse the repository at this point in the history
  • Loading branch information
AlCutter authored Jan 27, 2017
1 parent b25082f commit ff76ca6
Show file tree
Hide file tree
Showing 26 changed files with 594 additions and 621 deletions.
52 changes: 26 additions & 26 deletions log/sequencer.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (s Sequencer) buildMerkleTreeFromStorageAtRoot(ctx context.Context, root tr
glog.Warningf("%s: Failed to create nodeID: %v", util.LogIDPrefix(ctx), err)
return nil, err
}
nodes, err := tx.GetMerkleNodes(ctx, root.TreeRevision, []storage.NodeID{nodeID})
nodes, err := tx.GetMerkleNodes(root.TreeRevision, []storage.NodeID{nodeID})

if err != nil {
glog.Warningf("%s: Failed to get Merkle nodes: %s", util.LogIDPrefix(ctx), err)
Expand Down Expand Up @@ -175,20 +175,20 @@ func (s Sequencer) SequenceBatch(ctx context.Context, limit int) (int, error) {

// Very recent leaves inside the guard window will not be available for sequencing
guardCutoffTime := s.timeSource.Now().Add(-s.sequencerGuardWindow)
leaves, err := tx.DequeueLeaves(ctx, limit, guardCutoffTime)
leaves, err := tx.DequeueLeaves(limit, guardCutoffTime)

if err != nil {
glog.Warningf("%s: Sequencer failed to dequeue leaves: %s", util.LogIDPrefix(ctx), err)
tx.Rollback(ctx)
tx.Rollback()
return 0, err
}

// Get the latest known root from storage
currentRoot, err := tx.LatestSignedLogRoot(ctx)
currentRoot, err := tx.LatestSignedLogRoot()

if err != nil {
glog.Warningf("%s: Sequencer failed to get latest root: %s", util.LogIDPrefix(ctx), err)
tx.Rollback(ctx)
tx.Rollback()
return 0, err
}

Expand All @@ -203,13 +203,13 @@ func (s Sequencer) SequenceBatch(ctx context.Context, limit int) (int, error) {
// current one is too old. If there's work to be done then we'll be creating a root anyway.
if len(leaves) == 0 {
// We have nothing to integrate into the tree
return 0, tx.Commit(ctx)
return 0, tx.Commit()
}

merkleTree, err := s.initMerkleTreeFromStorage(ctx, currentRoot, tx)

if err != nil {
tx.Rollback(ctx)
tx.Rollback()
return 0, err
}

Expand All @@ -219,29 +219,29 @@ func (s Sequencer) SequenceBatch(ctx context.Context, limit int) (int, error) {
// number so it should not be possible for colliding updates to commit.
newVersion := tx.WriteRevision()
if got, want := newVersion, currentRoot.TreeRevision+int64(1); got != want {
tx.Rollback(ctx)
tx.Rollback()
return 0, fmt.Errorf("%s: got writeRevision of %d, but expected %d", util.LogIDPrefix(ctx), got, want)
}

// Assign leaf sequence numbers and collate node updates
nodeMap, sequencedLeaves, err := s.sequenceLeaves(merkleTree, leaves)
if err != nil {
tx.Rollback(ctx)
tx.Rollback()
return 0, err
}

// We should still have the same number of leaves
if want, got := len(leaves), len(sequencedLeaves); want != got {
tx.Rollback(ctx)
tx.Rollback()
return 0, fmt.Errorf("%s: wanted: %d leaves after sequencing but we got: %d", util.LogIDPrefix(ctx), want, got)
}

// Write the new sequence numbers to the leaves in the DB
err = tx.UpdateSequencedLeaves(ctx, sequencedLeaves)
err = tx.UpdateSequencedLeaves(sequencedLeaves)

if err != nil {
glog.Warningf("%s: Sequencer failed to update sequenced leaves: %s", util.LogIDPrefix(ctx), err)
tx.Rollback(ctx)
tx.Rollback()
return 0, err
}

Expand All @@ -253,16 +253,16 @@ func (s Sequencer) SequenceBatch(ctx context.Context, limit int) (int, error) {
if err != nil {
// probably an internal error with map building, unexpected
glog.Warningf("%s: Failed to build target nodes in sequencer: %s", util.LogIDPrefix(ctx), err)
tx.Rollback(ctx)
tx.Rollback()
return 0, err
}

// Now insert or update the nodes affected by the above, at the new tree version
err = tx.SetMerkleNodes(ctx, targetNodes)
err = tx.SetMerkleNodes(targetNodes)

if err != nil {
glog.Warningf("%s: Sequencer failed to set Merkle nodes: %s", util.LogIDPrefix(ctx), err)
tx.Rollback(ctx)
tx.Rollback()
return 0, err
}

Expand All @@ -280,22 +280,22 @@ func (s Sequencer) SequenceBatch(ctx context.Context, limit int) (int, error) {

if err != nil {
glog.Warningf("%s: signer failed to sign root: %v", util.LogIDPrefix(ctx), err)
tx.Rollback(ctx)
tx.Rollback()
return 0, err
}

newLogRoot.Signature = &signature

err = tx.StoreSignedLogRoot(ctx, newLogRoot)
err = tx.StoreSignedLogRoot(newLogRoot)

if err != nil {
glog.Warningf("%s: failed to write updated tree root: %s", util.LogIDPrefix(ctx), err)
tx.Rollback(ctx)
tx.Rollback()
return 0, err
}

// The batch is now fully sequenced and we're done
if err := tx.Commit(ctx); err != nil {
if err := tx.Commit(); err != nil {
return 0, err
}

Expand All @@ -313,11 +313,11 @@ func (s Sequencer) SignRoot(ctx context.Context) error {
}

// Get the latest known root from storage
currentRoot, err := tx.LatestSignedLogRoot(ctx)
currentRoot, err := tx.LatestSignedLogRoot()

if err != nil {
glog.Warningf("%s: signer failed to get latest root: %s", util.LogIDPrefix(ctx), err)
tx.Rollback(ctx)
tx.Rollback()
return err
}

Expand All @@ -326,7 +326,7 @@ func (s Sequencer) SignRoot(ctx context.Context) error {
merkleTree, err := s.initMerkleTreeFromStorage(ctx, currentRoot, tx)

if err != nil {
tx.Rollback(ctx)
tx.Rollback()
return err
}

Expand All @@ -344,19 +344,19 @@ func (s Sequencer) SignRoot(ctx context.Context) error {

if err != nil {
glog.Warningf("%s: signer failed to sign root: %v", util.LogIDPrefix(ctx), err)
tx.Rollback(ctx)
tx.Rollback()
return err
}

newLogRoot.Signature = &signature

// Store the new root and we're done
if err := tx.StoreSignedLogRoot(ctx, newLogRoot); err != nil {
if err := tx.StoreSignedLogRoot(newLogRoot); err != nil {
glog.Warningf("%s: signer failed to write updated root: %v", util.LogIDPrefix(ctx), err)
tx.Rollback(ctx)
tx.Rollback()
return err
}
glog.V(2).Infof("%s: new signed root, size %d, tree-revision %d", util.LogIDPrefix(ctx), newLogRoot.TreeSize, newLogRoot.TreeRevision)

return tx.Commit(ctx)
return tx.Commit()
}
20 changes: 10 additions & 10 deletions log/sequencer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,42 +174,42 @@ func createTestContext(ctrl *gomock.Controller, params testParameters) (testCont

if params.shouldCommit {
if !params.commitFails {
mockTx.EXPECT().Commit(gomock.Any()).AnyTimes().Return(nil)
mockTx.EXPECT().Commit().AnyTimes().Return(nil)
} else {
mockTx.EXPECT().Commit(gomock.Any()).AnyTimes().Return(params.commitError)
mockTx.EXPECT().Commit().AnyTimes().Return(params.commitError)
}
}

if params.shouldRollback {
mockTx.EXPECT().Rollback(gomock.Any()).AnyTimes().Return(nil)
mockTx.EXPECT().Rollback().AnyTimes().Return(nil)
}

if !params.skipDequeue {
if params.overrideDequeueTime != nil {
mockTx.EXPECT().DequeueLeaves(gomock.Any(), params.dequeueLimit, *params.overrideDequeueTime).AnyTimes().Return(params.dequeuedLeaves, params.dequeuedError)
mockTx.EXPECT().DequeueLeaves(params.dequeueLimit, *params.overrideDequeueTime).AnyTimes().Return(params.dequeuedLeaves, params.dequeuedError)
} else {
mockTx.EXPECT().DequeueLeaves(gomock.Any(), params.dequeueLimit, fakeTimeForTest).AnyTimes().Return(params.dequeuedLeaves, params.dequeuedError)
mockTx.EXPECT().DequeueLeaves(params.dequeueLimit, fakeTimeForTest).AnyTimes().Return(params.dequeuedLeaves, params.dequeuedError)
}
}

if params.latestSignedRoot != nil {
mockTx.EXPECT().LatestSignedLogRoot(gomock.Any()).AnyTimes().Return(*params.latestSignedRoot, params.latestSignedRootError)
mockTx.EXPECT().LatestSignedLogRoot().AnyTimes().Return(*params.latestSignedRoot, params.latestSignedRootError)
}

if params.updatedLeaves != nil {
mockTx.EXPECT().UpdateSequencedLeaves(gomock.Any(), *params.updatedLeaves).AnyTimes().Return(params.updatedLeavesError)
mockTx.EXPECT().UpdateSequencedLeaves(*params.updatedLeaves).AnyTimes().Return(params.updatedLeavesError)
}

if params.merkleNodesSet != nil {
mockTx.EXPECT().SetMerkleNodes(gomock.Any(), testonly.NodeSet(*params.merkleNodesSet)).AnyTimes().Return(params.merkleNodesSetError)
mockTx.EXPECT().SetMerkleNodes(testonly.NodeSet(*params.merkleNodesSet)).AnyTimes().Return(params.merkleNodesSetError)
}

if !params.skipStoreSignedRoot {
if params.storeSignedRoot != nil {
mockTx.EXPECT().StoreSignedLogRoot(gomock.Any(), *params.storeSignedRoot).AnyTimes().Return(params.storeSignedRootError)
mockTx.EXPECT().StoreSignedLogRoot(*params.storeSignedRoot).AnyTimes().Return(params.storeSignedRootError)
} else {
// At the moment if we're going to fail the operation we accept any root
mockTx.EXPECT().StoreSignedLogRoot(gomock.Any(), gomock.Any()).AnyTimes().Return(params.storeSignedRootError)
mockTx.EXPECT().StoreSignedLogRoot(gomock.Any()).AnyTimes().Return(params.storeSignedRootError)
}
}

Expand Down
41 changes: 19 additions & 22 deletions merkle/sparse_merkle_tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package merkle

import (
"bytes"
"context"
"errors"
"fmt"
"math/big"
Expand Down Expand Up @@ -65,12 +64,12 @@ type rootHashOrError struct {
// and dropped in.
type Subtree interface {
// SetLeaf sets a single leaf hash for integration into a sparse Merkle tree.
SetLeaf(ctx context.Context, index []byte, hash []byte) error
SetLeaf(index []byte, hash []byte) error

// CalculateRoot instructs the subtree worker to start calculating the root
// hash of its tree. It is an error to call SetLeaf() after calling this
// method.
CalculateRoot(ctx context.Context)
CalculateRoot()

// RootHash returns the calculated root hash for this subtree, if the root
// hash has not yet been calculated, this method will block until it is.
Expand Down Expand Up @@ -150,7 +149,7 @@ func (s *subtreeWriter) getOrCreateChildSubtree(childPrefix []byte) (Subtree, er

// SetLeaf sets a single leaf hash for incorporation into the sparse Merkle
// tree.
func (s *subtreeWriter) SetLeaf(ctx context.Context, index []byte, hash []byte) error {
func (s *subtreeWriter) SetLeaf(index []byte, hash []byte) error {
indexLen := len(index) * 8

switch {
Expand All @@ -164,7 +163,7 @@ func (s *subtreeWriter) SetLeaf(ctx context.Context, index []byte, hash []byte)
return err
}

return subtree.SetLeaf(ctx, index[s.subtreeDepth/8:], hash)
return subtree.SetLeaf(index[s.subtreeDepth/8:], hash)

case indexLen == s.subtreeDepth:
s.leafQueue <- func() (*indexAndHash, error) { return &indexAndHash{index: index, hash: hash}, nil }
Expand All @@ -176,11 +175,11 @@ func (s *subtreeWriter) SetLeaf(ctx context.Context, index []byte, hash []byte)

// CalculateRoot initiates the process of calculating the subtree root.
// The leafQueue is closed.
func (s *subtreeWriter) CalculateRoot(ctx context.Context) {
func (s *subtreeWriter) CalculateRoot() {
close(s.leafQueue)

for _, v := range s.children {
v.CalculateRoot(ctx)
v.CalculateRoot()
}
}

Expand Down Expand Up @@ -210,7 +209,7 @@ func nodeIDFromAddress(size int, prefix []byte, index *big.Int, depth int) stora
// buildSubtree is the worker function which calculates the root hash.
// The root chan will have had exactly one entry placed in it, and have been
// subsequently closed when this method exits.
func (s *subtreeWriter) buildSubtree(ctx context.Context) {
func (s *subtreeWriter) buildSubtree() {
defer close(s.root)

leaves := make([]HStar2LeafHash, 0, len(s.leafQueue))
Expand Down Expand Up @@ -239,7 +238,7 @@ func (s *subtreeWriter) buildSubtree(ctx context.Context) {
root, err := hs2.HStar2Nodes(s.subtreeDepth, treeDepthOffset, leaves,
func(depth int, index *big.Int) ([]byte, error) {
nodeID := nodeIDFromAddress(addressSize, s.prefix, index, depth)
nodes, err := s.tx.GetMerkleNodes(ctx, s.treeRevision, []storage.NodeID{nodeID})
nodes, err := s.tx.GetMerkleNodes(s.treeRevision, []storage.NodeID{nodeID})
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -275,11 +274,11 @@ func (s *subtreeWriter) buildSubtree(ctx context.Context) {
}

// write nodes back to storage
if err := s.tx.SetMerkleNodes(ctx, nodesToStore); err != nil {
if err := s.tx.SetMerkleNodes(nodesToStore); err != nil {
s.root <- rootHashOrError{nil, err}
return
}
if err := s.tx.Commit(ctx); err != nil {
if err := s.tx.Commit(); err != nil {
s.root <- rootHashOrError{nil, err}
return
}
Expand Down Expand Up @@ -335,12 +334,10 @@ func newLocalSubtreeWriter(rev int64, prefix []byte, depths []int, newTX newTXFu
return newLocalSubtreeWriter(rev, myPrefix, depths[1:], newTX, h)
},
}
// TODO(al): figure out how to get a real context used.
ctx := context.TODO()

// TODO(al): probably shouldn't be spawning go routines willy-nilly like
// this, but it'll do for now.
go tree.buildSubtree(ctx)
go tree.buildSubtree()
return &tree, nil
}

Expand All @@ -363,9 +360,9 @@ func NewSparseMerkleTreeWriter(rev int64, h MapHasher, newTX newTXFunc) (*Sparse

// RootAtRevision returns the sparse Merkle tree root hash at the specified
// revision, or ErrNoSuchRevision if the requested revision doesn't exist.
func (s SparseMerkleTreeReader) RootAtRevision(ctx context.Context, rev int64) ([]byte, error) {
func (s SparseMerkleTreeReader) RootAtRevision(rev int64) ([]byte, error) {
rootNodeID := storage.NewEmptyNodeID(256)
nodes, err := s.tx.GetMerkleNodes(ctx, rev, []storage.NodeID{rootNodeID})
nodes, err := s.tx.GetMerkleNodes(rev, []storage.NodeID{rootNodeID})
if err != nil {
return nil, err
}
Expand All @@ -389,11 +386,11 @@ func (s SparseMerkleTreeReader) RootAtRevision(ctx context.Context, rev int64) (
// InclusionProof returns an inclusion (or non-inclusion) proof for the
// specified key at the specified revision.
// If the revision does not exist it will return ErrNoSuchRevision error.
func (s SparseMerkleTreeReader) InclusionProof(ctx context.Context, rev int64, key []byte) ([][]byte, error) {
func (s SparseMerkleTreeReader) InclusionProof(rev int64, key []byte) ([][]byte, error) {
kh := s.hasher.HashKey(key)
nid := storage.NewNodeIDFromHash(kh)
sibs := nid.Siblings()
nodes, err := s.tx.GetMerkleNodes(ctx, rev, sibs)
nodes, err := s.tx.GetMerkleNodes(rev, sibs)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -428,18 +425,18 @@ func (s SparseMerkleTreeReader) InclusionProof(ctx context.Context, rev int64, k
}

// SetLeaves adds a batch of leaves to the in-flight tree update.
func (s *SparseMerkleTreeWriter) SetLeaves(ctx context.Context, leaves []HashKeyValue) error {
func (s *SparseMerkleTreeWriter) SetLeaves(leaves []HashKeyValue) error {
for _, l := range leaves {
if err := s.tree.SetLeaf(ctx, l.HashedKey, l.HashedValue); err != nil {
if err := s.tree.SetLeaf(l.HashedKey, l.HashedValue); err != nil {
return err
}
}
return nil
}

// CalculateRoot calculates the new root hash including the newly added leaves.
func (s *SparseMerkleTreeWriter) CalculateRoot(ctx context.Context) ([]byte, error) {
s.tree.CalculateRoot(ctx)
func (s *SparseMerkleTreeWriter) CalculateRoot() ([]byte, error) {
s.tree.CalculateRoot()
return s.tree.RootHash()
}

Expand Down
Loading

0 comments on commit ff76ca6

Please sign in to comment.