Skip to content

Commit

Permalink
Adds a read-only optimized path for transactions.
Browse files Browse the repository at this point in the history
  • Loading branch information
James Phillips committed May 13, 2016
1 parent 9443c6b commit a37bf9d
Show file tree
Hide file tree
Showing 12 changed files with 625 additions and 56 deletions.
19 changes: 14 additions & 5 deletions api/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,7 @@ type TxnResponse struct {
}

// Txn is used to apply multiple KV operations in a single, atomic transaction.
//
// Note that Go will perform the required base64 encoding on the values
// automatically because the type is a byte slice. Transactions are defined as a
// list of operations to perform, using the KVOp constants and KVTxnOp structure
Expand All @@ -318,6 +319,13 @@ type TxnResponse struct {
// transaction API client, but it will be easy to keep this KV-specific variant
// supported.
//
// Even though this is generally a write operation, we take a QueryOptions input
// and return a QueryMeta output. If the transaction contains only read ops, then
// Consul will fast-path it to a different endpoint internally which supports
// consistency controls, but not blocking. If there are write operations then
// the request will always be routed through Raft and any consistency settings
// will be ignored.
//
// Here's an example:
//
// ops := KVTxnOps{
Expand All @@ -343,9 +351,9 @@ type TxnResponse struct {
// is a KVGet. If the transaction was rolled back, the Errors member will have
// entries referencing the index of the operation that failed along with an error
// message.
func (k *KV) Txn(txn KVTxnOps, q *WriteOptions) (bool, *KVTxnResponse, *WriteMeta, error) {
func (k *KV) Txn(txn KVTxnOps, q *QueryOptions) (bool, *KVTxnResponse, *QueryMeta, error) {
r := k.c.newRequest("PUT", "/v1/txn")
r.setWriteOptions(q)
r.setQueryOptions(q)

// Convert into the internal format since this is an all-KV txn.
ops := make(TxnOps, 0, len(txn))
Expand All @@ -359,8 +367,9 @@ func (k *KV) Txn(txn KVTxnOps, q *WriteOptions) (bool, *KVTxnResponse, *WriteMet
}
defer resp.Body.Close()

wm := &WriteMeta{}
wm.RequestTime = rtt
qm := &QueryMeta{}
parseQueryMeta(resp, qm)
qm.RequestTime = rtt

if resp.StatusCode == http.StatusOK || resp.StatusCode == http.StatusConflict {
var txnResp TxnResponse
Expand All @@ -375,7 +384,7 @@ func (k *KV) Txn(txn KVTxnOps, q *WriteOptions) (bool, *KVTxnResponse, *WriteMet
for _, result := range txnResp.Results {
kvResp.Results = append(kvResp.Results, result.KV)
}
return resp.StatusCode == http.StatusOK, &kvResp, wm, nil
return resp.StatusCode == http.StatusOK, &kvResp, qm, nil
}

var buf bytes.Buffer
Expand Down
26 changes: 26 additions & 0 deletions api/kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,32 @@ func TestClient_Txn(t *testing.T) {
}
}

// Run a read-only transaction.
txn = KVTxnOps{
&KVTxnOp{
Verb: KVGet,
Key: key,
},
}
ok, ret, _, err = kv.Txn(txn, nil)
if err != nil {
t.Fatalf("err: %v", err)
} else if !ok {
t.Fatalf("transaction failure")
}

if ret == nil || len(ret.Errors) != 0 || len(ret.Results) != 1 {
t.Fatalf("bad: %v", ret)
}
for _, result := range ret.Results {
if result.Key != key ||
!bytes.Equal(result.Value, value) ||
result.Session != id ||
result.LockIndex != 1 {
t.Fatalf("bad: %v", result)
}
}

// Sanity check using the regular GET API.
pair, meta, err := kv.Get(key, nil)
if err != nil {
Expand Down
95 changes: 71 additions & 24 deletions command/agent/txn_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,43 +75,43 @@ func fixupKVOps(raw interface{}) error {
return nil
}

// Txn handles requests to apply multiple operations in a single, atomic
// transaction.
func (s *HTTPServer) Txn(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
if req.Method != "PUT" {
resp.WriteHeader(http.StatusMethodNotAllowed)
return nil, nil
}

var args structs.TxnRequest
s.parseDC(req, &args.Datacenter)
s.parseToken(req, &args.Token)

// convertOps takes the incoming body in API format and converts it to the
// internal RPC format. This returns the converted ops, a count of the number
// of write ops, and a boolean that, if false, means an error response has
// already been generated and processing should stop.
func (s *HTTPServer) convertOps(resp http.ResponseWriter, req *http.Request) (structs.TxnOps, int, bool) {
// Note the body is in API format, and not the RPC format. If we can't
// decode it, we will return a 400 since we don't have enough context to
// associate the error with a given operation.
var ops api.TxnOps
if err := decodeBody(req, &ops, fixupKVOps); err != nil {
resp.WriteHeader(http.StatusBadRequest)
resp.Write([]byte(fmt.Sprintf("Failed to parse body: %v", err)))
return nil, nil
return nil, 0, false
}

// Convert the KV API format into the RPC format. Note that fixupKVOps
// above will have already converted the base64 encoded strings into
// byte arrays so we can assign right over.
var opsRPC structs.TxnOps
var writes int
for _, in := range ops {
if in.KV != nil {
if size := len(in.KV.Value); size > maxKVSize {
resp.WriteHeader(http.StatusRequestEntityTooLarge)
resp.Write([]byte(fmt.Sprintf("Value for key %q is too large (%d > %d bytes)",
in.KV.Key, size, maxKVSize)))
return nil, nil
return nil, 0, false
}

verb := structs.KVSOp(in.KV.Verb)
if verb.IsWrite() {
writes += 1
}

out := &structs.TxnOp{
KV: &structs.TxnKVOp{
Verb: structs.KVSOp(in.KV.Verb),
Verb: verb,
DirEnt: structs.DirEntry{
Key: in.KV.Key,
Value: in.KV.Value,
Expand All @@ -123,20 +123,67 @@ func (s *HTTPServer) Txn(resp http.ResponseWriter, req *http.Request) (interface
},
},
}
args.Ops = append(args.Ops, out)
opsRPC = append(opsRPC, out)
}
}
return opsRPC, writes, true
}

// Make the request and return a conflict status if there were errors
// reported from the transaction.
var reply structs.TxnResponse
if err := s.agent.RPC("Txn.Apply", &args, &reply); err != nil {
return nil, err
// Txn handles requests to apply multiple operations in a single, atomic
// transaction. A transaction consisting of only read operations will be fast-
// pathed to an endpoint that supports consistency modes (but not blocking),
// and everything else will be routed through Raft like a normal write.
func (s *HTTPServer) Txn(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
if req.Method != "PUT" {
resp.WriteHeader(http.StatusMethodNotAllowed)
return nil, nil
}
if len(reply.Errors) > 0 {

// Convert the ops from the API format to the internal format.
ops, writes, ok := s.convertOps(resp, req)
if !ok {
return nil, nil
}

// Fast-path a transaction with only reads to the read-only endpoint,
// which bypasses Raft, and allows for staleness.
conflict := false
var ret interface{}
if writes == 0 {
args := structs.TxnReadRequest{Ops: ops}
if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done {
return nil, nil
}

var reply structs.TxnReadResponse
if err := s.agent.RPC("Txn.Read", &args, &reply); err != nil {
return nil, err
}

// Since we don't do blocking, we only add the relevant headers
// for metadata.
setLastContact(resp, reply.LastContact)
setKnownLeader(resp, reply.KnownLeader)

ret, conflict = reply, len(reply.Errors) > 0
} else {
args := structs.TxnRequest{Ops: ops}
s.parseDC(req, &args.Datacenter)
s.parseToken(req, &args.Token)

var reply structs.TxnResponse
if err := s.agent.RPC("Txn.Apply", &args, &reply); err != nil {
return nil, err
}
ret, conflict = reply, len(reply.Errors) > 0
}

// If there was a conflict return the response object but set a special
// status code.
if conflict {
var buf []byte
var err error
buf, err = s.marshalJSON(req, reply)
buf, err = s.marshalJSON(req, ret)
if err != nil {
return nil, err
}
Expand All @@ -148,5 +195,5 @@ func (s *HTTPServer) Txn(resp http.ResponseWriter, req *http.Request) (interface
}

// Otherwise, return the results of the successful transaction.
return reply, nil
return ret, nil
}
70 changes: 70 additions & 0 deletions command/agent/txn_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,76 @@ func TestTxnEndpoint_KV_Actions(t *testing.T) {
}
}

// Do a read-only transaction that should get routed to the
// fast-path endpoint.
{
buf := bytes.NewBuffer([]byte(fmt.Sprintf(`
[
{
"KV": {
"Verb": "get",
"Key": "key"
}
}
]
`, index)))
req, err := http.NewRequest("PUT", "/v1/txn", buf)
if err != nil {
t.Fatalf("err: %v", err)
}

resp := httptest.NewRecorder()
obj, err := srv.Txn(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
if resp.Code != 200 {
t.Fatalf("expected 200, got %d", resp.Code)
}

header := resp.Header().Get("X-Consul-KnownLeader")
if header != "true" {
t.Fatalf("bad: %v", header)
}
header = resp.Header().Get("X-Consul-LastContact")
if header != "0" {
t.Fatalf("bad: %v", header)
}

txnResp, ok := obj.(structs.TxnReadResponse)
if !ok {
t.Fatalf("bad type: %T", obj)
}
if len(txnResp.Results) != 1 {
t.Fatalf("bad: %v", txnResp)
}
expected := structs.TxnReadResponse{
TxnResponse: structs.TxnResponse{
Results: structs.TxnResults{
&structs.TxnResult{
KV: &structs.DirEntry{
Key: "key",
Value: []byte("hello world"),
Flags: 23,
Session: id,
LockIndex: 1,
RaftIndex: structs.RaftIndex{
CreateIndex: index,
ModifyIndex: index,
},
},
},
},
},
QueryMeta: structs.QueryMeta{
KnownLeader: true,
},
}
if !reflect.DeepEqual(txnResp, expected) {
t.Fatalf("bad: %v", txnResp)
}
}

// Now that we have an index we can do a CAS to make sure the
// index field gets translated to the RPC format.
{
Expand Down
2 changes: 1 addition & 1 deletion consul/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ func (c *consulFSM) applyTxn(buf []byte, index uint64) interface{} {
panic(fmt.Errorf("failed to decode request: %v", err))
}
defer metrics.MeasureSince([]string{"consul", "fsm", "txn"}, time.Now())
results, errors := c.state.TxnRun(index, req.Ops)
results, errors := c.state.TxnRW(index, req.Ops)
return structs.TxnResponse{results, errors}
}

Expand Down
41 changes: 34 additions & 7 deletions consul/state/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,13 +89,8 @@ func (s *StateStore) txnKVS(tx *memdb.Txn, idx uint64, op *structs.TxnKVOp) (str
return nil, nil
}

// TxnRun tries to run the given operations all inside a single transaction. If
// any of the operations fail, the entire transaction will be rolled back.
func (s *StateStore) TxnRun(idx uint64, ops structs.TxnOps) (structs.TxnResults, structs.TxnErrors) {
tx := s.db.Txn(true)
defer tx.Abort()

// Dispatch all of the operations inside the transaction.
// txnDispatch runs the given operations inside the state store transaction.
func (s *StateStore) txnDispatch(tx *memdb.Txn, idx uint64, ops structs.TxnOps) (structs.TxnResults, structs.TxnErrors) {
results := make(structs.TxnResults, 0, len(ops))
errors := make(structs.TxnErrors, 0, len(ops))
for i, op := range ops {
Expand All @@ -118,10 +113,42 @@ func (s *StateStore) TxnRun(idx uint64, ops structs.TxnOps) (structs.TxnResults,
errors = append(errors, &structs.TxnError{i, err.Error()})
}
}

if len(errors) > 0 {
return nil, errors
}

return results, nil
}

// TxnRW executes the supplied operations as one unit inside a single write
// transaction on the state store, so both reads and writes are allowed. If
// any operation reports an error, nothing is committed and only the errors
// are returned; otherwise the transaction is committed and the results are
// returned.
func (s *StateStore) TxnRW(idx uint64, ops structs.TxnOps) (structs.TxnResults, structs.TxnErrors) {
	txn := s.db.Txn(true)
	// Abort is deferred so the early error return below leaves the
	// transaction uncommitted.
	defer txn.Abort()

	out, errs := s.txnDispatch(txn, idx, ops)
	if len(errs) != 0 {
		return nil, errs
	}

	txn.Commit()
	return out, nil
}

// TxnRO executes the supplied operations inside a single read transaction on
// the state store, dispatching them with an index of 0. Callers must confirm
// beforehand that the list contains no write operations; otherwise the state
// store will return an error. If any operation reports an error, only the
// errors are returned.
func (s *StateStore) TxnRO(ops structs.TxnOps) (structs.TxnResults, structs.TxnErrors) {
	txn := s.db.Txn(false)
	defer txn.Abort()

	out, errs := s.txnDispatch(txn, 0, ops)
	if len(errs) != 0 {
		return nil, errs
	}

	return out, nil
}
Loading

0 comments on commit a37bf9d

Please sign in to comment.