diff --git a/server/etcdserver/api/v3rpc/watch.go b/server/etcdserver/api/v3rpc/watch.go index 3b82fe8d848b..c03fc8d9b50a 100644 --- a/server/etcdserver/api/v3rpc/watch.go +++ b/server/etcdserver/api/v3rpc/watch.go @@ -463,6 +463,7 @@ func (sws *serverWatchStream) sendLoop() { var serr error if !fragmented && !ok { + // gofail: var gRPCStreamSendWatchResponse struct{} serr = sws.gRPCStream.Send(wr) } else { serr = sendFragments(wr, sws.maxRequestBytes, sws.gRPCStream.Send) diff --git a/server/storage/mvcc/watchable_store.go b/server/storage/mvcc/watchable_store.go index 33114fc51aaa..3ac0899c854c 100644 --- a/server/storage/mvcc/watchable_store.go +++ b/server/storage/mvcc/watchable_store.go @@ -39,6 +39,8 @@ var ( maxWatchersPerSync = 512 ) +func GetChanBufLen() int { return chanBufLen } + type watchable interface { watch(key, end []byte, startRev int64, id WatchID, ch chan<- WatchResponse, fcs ...FilterFunc) (*watcher, cancelFunc) progress(w *watcher) diff --git a/tests/integration/v3_watch_test.go b/tests/integration/v3_watch_test.go index 3d094c572814..dae13386f1d7 100644 --- a/tests/integration/v3_watch_test.go +++ b/tests/integration/v3_watch_test.go @@ -17,6 +17,7 @@ package integration import ( "bytes" "context" + "errors" "fmt" "reflect" "sort" @@ -24,13 +25,17 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" pb "go.etcd.io/etcd/api/v3/etcdserverpb" "go.etcd.io/etcd/api/v3/mvccpb" + "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/server/v3/etcdserver/api/v3rpc" + "go.etcd.io/etcd/server/v3/storage/mvcc" "go.etcd.io/etcd/tests/v3/framework/integration" + gofail "go.etcd.io/gofail/runtime" ) // TestV3WatchFromCurrentRevision tests Watch APIs from current revision. @@ -1467,3 +1472,56 @@ func TestV3WatchProgressWaitsForSync(t *testing.T) { t.Fatal("Wrong revision in progress notification!") } } + +// TestV3NoEventsLostOnCompact verifies that slow watchers exit with compacted watch response +// if its next revision of events are compacted and no lost events sent to client. +func TestV3NoEventsLostOnCompact(t *testing.T) { + if integration.ThroughProxy { + t.Skip("grpc proxy currently does not support requesting progress notifications") + } + integration.BeforeTest(t) + if len(gofail.List()) == 0 { + t.Skip("please run 'make gofail-enable' before running the test") + } + clus := integration.NewCluster(t, &integration.ClusterConfig{Size: 1}) + defer clus.Terminate(t) + + client := clus.RandClient() + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + // sendLoop throughput is rate-limited to 1 event per second + require.NoError(t, gofail.Enable("gRPCStreamSendWatchResponse", `sleep("1s")`)) + wch := client.Watch(ctx, "foo") + + var rev int64 = 0 + writeCount := mvcc.GetChanBufLen() * 11 / 10 + for i := 0; i < writeCount; i++ { + resp, err := client.Put(ctx, "foo", "bar") + require.NoError(t, err) + rev = resp.Header.Revision + } + _, err := client.Compact(ctx, rev) + require.NoError(t, err) + + time.Sleep(time.Second) + require.NoError(t, gofail.Disable("gRPCStreamSendWatchResponse")) + + eventCount := 0 + compacted := false + for resp := range wch { + err = resp.Err() + if err != nil { + if !errors.Is(err, rpctypes.ErrCompacted) { + t.Fatalf("want watch response err %v but got %v", rpctypes.ErrCompacted, err) + } + compacted = true + break + } + eventCount += len(resp.Events) + if eventCount == writeCount { + break + } + } + assert.Truef(t, compacted, "Expected stream to get compacted, instead we got %d events out of %d events", eventCount, writeCount) +} diff --git a/tests/robustness/makefile.mk b/tests/robustness/makefile.mk index 381db6301b99..40ce1a5b6f6c 100644 --- a/tests/robustness/makefile.mk +++ b/tests/robustness/makefile.mk @@ -36,7 +36,7 @@ GOFAIL_VERSION = $(shell cd tools/mod && go list -m -f {{.Version}} go.etcd.io/g .PHONY: gofail-enable gofail-enable: install-gofail - gofail enable server/etcdserver/ server/storage/backend/ server/storage/mvcc/ server/storage/wal/ + gofail enable server/etcdserver/ server/storage/backend/ server/storage/mvcc/ server/storage/wal/ server/etcdserver/api/v3rpc/ cd ./server && go get go.etcd.io/gofail@${GOFAIL_VERSION} cd ./etcdutl && go get go.etcd.io/gofail@${GOFAIL_VERSION} cd ./etcdctl && go get go.etcd.io/gofail@${GOFAIL_VERSION}