Skip to content
/ etcd Public
forked from etcd-io/etcd

Commit

Permalink
*: fill in WatchResponse.Header
Browse files Browse the repository at this point in the history
Related to etcd-io#3848.
  • Loading branch information
gyuho committed Jan 5, 2016
1 parent 70d120e commit 135dd4e
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 6 deletions.
2 changes: 1 addition & 1 deletion etcdmain/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ func startEtcd(cfg *config) (<-chan struct{}, error) {
// set up v3 demo rpc
grpcServer := grpc.NewServer()
etcdserverpb.RegisterKVServer(grpcServer, v3rpc.NewKVServer(s))
etcdserverpb.RegisterWatchServer(grpcServer, v3rpc.NewWatchServer(s.Watchable()))
etcdserverpb.RegisterWatchServer(grpcServer, v3rpc.NewWatchServer(s))
go func() { plog.Fatal(grpcServer.Serve(v3l)) }()
}

Expand Down
30 changes: 25 additions & 5 deletions etcdserver/api/v3rpc/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,19 @@ package v3rpc
import (
"io"

"github.com/coreos/etcd/etcdserver"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/storage"
"github.com/coreos/etcd/storage/storagepb"
)

type watchServer struct {
server *etcdserver.EtcdServer
watchable storage.Watchable
}

func NewWatchServer(w storage.Watchable) pb.WatchServer {
return &watchServer{w}
func NewWatchServer(s *etcdserver.EtcdServer) pb.WatchServer {
return &watchServer{s, s.Watchable()}
}

const (
Expand All @@ -44,6 +46,7 @@ const (
// and creates responses that forwarded to gRPC stream.
// It also forwards control message like watch created and canceled.
type serverWatchStream struct {
server *etcdserver.EtcdServer
gRPCStream pb.Watch_WatchServer
watchStream storage.WatchStream
ctrlStream chan *pb.WatchResponse
Expand All @@ -54,6 +57,7 @@ type serverWatchStream struct {

func (ws *watchServer) Watch(stream pb.Watch_WatchServer) error {
sws := serverWatchStream{
server: ws.server,
gRPCStream: stream,
watchStream: ws.watchable.NewWatchStream(),
// chan for sending control response like watcher created and canceled.
Expand All @@ -67,6 +71,13 @@ func (ws *watchServer) Watch(stream pb.Watch_WatchServer) error {
}

func (sws *serverWatchStream) recvLoop() error {
rh := &pb.ResponseHeader{
ClusterId: uint64(sws.server.ID()),
MemberId: uint64(sws.server.GetClusterID()),
Revision: sws.watchStream.GetRevision(),
RaftTerm: sws.server.GetRaftTerm(),
}

for {
req, err := sws.gRPCStream.Recv()
if err == io.EOF {
Expand All @@ -87,7 +98,7 @@ func (sws *serverWatchStream) recvLoop() error {
}
id := sws.watchStream.Watch(toWatch, prefix, creq.StartRevision)
sws.ctrlStream <- &pb.WatchResponse{
// TODO: fill in response header.
Header: rh,
WatchId: int64(id),
Created: true,
}
Expand All @@ -96,7 +107,7 @@ func (sws *serverWatchStream) recvLoop() error {
err := sws.watchStream.Cancel(storage.WatchID(id))
if err == nil {
sws.ctrlStream <- &pb.WatchResponse{
// TODO: fill in response header.
Header: rh,
WatchId: id,
Canceled: true,
}
Expand All @@ -109,6 +120,13 @@ func (sws *serverWatchStream) recvLoop() error {
}

func (sws *serverWatchStream) sendLoop() {
rh := &pb.ResponseHeader{
ClusterId: uint64(sws.server.ID()),
MemberId: uint64(sws.server.GetClusterID()),
Revision: sws.watchStream.GetRevision(),
RaftTerm: sws.server.GetRaftTerm(),
}

for {
select {
case wresp, ok := <-sws.watchStream.Chan():
Expand All @@ -126,8 +144,10 @@ func (sws *serverWatchStream) sendLoop() {
}

err := sws.gRPCStream.Send(&pb.WatchResponse{
Header: rh,
WatchId: int64(wresp.WatchID),
Events: events})
Events: events,
})
storage.ReportEventReceived()
if err != nil {
return
Expand Down
11 changes: 11 additions & 0 deletions etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1172,6 +1172,17 @@ func (s *EtcdServer) ClusterVersion() *semver.Version {
return s.cluster.Version()
}

func (s *EtcdServer) GetClusterID() types.ID {
if s.cluster == nil {
return 0
}
return s.cluster.ID()
}

func (s *EtcdServer) GetRaftTerm() uint64 {
return s.r.term
}

// monitorVersions checks the member's version every monitorVersion interval.
// It updates the cluster version if all members agrees on a higher one.
// It prints out log if there is a member with a higher version than the
Expand Down
15 changes: 15 additions & 0 deletions storage/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package storage

import (
"errors"
"fmt"
"sync"

"github.com/coreos/etcd/storage/storagepb"
Expand Down Expand Up @@ -49,6 +50,9 @@ type WatchStream interface {

// Close closes the WatchChan and release all related resources.
Close()

// GetRevision returns the latest revision of storage.
GetRevision() int64
}

type WatchResponse struct {
Expand Down Expand Up @@ -113,3 +117,14 @@ func (ws *watchStream) Close() {
close(ws.ch)
watchStreamGauge.Dec()
}

func (ws *watchStream) GetRevision() int64 {
ws.mu.Lock()
defer ws.mu.Unlock()
s, ok := ws.watchable.(*watchableStore)
if !ok {
fmt.Println("aaaaa")
return 0
}
return s.Rev()
}

0 comments on commit 135dd4e

Please sign in to comment.