Skip to content

Commit

Permalink
storage-side batch processing.
Browse files Browse the repository at this point in the history
* feedback on original diff
* rebase fixups
* fix data race
* add missing timestamp on BatchRequest header
  this is cleaned up more in #2141, where only the batch gets
  the headers, but with this fix we avoid one obvious source
  of them.
* 5 instead of 50k intents in TestGCQueueIntentResolution
  ain't nobody got time for that (now).
* do not verifyKeys() if it is a batch
  batch semantics can go either way. Cleaned up more in #2141,
  where keys on the batch header are always a key range (and
  subject to removal, see #2155).
* deal with batch self-overlap in command queue
  even basic tests have this (TestUpdateRangeAddressing by the way)
  and however we deal with this, it should be on Store.
* allow self-overlapping batches
  discussion needed, but if this has a good chance
  of leading to passing tests, I think we should
  go down that route first.
* make RaftCmd.Cmd nullable
* panic on empty batch
* feedback
  • Loading branch information
spencerkimball authored and tbg committed Sep 6, 2015
1 parent 3cfc7d0 commit c42426a
Show file tree
Hide file tree
Showing 23 changed files with 2,182 additions and 10,401 deletions.
41 changes: 33 additions & 8 deletions proto/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,16 @@ func IsWrite(args Request) bool {
return (args.flags() & isWrite) != 0
}

// IsReadOnly returns true if the request is read-only.
// IsReadOnly returns true iff the request is read-only.
func IsReadOnly(args Request) bool {
return IsRead(args) && !IsWrite(args)
}

// IsWriteOnly returns true if the request only requires write permissions.
func IsWriteOnly(args Request) bool {
return !IsRead(args) && IsWrite(args)
}

// IsTransactionWrite returns true if the request produces write
// intents when used within a transaction.
func IsTransactionWrite(args Request) bool {
Expand All @@ -85,6 +90,14 @@ func IsRange(args Request) bool {
return (args.flags() & isRange) != 0
}

// CanBatch returns true if the request type can be part of a batch request.
// TODO(tschottdorf): everything should be batchable. Those requests which
// currently are !CanBatch should simply have to be alone in their batch.
// That is easier since only BatchRequest must be supported.
func CanBatch(args Request) bool {
return IsRead(args) || IsWrite(args)
}

// Request is an interface for RPC requests.
type Request interface {
gogoproto.Message
Expand Down Expand Up @@ -257,7 +270,7 @@ func (rh *ResponseHeader) GoError() error {
}

// SetGoError converts the specified type into either one of the proto-
// defined error types or into a Error for all other Go errors.
// defined error types or into an Error for all other Go errors.
func (rh *ResponseHeader) SetGoError(err error) {
if err == nil {
rh.Error = nil
Expand Down Expand Up @@ -295,8 +308,8 @@ func (sr *ReverseScanResponse) Verify(req Request) error {
return nil
}

// Add adds a request to the batch request. The batch inherits
// the key range of the first request added to it.
// Add adds a request to the batch request. The batch key range is
// expanded to include the key ranges of all requests which it comprises.
//
// TODO(spencer): batches should include a list of key ranges
// representing the constituent requests.
Expand All @@ -305,9 +318,14 @@ func (br *BatchRequest) Add(args Request) {
if !union.SetValue(args) {
panic(fmt.Sprintf("unable to add %T to batch request", args))
}
if br.Key == nil {
br.Key = args.Header().Key
br.EndKey = args.Header().EndKey
h := args.Header()
if br.Key == nil || !br.Key.Less(h.Key) {
br.Key = h.Key
} else if br.EndKey.Less(h.Key) && !br.Key.Equal(h.Key) {
br.EndKey = h.Key
}
if br.EndKey == nil || (h.EndKey != nil && br.EndKey.Less(h.EndKey)) {
br.EndKey = h.EndKey
}
br.Requests = append(br.Requests, union)
}
Expand Down Expand Up @@ -513,4 +531,11 @@ func (*ResolveIntentRangeRequest) flags() int { return isWrite | isRange }
func (*MergeRequest) flags() int { return isWrite }
func (*TruncateLogRequest) flags() int { return isWrite }
func (*LeaderLeaseRequest) flags() int { return isWrite }
func (*BatchRequest) flags() int { return isWrite }

func (br *BatchRequest) flags() int {
var flags int
for _, union := range br.Requests {
flags |= union.GetValue().(Request).flags()
}
return flags
}
Loading

0 comments on commit c42426a

Please sign in to comment.