From ea0c65797a65dfa59c16844982b4756fdcd2e365 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Wed, 28 Sep 2016 20:46:02 +0800 Subject: [PATCH] etcdserver: use linearizableReadNotify for txn --- etcdserver/v3_server.go | 50 ++++++++++++++++++++++++++++++++++++++++- 1 file changed, 49 insertions(+), 1 deletion(-) diff --git a/etcdserver/v3_server.go b/etcdserver/v3_server.go index 7fd5b6bfd63..49eaeeb88f3 100644 --- a/etcdserver/v3_server.go +++ b/etcdserver/v3_server.go @@ -165,6 +165,41 @@ func (s *EtcdServer) DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest) } func (s *EtcdServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) { + // TODO: remove this checking when we release etcd 3.2 + if s.ClusterVersion() == nil || s.ClusterVersion().LessThan(newRangeClusterVersion) { + return s.legacyTxn(ctx, r) + } + + if isTxnReadonly(r) { + if !isTxnSerializable(r) { + err := s.linearizableReadNotify(ctx) + if err != nil { + return nil, err + } + } + var resp *pb.TxnResponse + var err error + chk := func(ai *auth.AuthInfo) error { + return checkTxnAuth(s.authStore, ai, r) + } + get := func() { resp, err = s.applyV3Base.Txn(r) } + if serr := s.doSerialize(ctx, chk, get); serr != nil { + return nil, serr + } + return resp, err + } + result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{Txn: r}) + if err != nil { + return nil, err + } + if result.err != nil { + return nil, result.err + } + return result.resp.(*pb.TxnResponse), nil +} + +// TODO: remove this func when we release etcd 3.2 +func (s *EtcdServer) legacyTxn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) { if isTxnSerializable(r) { var resp *pb.TxnResponse var err error @@ -177,7 +212,6 @@ func (s *EtcdServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse } return resp, err } - // TODO: readonly Txn do not need to go through raft result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{Txn: r}) if err != nil { return nil, err @@ -202,6 +236,20 @@ func isTxnSerializable(r *pb.TxnRequest) bool { return true } +func isTxnReadonly(r *pb.TxnRequest) bool { + for _, u := range r.Success { + if r := u.GetRequestRange(); r == nil { + return false + } + } + for _, u := range r.Failure { + if r := u.GetRequestRange(); r == nil { + return false + } + } + return true +} + func (s *EtcdServer) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error) { result, err := s.processInternalRaftRequestOnce(ctx, pb.InternalRaftRequest{Compaction: r}) if r.Physical && result != nil && result.physc != nil {