-
Notifications
You must be signed in to change notification settings - Fork 3.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Introduce a BeginTransaction request. #2799
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -408,27 +408,43 @@ func (txn *Txn) send(reqs ...roachpb.Request) (*roachpb.BatchResponse, *roachpb. | |
return &roachpb.BatchResponse{}, nil | ||
} | ||
|
||
lastReq := reqs[lastIndex] | ||
// haveTxnWrite tracks intention to write. This is in contrast to | ||
// txn.Proto.Writing, which is set by the coordinator when the first | ||
// intent has been created, and which lives for the life of the | ||
// transaction. | ||
haveTxnWrite := roachpb.IsTransactionWrite(lastReq) | ||
|
||
for _, args := range reqs[:lastIndex] { | ||
if _, ok := args.(*roachpb.EndTransactionRequest); ok { | ||
return nil, roachpb.NewError(util.Errorf("%s sent as non-terminal call", args.Method())) | ||
// firstWriteIndex is set to the index of the first command which is | ||
// a transactional write. If != -1, this indicates an intention to | ||
// write. This is in contrast to txn.Proto.Writing, which is set by | ||
// the coordinator when the first intent has been created, and which | ||
// lives for the life of the transaction. | ||
firstWriteIndex := -1 | ||
var firstWriteKey roachpb.Key | ||
|
||
for i, args := range reqs { | ||
if i < lastIndex { | ||
if _, ok := args.(*roachpb.EndTransactionRequest); ok { | ||
return nil, roachpb.NewError(util.Errorf("%s sent as non-terminal call", args.Method())) | ||
} | ||
} | ||
|
||
if !haveTxnWrite { | ||
haveTxnWrite = roachpb.IsTransactionWrite(args) | ||
if roachpb.IsTransactionWrite(args) && firstWriteIndex == -1 { | ||
firstWriteKey = args.Header().Key | ||
firstWriteIndex = i | ||
} | ||
} | ||
|
||
endTxnRequest, haveEndTxn := lastReq.(*roachpb.EndTransactionRequest) | ||
haveTxnWrite := firstWriteIndex != -1 | ||
endTxnRequest, haveEndTxn := reqs[lastIndex].(*roachpb.EndTransactionRequest) | ||
needBeginTxn := !txn.Proto.Writing && haveTxnWrite | ||
needEndTxn := txn.Proto.Writing || haveTxnWrite | ||
elideEndTxn := haveEndTxn && !needEndTxn | ||
|
||
// If we're not yet writing in this txn, but intend to, insert a | ||
// begin transaction request before the first write command. | ||
if needBeginTxn { | ||
bt := &roachpb.BeginTransactionRequest{ | ||
Span: roachpb.Span{ | ||
Key: firstWriteKey, | ||
}, | ||
} | ||
reqs = append(append(append([]roachpb.Request(nil), reqs[:firstWriteIndex]...), []roachpb.Request{bt}...), reqs[firstWriteIndex:]...) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good point. Changed. |
||
} | ||
|
||
if elideEndTxn { | ||
reqs = reqs[:lastIndex] | ||
} | ||
|
@@ -445,5 +461,26 @@ func (txn *Txn) send(reqs ...roachpb.Request) (*roachpb.BatchResponse, *roachpb. | |
txn.Proto.Status = roachpb.ABORTED | ||
} | ||
} | ||
|
||
// If we inserted a begin transaction request, remove it here. | ||
if needBeginTxn { | ||
if br != nil && br.Responses != nil { | ||
br.Responses = append(br.Responses[:firstWriteIndex], br.Responses[firstWriteIndex+1:]...) | ||
} | ||
// Handle case where inserted begin txn confused an indexed error. | ||
if pErr != nil && pErr.Detail != nil { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is just There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. OK. |
||
if iErr, ok := pErr.Detail.GetValue().(roachpb.IndexedError); ok { | ||
if idx, ok := iErr.ErrorIndex(); ok { | ||
if idx == int32(firstWriteIndex) { | ||
// An error was encountered on begin txn; disallow the indexing. | ||
pErr = roachpb.NewError(util.Errorf("error on begin transaction: %s", pErr.Detail.GetValue().(error))) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why the type assertion? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this should just be There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. for printing, can actually just print There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm just formatting with pErr. Didn't want to use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why would that give an index? do any of the errors' Error() methods output the index? |
||
} else if idx > int32(firstWriteIndex) { | ||
// An error was encountered after begin txn; decrement index. | ||
iErr.SetErrorIndex(idx - 1) | ||
} | ||
} | ||
} | ||
} | ||
} | ||
return br, pErr | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -295,7 +295,9 @@ func (tc *TxnCoordSender) startStats() { | |
// write intents; they're tagged to an outgoing EndTransaction request, with | ||
// the receiving replica in charge of resolving them. | ||
func (tc *TxnCoordSender) Send(ctx context.Context, ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) { | ||
tc.maybeBeginTxn(&ba) | ||
if err := tc.maybeBeginTxn(&ba); err != nil { | ||
return nil, roachpb.NewError(err) | ||
} | ||
ba.CmdID = ba.GetOrCreateCmdID(tc.clock.PhysicalNow()) | ||
var startNS int64 | ||
|
||
|
@@ -333,6 +335,10 @@ func (tc *TxnCoordSender) Send(ctx context.Context, ba roachpb.BatchRequest) (*r | |
|
||
if rArgs, ok := ba.GetArg(roachpb.EndTransaction); ok { | ||
et := rArgs.(*roachpb.EndTransactionRequest) | ||
if len(et.Key) != 0 { | ||
return nil, roachpb.NewError(util.Errorf("EndTransaction must not have a Key set")) | ||
} | ||
et.Key = ba.Txn.Key | ||
// Remember when EndTransaction started in case we want to | ||
// be linearizable. | ||
startNS = tc.clock.PhysicalNow() | ||
|
@@ -342,11 +348,6 @@ func (tc *TxnCoordSender) Send(ctx context.Context, ba roachpb.BatchRequest) (*r | |
// write on multiple coordinators. | ||
return nil, roachpb.NewError(util.Errorf("client must not pass intents to EndTransaction")) | ||
} | ||
if len(et.Key) != 0 { | ||
return nil, roachpb.NewError(util.Errorf("EndTransaction must not have a Key set")) | ||
} | ||
et.Key = ba.Txn.Key | ||
|
||
tc.Lock() | ||
txnMeta, metaOK := tc.txns[id] | ||
if id != "" && metaOK { | ||
|
@@ -441,17 +442,21 @@ func (tc *TxnCoordSender) Send(ctx context.Context, ba roachpb.BatchRequest) (*r | |
// in the request but has a nil ID. The new transaction is initialized | ||
// using the name and isolation in the otherwise uninitialized txn. | ||
// The Priority, if non-zero is used as a minimum. | ||
func (tc *TxnCoordSender) maybeBeginTxn(ba *roachpb.BatchRequest) { | ||
// | ||
// No transactional writes are allowed unless preceded by a begin | ||
// transaction request within the same batch. The exception is if the | ||
// transaction is already in state txn.Writing=true. | ||
func (tc *TxnCoordSender) maybeBeginTxn(ba *roachpb.BatchRequest) error { | ||
if ba.Txn == nil { | ||
return | ||
return nil | ||
} | ||
if len(ba.Requests) == 0 { | ||
panic("empty batch with txn") | ||
return util.Errorf("empty batch with txn") | ||
} | ||
if len(ba.Txn.ID) == 0 { | ||
// TODO(tschottdorf): should really choose the first txn write here. | ||
firstKey := ba.Requests[0].GetInner().Header().Key | ||
newTxn := roachpb.NewTransaction(ba.Txn.Name, firstKey, ba.GetUserPriority(), | ||
// Create transaction without a key. The key is set when a begin | ||
// transaction request is received. | ||
newTxn := roachpb.NewTransaction(ba.Txn.Name, nil, ba.GetUserPriority(), | ||
ba.Txn.Isolation, tc.clock.Now(), tc.clock.MaxOffset().Nanoseconds()) | ||
// Use existing priority as a minimum. This is used on transaction | ||
// aborts to ratchet priority when creating successor transaction. | ||
|
@@ -460,6 +465,25 @@ func (tc *TxnCoordSender) maybeBeginTxn(ba *roachpb.BatchRequest) { | |
} | ||
ba.Txn = newTxn | ||
} | ||
|
||
// Check for a begin transaction to set txn key based on the key of | ||
// the first transactional write. Also enforce that no transactional | ||
// writes occur before a begin transaction. | ||
var haveBeginTxn bool | ||
for _, req := range ba.Requests { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. take a look at There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I need to go through them all in order to check for error conditions. |
||
args := req.GetInner() | ||
if bt, ok := args.(*roachpb.BeginTransactionRequest); ok { | ||
if haveBeginTxn || ba.Txn.Writing { | ||
return util.Errorf("begin transaction requested twice in the same transaction") | ||
} | ||
haveBeginTxn = true | ||
ba.Txn.Key = bt.Key | ||
} | ||
if roachpb.IsTransactionWrite(args) && !haveBeginTxn && !ba.Txn.Writing { | ||
return util.Errorf("transactional write before begin transaction") | ||
} | ||
} | ||
return nil | ||
} | ||
|
||
// cleanupTxn is called when a transaction ends. The transaction record is | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
doesn't this end up being the last write key?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
err, disregard