Skip to content

Commit

Permalink
own RequestHeader for BatchRequest
Browse files Browse the repository at this point in the history
for now the same structure, but they will now be
able to diverge.
  • Loading branch information
tbg committed Sep 29, 2015
1 parent cf2873c commit e6a0762
Show file tree
Hide file tree
Showing 14 changed files with 2,297 additions and 267 deletions.
12 changes: 11 additions & 1 deletion client/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,17 @@ func SendWrapped(sender Sender, ctx context.Context, args roachpb.Request) (roac
}
ba, unwrap := func(args roachpb.Request) (*roachpb.BatchRequest, func(*roachpb.BatchResponse) roachpb.Response) {
ba := &roachpb.BatchRequest{}
ba.RequestHeader = *(proto.Clone(args.Header()).(*roachpb.RequestHeader))
{
h := *(proto.Clone(args.Header()).(*roachpb.RequestHeader))
ba.Key, ba.EndKey = h.Key, h.EndKey
ba.CmdID = h.CmdID
ba.Timestamp = h.Timestamp
ba.Replica = h.Replica
ba.RangeID = h.RangeID
ba.UserPriority = h.UserPriority
ba.Txn = h.Txn
ba.ReadConsistency = h.ReadConsistency
}
ba.Add(args)
return ba, func(br *roachpb.BatchResponse) roachpb.Response {
var unwrappedReply roachpb.Response
Expand Down
2 changes: 1 addition & 1 deletion kv/dist_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -726,7 +726,7 @@ func TestMultiRangeMergeStaleDescriptor(t *testing.T) {
if method != "Node.Batch" {
t.Fatalf("unexpected method:%s", method)
}
header := getArgs(testAddress).(*roachpb.BatchRequest).RequestHeader
header := getArgs(testAddress).(*roachpb.BatchRequest).BatchRequest_Header
batchReply := getReply().(*roachpb.BatchResponse)
reply := &roachpb.ScanResponse{}
batchReply.Add(reply)
Expand Down
4 changes: 2 additions & 2 deletions kv/txn_coord_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ func (tc *TxnCoordSender) startStats() {
// It is checked that the individual call does not have a UserPriority
// or Txn set that differs from the batch's.
// TODO(tschottdorf): will go with #2143.
func updateForBatch(args roachpb.Request, bHeader roachpb.RequestHeader) error {
func updateForBatch(args roachpb.Request, bHeader roachpb.BatchRequest_Header) error {
// Disallow transaction, user and priority on individual calls, unless
// equal.
aHeader := args.Header()
Expand Down Expand Up @@ -329,7 +329,7 @@ func (tc *TxnCoordSender) Send(ctx context.Context, ba roachpb.BatchRequest) (*r
// we've eliminated all the redundancies.
for _, arg := range ba.Requests {
trace.Event(fmt.Sprintf("%T", arg.GetValue()))
if err := updateForBatch(arg.GetInner(), ba.RequestHeader); err != nil {
if err := updateForBatch(arg.GetInner(), ba.BatchRequest_Header); err != nil {
return nil, roachpb.NewError(err)
}
}
Expand Down
45 changes: 0 additions & 45 deletions roachpb/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package roachpb

import (
"fmt"
"math/rand"
"strconv"

"github.com/gogo/protobuf/proto"
Expand Down Expand Up @@ -114,50 +113,6 @@ type Combinable interface {
Combine(Response) error
}

// TODO(marc): we should assert
// var _ security.RequestWithUser = &RequestHeader{}
// here, but we need to break cycles first.

// GetUser implements security.RequestWithUser.
// KV messages are always sent by the node user.
func (*RequestHeader) GetUser() string {
// TODO(marc): we should use security.NodeUser here, but we need to break cycles first.
return "node"
}

// GetOrCreateCmdID returns the request header's command ID if available.
// Otherwise, creates a new ClientCmdID, initialized with current time
// and random salt.
func (rh *RequestHeader) GetOrCreateCmdID(walltime int64) (cmdID ClientCmdID) {
if !rh.CmdID.IsEmpty() {
cmdID = rh.CmdID
} else {
cmdID = ClientCmdID{
WallTime: walltime,
Random: rand.Int63(),
}
}
return
}

// TraceID implements tracer.Traceable by returning the first nontrivial
// TraceID of the Transaction and CmdID.
func (rh *RequestHeader) TraceID() string {
if r := rh.Txn.TraceID(); r != "" {
return r
}
return rh.CmdID.TraceID()
}

// TraceName implements tracer.Traceable and behaves like TraceID, but using
// the TraceName of the object delegated to.
func (rh *RequestHeader) TraceName() string {
if r := rh.Txn.TraceID(); r != "" {
return rh.Txn.TraceName()
}
return rh.CmdID.TraceName()
}

func combineError(a, b interface{}) error {
return fmt.Errorf("illegal combination: (%T).Combine(%T)", a, b)
}
Expand Down
Loading

0 comments on commit e6a0762

Please sign in to comment.