From 1f799aaeb59366bb17f6beb68b211031c2c47558 Mon Sep 17 00:00:00 2001 From: Qiang Zhao Date: Wed, 13 Nov 2024 19:49:09 +0800 Subject: [PATCH 1/5] apply fix --- server/follower_controller.go | 1 + 1 file changed, 1 insertion(+) diff --git a/server/follower_controller.go b/server/follower_controller.go index 4e2578ba..6fcbcdfa 100644 --- a/server/follower_controller.go +++ b/server/follower_controller.go @@ -337,6 +337,7 @@ func (fc *followerController) Truncate(req *proto.TruncateRequest) (*proto.Trunc return nil, errors.Wrapf(err, "failed to truncate wal. truncate-offset: %d - wal-last-offset: %d", req.HeadEntryId.Offset, fc.wal.LastOffset()) } + fc.lastAppendedOffset = headOffset return &proto.TruncateResponse{ HeadEntryId: &proto.EntryId{ From 2c7bb0104ec35562d519b8d503afb9601d35e9fe Mon Sep 17 00:00:00 2001 From: Qiang Zhao Date: Wed, 13 Nov 2024 19:49:23 +0800 Subject: [PATCH 2/5] test --- .github/workflows/docker_publish.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/docker_publish.yaml b/.github/workflows/docker_publish.yaml index 22287b27..0d3cdb2d 100644 --- a/.github/workflows/docker_publish.yaml +++ b/.github/workflows/docker_publish.yaml @@ -58,7 +58,7 @@ jobs: uses: docker/build-push-action@v3 with: context: . - platforms: linux/x86_64,linux/arm64 + platforms: linux/x86_64 push: true tags: ${{ steps.meta.outputs.tags }} labels: ${{ steps.meta.outputs.labels }} From dd4fd3ffda17e5b412d3dc87b2069b7bc4a74fb9 Mon Sep 17 00:00:00 2001 From: Qiang Zhao Date: Wed, 13 Nov 2024 20:19:39 +0800 Subject: [PATCH 3/5] Revert "test" This reverts commit 2c7bb0104ec35562d519b8d503afb9601d35e9fe. --- .github/workflows/docker_publish.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/docker_publish.yaml b/.github/workflows/docker_publish.yaml index 0d3cdb2d..22287b27 100644 --- a/.github/workflows/docker_publish.yaml +++ b/.github/workflows/docker_publish.yaml @@ -58,7 +58,7 @@ jobs: uses: docker/build-push-action@v3 with: context: . - platforms: linux/x86_64 + platforms: linux/x86_64,linux/arm64 push: true tags: ${{ steps.meta.outputs.tags }} labels: ${{ steps.meta.outputs.labels }} From f240dde4e0509976f4b5318963c2d259c5bf02de Mon Sep 17 00:00:00 2001 From: Qiang Zhao Date: Wed, 13 Nov 2024 21:32:35 +0800 Subject: [PATCH 4/5] fix(replication): fix wrong memory status block replication --- server/follower_controller_test.go | 53 ++++++++++++++++++++++++++++-- 1 file changed, 51 insertions(+), 2 deletions(-) diff --git a/server/follower_controller_test.go b/server/follower_controller_test.go index 4d640a29..009cfbce 100644 --- a/server/follower_controller_test.go +++ b/server/follower_controller_test.go @@ -87,8 +87,7 @@ func TestFollower(t *testing.T) { wg := common.NewWaitGroup(1) go func() { - err := fc.Replicate(stream) - assert.ErrorIs(t, err, context.Canceled) + _ = fc.Replicate(stream) wg.Done() }() @@ -111,6 +110,56 @@ func TestFollower(t *testing.T) { assert.Equal(t, proto.ServingStatus_FOLLOWER, fc.Status()) assert.EqualValues(t, 1, fc.Term()) + // close follower + assert.NoError(t, fc.Close()) + + // new term to test if we can continue replicate messages + fc, err = NewFollowerController(Config{}, common.DefaultNamespace, shardId, walFactory, kvFactory) + assert.NoError(t, err) + assert.Equal(t, proto.ServingStatus_NOT_MEMBER, fc.Status()) + fenceRes, err = fc.NewTerm(&proto.NewTermRequest{Term: 2}) + assert.NoError(t, err) + assert.Equal(t, proto.ServingStatus_FENCED, fc.Status()) + assert.EqualValues(t, 2, fc.Term()) + truncateResp, err = fc.Truncate(&proto.TruncateRequest{ + Term: 2, + HeadEntryId: &proto.EntryId{ + Term: 1, + Offset: 0, + }, + }) + assert.NoError(t, err) + assert.EqualValues(t, 2, truncateResp.HeadEntryId.Term) + + assert.Equal(t, proto.ServingStatus_FOLLOWER, fc.Status()) + stream = newMockServerReplicateStream() + wg = common.NewWaitGroup(1) + go func() { + err := fc.Replicate(stream) + assert.ErrorIs(t, err, context.Canceled) + wg.Done() + }() + stream.AddRequest(createAddRequest(t, 2, 0, map[string]string{"a": "0", "b": "1"}, wal.InvalidOffset)) + // Wait for response + response = stream.GetResponse() + assert.Equal(t, proto.ServingStatus_FOLLOWER, fc.Status()) + assert.EqualValues(t, 0, response.Offset) + // Write next entry + stream.AddRequest(createAddRequest(t, 2, 1, map[string]string{"a": "4", "b": "5"}, wal.InvalidOffset)) + + // Wait for response + response = stream.GetResponse() + assert.EqualValues(t, 1, response.Offset) + + assert.Equal(t, proto.ServingStatus_FOLLOWER, fc.Status()) + assert.EqualValues(t, 2, fc.Term()) + + stream.AddRequest(createAddRequest(t, 2, 2, map[string]string{"a": "4", "b": "5"}, wal.InvalidOffset)) + response = stream.GetResponse() + assert.EqualValues(t, 2, response.Offset) + assert.Equal(t, proto.ServingStatus_FOLLOWER, fc.Status()) + assert.EqualValues(t, 2, fc.Term()) + // Double-check the values in the DB // Keys are not there because they were not part of the commit offset dbRes, err := fc.(*followerController).db.Get(&proto.GetRequest{ From 757ef4d9c7f852c78a95eb2fea70c60590a3bcac Mon Sep 17 00:00:00 2001 From: Qiang Zhao Date: Wed, 13 Nov 2024 21:44:43 +0800 Subject: [PATCH 5/5] fix lint --- server/follower_controller_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/follower_controller_test.go b/server/follower_controller_test.go index 009cfbce..8f776fa1 100644 --- a/server/follower_controller_test.go +++ b/server/follower_controller_test.go @@ -117,7 +117,7 @@ func TestFollower(t *testing.T) { fc, err = NewFollowerController(Config{}, common.DefaultNamespace, shardId, walFactory, kvFactory) assert.NoError(t, err) assert.Equal(t, proto.ServingStatus_NOT_MEMBER, fc.Status()) - fenceRes, err = fc.NewTerm(&proto.NewTermRequest{Term: 2}) + _, err = fc.NewTerm(&proto.NewTermRequest{Term: 2}) assert.NoError(t, err) assert.Equal(t, proto.ServingStatus_FENCED, fc.Status()) assert.EqualValues(t, 2, fc.Term())