Skip to content

Commit

Permalink
MAP Experiment: Allow map update in a single transaction (#1272)
Browse files Browse the repository at this point in the history
DANGER: Highly experimental!
This allows map updates to be performed in a single transaction.
In general, this is a terrible idea but it is useful for experimentation and correctness tests.

A quick run against MySQL shows that this results in a large decrease in performance in the integration map test, with the time taken to complete the test rising from ~9s (across multiple transactions) to 15s when rolled into a single transaction.
  • Loading branch information
AlCutter authored Sep 6, 2018
1 parent ee257a5 commit c53ded5
Show file tree
Hide file tree
Showing 7 changed files with 57 additions and 17 deletions.
1 change: 1 addition & 0 deletions integration/functions.sh
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,7 @@ map_prep_test() {
./trillian_map_server \
--rpc_endpoint="localhost:${port}" \
--http_endpoint="localhost:${http}" \
--single_transaction=true \
--alsologtostderr \
&
pid=$!
Expand Down
7 changes: 5 additions & 2 deletions integration/maptest/map_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ import (
_ "github.com/google/trillian/merkle/maphasher"
)

var server = flag.String("map_rpc_server", "", "Server address:port")
var (
server = flag.String("map_rpc_server", "", "Server address:port")
singleTX = flag.Bool("single_transaction", false, "Experimental: whether to update the map in a single transaction")
)

func TestMapIntegration(t *testing.T) {

Expand All @@ -38,7 +41,7 @@ func TestMapIntegration(t *testing.T) {
if !testdb.MySQLAvailable() {
t.Skip("Skipping map integration test, MySQL not available")
}
env, err = integration.NewMapEnv(ctx)
env, err = integration.NewMapEnv(ctx, *singleTX)
} else {
env, err = integration.NewMapEnvFromConn(*server)
}
Expand Down
28 changes: 26 additions & 2 deletions server/map_rpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package server
import (
"context"
"fmt"
"sync/atomic"
"time"

"github.com/google/trillian"
Expand Down Expand Up @@ -46,14 +47,26 @@ var (
// TODO(codingllama): There is no access control in the server yet and clients could easily modify
// any tree.

// TrillianMapServerOptions allows various options to be provided when creating
// a new TrillianMapServer.
type TrillianMapServerOptions struct {
// UseSingleTransaction specifies whether updates to a map should be
// attempted within a single transaction.
UseSingleTransaction bool
}

// TrillianMapServer implements the RPC API defined in the proto
type TrillianMapServer struct {
registry extension.Registry
opts TrillianMapServerOptions
}

// NewTrillianMapServer creates a new RPC server backed by registry
func NewTrillianMapServer(registry extension.Registry) *TrillianMapServer {
return &TrillianMapServer{registry}
func NewTrillianMapServer(registry extension.Registry, opts TrillianMapServerOptions) *TrillianMapServer {
if opts.UseSingleTransaction {
glog.Warning("Using experimental single-transaction mode for map server.")
}
return &TrillianMapServer{registry: registry, opts: opts}
}

// IsHealthy returns nil if the server is healthy, error otherwise.
Expand Down Expand Up @@ -205,6 +218,8 @@ func (t *TrillianMapServer) SetLeaves(ctx context.Context, req *trillian.SetMapL
}
ctx = trees.NewContext(ctx, tree)

txRolledUp := uint64(0)

var newRoot *trillian.SignedMapRoot
err = t.registry.MapStorage.ReadWriteTransaction(ctx, tree, func(ctx context.Context, tx storage.MapTreeTX) error {
writeRev, err := tx.WriteRevision(ctx)
Expand All @@ -217,6 +232,11 @@ func (t *TrillianMapServer) SetLeaves(ctx context.Context, req *trillian.SetMapL
req.MapId,
writeRev,
hasher, func(ctx context.Context, f func(context.Context, storage.MapTreeTX) error) error {
if t.opts.UseSingleTransaction {
glog.V(1).Infof("Using enclosing tx for subtree operation %d", atomic.LoadUint64(&txRolledUp))
atomic.AddUint64(&txRolledUp, 1)
return f(ctx, tx)
}
return t.registry.MapStorage.ReadWriteTransaction(ctx, tree, f)
})
if err != nil {
Expand Down Expand Up @@ -252,6 +272,10 @@ func (t *TrillianMapServer) SetLeaves(ctx context.Context, req *trillian.SetMapL
}
}

if t.opts.UseSingleTransaction {
glog.V(1).Infof("Rolled %d transactions up into single commit", atomic.LoadUint64(&txRolledUp))
}

rootHash, err := smtWriter.CalculateRoot()
if err != nil {
return fmt.Errorf("CalculateRoot(): %v", err)
Expand Down
14 changes: 8 additions & 6 deletions server/map_rpc_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,16 @@ func TestIsHealthy(t *testing.T) {
{"unhealthy", errors.New("DB not happy")},
}

opts := TrillianMapServerOptions{}

for _, test := range tests {
fakeStorage := storage.NewMockMapStorage(ctrl)
fakeStorage.EXPECT().CheckDatabaseAccessible(gomock.Any()).Return(test.accessibleErr)

server := NewTrillianMapServer(extension.Registry{
AdminStorage: fakeAdminStorageForMap(ctrl, 1, mapID1),
MapStorage: fakeStorage,
})
}, opts)

wantErr := test.accessibleErr != nil
err := server.IsHealthy()
Expand Down Expand Up @@ -98,7 +100,7 @@ func TestInitMap(t *testing.T) {
server := NewTrillianMapServer(extension.Registry{
AdminStorage: fakeAdminStorageForMap(ctrl, 2, mapID1),
MapStorage: fakeStorage,
})
}, TrillianMapServerOptions{})

c, err := server.InitMap(ctx, &trillian.InitMapRequest{
MapId: mapID1,
Expand Down Expand Up @@ -138,7 +140,7 @@ func TestGetSignedMapRoot_NotInitialised(t *testing.T) {
server := NewTrillianMapServer(extension.Registry{
MapStorage: fakeStorage,
AdminStorage: fakeAdmin,
})
}, TrillianMapServerOptions{})
fakeStorage.EXPECT().SnapshotForTree(gomock.Any(), gomock.Any()).Return(mockTX, nil)
mockTX.EXPECT().LatestSignedMapRoot(gomock.Any()).Return(trillian.SignedMapRoot{}, storage.ErrTreeNeedsInit)
mockTX.EXPECT().Close()
Expand Down Expand Up @@ -206,7 +208,7 @@ func TestGetSignedMapRoot(t *testing.T) {
server := NewTrillianMapServer(extension.Registry{
AdminStorage: adminStorage,
MapStorage: fakeStorage,
})
}, TrillianMapServerOptions{})

smrResp, err := server.GetSignedMapRoot(ctx, test.req)

Expand Down Expand Up @@ -237,7 +239,7 @@ func TestGetSignedMapRootByRevision_NotInitialised(t *testing.T) {
server := NewTrillianMapServer(extension.Registry{
MapStorage: fakeStorage,
AdminStorage: adminStorage,
})
}, TrillianMapServerOptions{})
fakeStorage.EXPECT().SnapshotForTree(gomock.Any(), gomock.Any()).Return(mockTX, nil)
mockTX.EXPECT().GetSignedMapRoot(gomock.Any(), gomock.Any()).Return(trillian.SignedMapRoot{}, storage.ErrTreeNeedsInit)
mockTX.EXPECT().Close()
Expand Down Expand Up @@ -315,7 +317,7 @@ func TestGetSignedMapRootByRevision(t *testing.T) {
server := NewTrillianMapServer(extension.Registry{
AdminStorage: adminStorage,
MapStorage: fakeStorage,
})
}, TrillianMapServerOptions{})

smrResp, err := server.GetSignedMapRootByRevision(ctx, test.req)

Expand Down
7 changes: 6 additions & 1 deletion server/trillian_map_server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ var (
tracingPercent = flag.Int("tracing_percent", 0, "Percent of requests to be traced. Zero is a special case to use the DefaultSampler")

configFile = flag.String("config", "", "Config file containing flags, file contents can be overridden by command line flags")

useSingleTransaction = flag.Bool("single_transaction", false, "Experimental: use a single transaction when updating the map")
)

func main() {
Expand Down Expand Up @@ -132,7 +134,10 @@ func main() {
return nil
},
RegisterServerFn: func(s *grpc.Server, registry extension.Registry) error {
mapServer := server.NewTrillianMapServer(registry)
mapServer := server.NewTrillianMapServer(registry,
server.TrillianMapServerOptions{
UseSingleTransaction: *useSingleTransaction,
})
if err := mapServer.IsHealthy(); err != nil {
return err
}
Expand Down
7 changes: 5 additions & 2 deletions testonly/hammer/hammer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,15 @@ import (
_ "github.com/google/trillian/merkle/maphasher" // register TEST_MAP_HASHER
)

var operations = flag.Uint64("operations", 20, "Number of operations to perform")
var (
operations = flag.Uint64("operations", 20, "Number of operations to perform")
singleTX = flag.Bool("single_transaction", false, "Experimental: whether to use a single transaction when updating the map")
)

func TestInProcessMapHammer(t *testing.T) {
testdb.SkipIfNoMySQL(t)
ctx := context.Background()
env, err := integration.NewMapEnv(ctx)
env, err := integration.NewMapEnv(ctx, *singleTX)
if err != nil {
t.Fatal(err)
}
Expand Down
10 changes: 6 additions & 4 deletions testonly/integration/mapenv.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func NewMapEnvFromConn(addr string) (*MapEnv, error) {
}

// NewMapEnv creates a fresh DB, map server, and client.
func NewMapEnv(ctx context.Context) (*MapEnv, error) {
func NewMapEnv(ctx context.Context, singleTX bool) (*MapEnv, error) {
if !testdb.MySQLAvailable() {
return nil, errors.New("no MySQL available")
}
Expand All @@ -87,7 +87,7 @@ func NewMapEnv(ctx context.Context) (*MapEnv, error) {
},
}

ret, err := NewMapEnvWithRegistry(registry)
ret, err := NewMapEnvWithRegistry(registry, singleTX)
if err != nil {
db.Close()
return nil, err
Expand All @@ -98,7 +98,9 @@ func NewMapEnv(ctx context.Context) (*MapEnv, error) {

// NewMapEnvWithRegistry uses the passed in Registry to create a map server and
// client.
func NewMapEnvWithRegistry(registry extension.Registry) (*MapEnv, error) {
// If singleTX is set, the map will attempt to use a single transaction when
// updating the map data.
func NewMapEnvWithRegistry(registry extension.Registry, singleTX bool) (*MapEnv, error) {
addr, lis, err := listen()
if err != nil {
return nil, err
Expand All @@ -113,7 +115,7 @@ func NewMapEnvWithRegistry(registry extension.Registry) (*MapEnv, error) {
ti.UnaryInterceptor,
)),
)
mapServer := server.NewTrillianMapServer(registry)
mapServer := server.NewTrillianMapServer(registry, server.TrillianMapServerOptions{UseSingleTransaction: singleTX})
trillian.RegisterTrillianMapServer(grpcServer, mapServer)
trillian.RegisterTrillianAdminServer(grpcServer, admin.New(registry, nil /* allowedTreeTypes */))
go grpcServer.Serve(lis)
Expand Down

0 comments on commit c53ded5

Please sign in to comment.