Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce a BeginTransaction request. #2799

Merged
merged 2 commits into from
Oct 24, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions client/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ func (b *Batch) fillResults(br *roachpb.BatchResponse, pErr *roachpb.Error) erro
row.Key = []byte(args.(*roachpb.DeleteRequest).Key)

case *roachpb.DeleteRangeRequest:
case *roachpb.BeginTransactionRequest:
case *roachpb.EndTransactionRequest:
case *roachpb.AdminMergeRequest:
case *roachpb.AdminSplitRequest:
Expand Down
65 changes: 51 additions & 14 deletions client/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,27 +408,43 @@ func (txn *Txn) send(reqs ...roachpb.Request) (*roachpb.BatchResponse, *roachpb.
return &roachpb.BatchResponse{}, nil
}

lastReq := reqs[lastIndex]
// haveTxnWrite tracks intention to write. This is in contrast to
// txn.Proto.Writing, which is set by the coordinator when the first
// intent has been created, and which lives for the life of the
// transaction.
haveTxnWrite := roachpb.IsTransactionWrite(lastReq)

for _, args := range reqs[:lastIndex] {
if _, ok := args.(*roachpb.EndTransactionRequest); ok {
return nil, roachpb.NewError(util.Errorf("%s sent as non-terminal call", args.Method()))
// firstWriteIndex is set to the index of the first command which is
// a transactional write. If != -1, this indicates an intention to
// write. This is in contrast to txn.Proto.Writing, which is set by
// the coordinator when the first intent has been created, and which
// lives for the life of the transaction.
firstWriteIndex := -1
var firstWriteKey roachpb.Key

for i, args := range reqs {
if i < lastIndex {
if _, ok := args.(*roachpb.EndTransactionRequest); ok {
return nil, roachpb.NewError(util.Errorf("%s sent as non-terminal call", args.Method()))
}
}

if !haveTxnWrite {
haveTxnWrite = roachpb.IsTransactionWrite(args)
if roachpb.IsTransactionWrite(args) && firstWriteIndex == -1 {
firstWriteKey = args.Header().Key
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

doesn't this end up being the last write key?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

err, disregard

firstWriteIndex = i
}
}

endTxnRequest, haveEndTxn := lastReq.(*roachpb.EndTransactionRequest)
haveTxnWrite := firstWriteIndex != -1
endTxnRequest, haveEndTxn := reqs[lastIndex].(*roachpb.EndTransactionRequest)
needBeginTxn := !txn.Proto.Writing && haveTxnWrite
needEndTxn := txn.Proto.Writing || haveTxnWrite
elideEndTxn := haveEndTxn && !needEndTxn

// If we're not yet writing in this txn, but intend to, insert a
// begin transaction request before the first write command.
if needBeginTxn {
bt := &roachpb.BeginTransactionRequest{
Span: roachpb.Span{
Key: firstWriteKey,
},
}
reqs = append(append(append([]roachpb.Request(nil), reqs[:firstWriteIndex]...), []roachpb.Request{bt}...), reqs[firstWriteIndex:]...)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why []roachpb.Request{bt}... instead of just bt?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. Changed.

}

if elideEndTxn {
reqs = reqs[:lastIndex]
}
Expand All @@ -445,5 +461,26 @@ func (txn *Txn) send(reqs ...roachpb.Request) (*roachpb.BatchResponse, *roachpb.
txn.Proto.Status = roachpb.ABORTED
}
}

// If we inserted a begin transaction request, remove it here.
if needBeginTxn {
if br != nil && br.Responses != nil {
br.Responses = append(br.Responses[:firstWriteIndex], br.Responses[firstWriteIndex+1:]...)
}
// Handle case where inserted begin txn confused an indexed error.
if pErr != nil && pErr.Detail != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is just if iErr, ok := pErr.GoError().(roachpb.IndexedError) {

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK.

if iErr, ok := pErr.Detail.GetValue().(roachpb.IndexedError); ok {
if idx, ok := iErr.ErrorIndex(); ok {
if idx == int32(firstWriteIndex) {
// An error was encountered on begin txn; disallow the indexing.
pErr = roachpb.NewError(util.Errorf("error on begin transaction: %s", pErr.Detail.GetValue().(error)))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why the type assertion?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should just be pErr.GoError().

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for printing, can actually just print pErr directly, it gives the same output.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm just formatting with pErr. Didn't want to use pErr.GoError() because that would give an index, which would confuse callers, as BeginTransaction is added to the batch and then removed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why would that give an index? do any of the errors' Error() methods output the index?

} else if idx > int32(firstWriteIndex) {
// An error was encountered after begin txn; decrement index.
iErr.SetErrorIndex(idx - 1)
}
}
}
}
}
return br, pErr
}
66 changes: 60 additions & 6 deletions client/txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ func newDB(sender Sender) *DB {
}
}

// TestSender mocks out some of the txn coordinator sender's
// functionality. It responds to PutRequests using testPutResp.
func newTestSender(pre, post func(roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error)) SenderFunc {
txnKey := roachpb.Key("test-txn")
txnID := []byte(uuid.NewUUID4())
Expand All @@ -68,9 +70,16 @@ func newTestSender(pre, post func(roachpb.BatchRequest) (*roachpb.BatchResponse,
}
var writing bool
status := roachpb.PENDING
if _, ok := ba.GetArg(roachpb.Put); ok {
br.Add(proto.Clone(testPutResp).(roachpb.Response))
writing = true
for i, req := range ba.Requests {
args := req.GetInner()
if _, ok := args.(*roachpb.PutRequest); ok {
if !br.Responses[i].SetValue(proto.Clone(testPutResp).(roachpb.Response)) {
panic("failed to set put response")
}
}
if roachpb.IsTransactionWrite(args) {
writing = true
}
}
if args, ok := ba.GetArg(roachpb.EndTransaction); ok {
et := args.(*roachpb.EndTransactionRequest)
Expand Down Expand Up @@ -240,20 +249,65 @@ func TestCommitReadOnlyTransactionExplicit(t *testing.T) {
// upon successful invocation of the retryable func.
func TestCommitMutatingTransaction(t *testing.T) {
defer leaktest.AfterTest(t)

var calls []roachpb.Method
db := newDB(newTestSender(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
calls = append(calls, ba.Methods()...)
if bt, ok := ba.GetArg(roachpb.BeginTransaction); ok && !bt.Header().Key.Equal(roachpb.Key("a")) {
t.Errorf("expected begin transaction key to be \"a\"; got %s", bt.Header().Key)
}
if et, ok := ba.GetArg(roachpb.EndTransaction); ok && !et.(*roachpb.EndTransactionRequest).Commit {
t.Errorf("expected commit to be true")
}
return ba.CreateReply(), nil
}, nil))

// Test all transactional write methods.
testArgs := []struct {
f func(txn *Txn) error
expMethod roachpb.Method
}{
{func(txn *Txn) error { return txn.Put("a", "b") }, roachpb.Put},
{func(txn *Txn) error { return txn.CPut("a", "b", nil) }, roachpb.ConditionalPut},
{func(txn *Txn) error {
_, err := txn.Inc("a", 1)
return err
}, roachpb.Increment},
{func(txn *Txn) error { return txn.Del("a") }, roachpb.Delete},
{func(txn *Txn) error { return txn.DelRange("a", "b") }, roachpb.DeleteRange},
}
for i, test := range testArgs {
calls = []roachpb.Method{}
if err := db.Txn(func(txn *Txn) error {
return test.f(txn)
}); err != nil {
t.Errorf("%d: unexpected error on commit: %s", i, err)
}
expectedCalls := []roachpb.Method{roachpb.BeginTransaction, test.expMethod, roachpb.EndTransaction}
if !reflect.DeepEqual(expectedCalls, calls) {
t.Errorf("%d: expected %s, got %s", i, expectedCalls, calls)
}
}
}

// TestTxnInsertBeginTransaction verifies that a begin transaction
// request is inserted just before the first mutating command.
func TestTxnInsertBeginTransaction(t *testing.T) {
defer leaktest.AfterTest(t)
var calls []roachpb.Method
db := newDB(newTestSender(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
calls = append(calls, ba.Methods()...)
return ba.CreateReply(), nil
}, nil))
if err := db.Txn(func(txn *Txn) error {
if _, err := txn.Get("foo"); err != nil {
return err
}
return txn.Put("a", "b")
}); err != nil {
t.Errorf("unexpected error on commit: %s", err)
}
expectedCalls := []roachpb.Method{roachpb.Put, roachpb.EndTransaction}
expectedCalls := []roachpb.Method{roachpb.Get, roachpb.BeginTransaction, roachpb.Put, roachpb.EndTransaction}
if !reflect.DeepEqual(expectedCalls, calls) {
t.Errorf("expected %s, got %s", expectedCalls, calls)
}
Expand Down Expand Up @@ -306,7 +360,7 @@ func TestAbortReadOnlyTransaction(t *testing.T) {
func TestEndWriteRestartReadOnlyTransaction(t *testing.T) {
defer leaktest.AfterTest(t)
for _, success := range []bool{true, false} {
expCalls := []roachpb.Method{roachpb.Put, roachpb.EndTransaction}
expCalls := []roachpb.Method{roachpb.BeginTransaction, roachpb.Put, roachpb.EndTransaction}
var calls []roachpb.Method
db := newDB(newTestSender(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
calls = append(calls, ba.Methods()...)
Expand Down Expand Up @@ -355,7 +409,7 @@ func TestAbortMutatingTransaction(t *testing.T) {
}); err == nil {
t.Error("expected error on abort")
}
expectedCalls := []roachpb.Method{roachpb.Put, roachpb.EndTransaction}
expectedCalls := []roachpb.Method{roachpb.BeginTransaction, roachpb.Put, roachpb.EndTransaction}
if !reflect.DeepEqual(expectedCalls, calls) {
t.Errorf("expected %s, got %s", expectedCalls, calls)
}
Expand Down
23 changes: 12 additions & 11 deletions kv/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,18 @@ import (
)

var allExternalMethods = [...]roachpb.Request{
roachpb.Get: &roachpb.GetRequest{},
roachpb.Put: &roachpb.PutRequest{},
roachpb.ConditionalPut: &roachpb.ConditionalPutRequest{},
roachpb.Increment: &roachpb.IncrementRequest{},
roachpb.Delete: &roachpb.DeleteRequest{},
roachpb.DeleteRange: &roachpb.DeleteRangeRequest{},
roachpb.Scan: &roachpb.ScanRequest{},
roachpb.ReverseScan: &roachpb.ReverseScanRequest{},
roachpb.EndTransaction: &roachpb.EndTransactionRequest{},
roachpb.AdminSplit: &roachpb.AdminSplitRequest{},
roachpb.AdminMerge: &roachpb.AdminMergeRequest{},
roachpb.Get: &roachpb.GetRequest{},
roachpb.Put: &roachpb.PutRequest{},
roachpb.ConditionalPut: &roachpb.ConditionalPutRequest{},
roachpb.Increment: &roachpb.IncrementRequest{},
roachpb.Delete: &roachpb.DeleteRequest{},
roachpb.DeleteRange: &roachpb.DeleteRangeRequest{},
roachpb.Scan: &roachpb.ScanRequest{},
roachpb.ReverseScan: &roachpb.ReverseScanRequest{},
roachpb.BeginTransaction: &roachpb.BeginTransactionRequest{},
roachpb.EndTransaction: &roachpb.EndTransactionRequest{},
roachpb.AdminSplit: &roachpb.AdminSplitRequest{},
roachpb.AdminMerge: &roachpb.AdminMergeRequest{},
}

// A DBServer provides an HTTP server endpoint serving the key-value API.
Expand Down
48 changes: 36 additions & 12 deletions kv/txn_coord_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,9 @@ func (tc *TxnCoordSender) startStats() {
// write intents; they're tagged to an outgoing EndTransaction request, with
// the receiving replica in charge of resolving them.
func (tc *TxnCoordSender) Send(ctx context.Context, ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
tc.maybeBeginTxn(&ba)
if err := tc.maybeBeginTxn(&ba); err != nil {
return nil, roachpb.NewError(err)
}
ba.CmdID = ba.GetOrCreateCmdID(tc.clock.PhysicalNow())
var startNS int64

Expand Down Expand Up @@ -333,6 +335,10 @@ func (tc *TxnCoordSender) Send(ctx context.Context, ba roachpb.BatchRequest) (*r

if rArgs, ok := ba.GetArg(roachpb.EndTransaction); ok {
et := rArgs.(*roachpb.EndTransactionRequest)
if len(et.Key) != 0 {
return nil, roachpb.NewError(util.Errorf("EndTransaction must not have a Key set"))
}
et.Key = ba.Txn.Key
// Remember when EndTransaction started in case we want to
// be linearizable.
startNS = tc.clock.PhysicalNow()
Expand All @@ -342,11 +348,6 @@ func (tc *TxnCoordSender) Send(ctx context.Context, ba roachpb.BatchRequest) (*r
// write on multiple coordinators.
return nil, roachpb.NewError(util.Errorf("client must not pass intents to EndTransaction"))
}
if len(et.Key) != 0 {
return nil, roachpb.NewError(util.Errorf("EndTransaction must not have a Key set"))
}
et.Key = ba.Txn.Key

tc.Lock()
txnMeta, metaOK := tc.txns[id]
if id != "" && metaOK {
Expand Down Expand Up @@ -441,17 +442,21 @@ func (tc *TxnCoordSender) Send(ctx context.Context, ba roachpb.BatchRequest) (*r
// in the request but has a nil ID. The new transaction is initialized
// using the name and isolation in the otherwise uninitialized txn.
// The Priority, if non-zero is used as a minimum.
func (tc *TxnCoordSender) maybeBeginTxn(ba *roachpb.BatchRequest) {
//
// No transactional writes are allowed unless preceded by a begin
// transaction request within the same batch. The exception is if the
// transaction is already in state txn.Writing=true.
func (tc *TxnCoordSender) maybeBeginTxn(ba *roachpb.BatchRequest) error {
if ba.Txn == nil {
return
return nil
}
if len(ba.Requests) == 0 {
panic("empty batch with txn")
return util.Errorf("empty batch with txn")
}
if len(ba.Txn.ID) == 0 {
// TODO(tschottdorf): should really choose the first txn write here.
firstKey := ba.Requests[0].GetInner().Header().Key
newTxn := roachpb.NewTransaction(ba.Txn.Name, firstKey, ba.GetUserPriority(),
// Create transaction without a key. The key is set when a begin
// transaction request is received.
newTxn := roachpb.NewTransaction(ba.Txn.Name, nil, ba.GetUserPriority(),
ba.Txn.Isolation, tc.clock.Now(), tc.clock.MaxOffset().Nanoseconds())
// Use existing priority as a minimum. This is used on transaction
// aborts to ratchet priority when creating successor transaction.
Expand All @@ -460,6 +465,25 @@ func (tc *TxnCoordSender) maybeBeginTxn(ba *roachpb.BatchRequest) {
}
ba.Txn = newTxn
}

// Check for a begin transaction to set txn key based on the key of
// the first transactional write. Also enforce that no transactional
// writes occur before a begin transaction.
var haveBeginTxn bool
for _, req := range ba.Requests {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

take a look at ba.GetArg(proto.BeginTransactionRequest).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I need to go through them all in order to check for error conditions.

args := req.GetInner()
if bt, ok := args.(*roachpb.BeginTransactionRequest); ok {
if haveBeginTxn || ba.Txn.Writing {
return util.Errorf("begin transaction requested twice in the same transaction")
}
haveBeginTxn = true
ba.Txn.Key = bt.Key
}
if roachpb.IsTransactionWrite(args) && !haveBeginTxn && !ba.Txn.Writing {
return util.Errorf("transactional write before begin transaction")
}
}
return nil
}

// cleanupTxn is called when a transaction ends. The transaction record is
Expand Down
Loading