diff --git a/common/domain/replication_queue_test.go b/common/domain/replication_queue_test.go index 7dd40689745..d2a662bfff4 100644 --- a/common/domain/replication_queue_test.go +++ b/common/domain/replication_queue_test.go @@ -397,3 +397,137 @@ func TestGetDLQAckLevel(t *testing.T) { }) } } + +func TestRangeDeleteMessagesFromDLQ(t *testing.T) { + tests := []struct { + name string + firstID int64 + lastID int64 + wantErr bool + setupMock func(q *persistence.MockQueueManager) + }{ + { + name: "successful range delete from DLQ", + firstID: 10, + lastID: 20, + wantErr: false, + setupMock: func(q *persistence.MockQueueManager) { + q.EXPECT().RangeDeleteMessagesFromDLQ(gomock.Any(), gomock.Eq(int64(10)), gomock.Eq(int64(20))).Return(nil) + }, + }, + { + name: "range delete from DLQ fails", + firstID: 10, + lastID: 20, + wantErr: true, + setupMock: func(q *persistence.MockQueueManager) { + q.EXPECT().RangeDeleteMessagesFromDLQ(gomock.Any(), gomock.Eq(int64(10)), gomock.Eq(int64(20))).Return(errors.New("range delete error")) + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctrl := gomock.NewController(t) + mockQueue := persistence.NewMockQueueManager(ctrl) + rq := NewReplicationQueue(mockQueue, "testCluster", nil, nil) + tt.setupMock(mockQueue) + err := rq.RangeDeleteMessagesFromDLQ(context.Background(), tt.firstID, tt.lastID) + if tt.wantErr { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } + }) + } +} + +func TestDeleteMessageFromDLQ(t *testing.T) { + tests := []struct { + name string + messageID int64 + wantErr bool + setupMock func(q *persistence.MockQueueManager) + }{ + { + name: "successful delete from DLQ", + messageID: 15, + wantErr: false, + setupMock: func(q *persistence.MockQueueManager) { + q.EXPECT().DeleteMessageFromDLQ(gomock.Any(), gomock.Eq(int64(15))).Return(nil) + }, + }, + { + name: "delete from DLQ fails", + messageID: 15, + wantErr: true, + setupMock: func(q *persistence.MockQueueManager) { + q.EXPECT().DeleteMessageFromDLQ(gomock.Any(), gomock.Eq(int64(15))).Return(errors.New("delete error")) + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctrl := gomock.NewController(t) + mockQueue := persistence.NewMockQueueManager(ctrl) + rq := NewReplicationQueue(mockQueue, "testCluster", nil, nil) + tt.setupMock(mockQueue) + err := rq.DeleteMessageFromDLQ(context.Background(), tt.messageID) + if tt.wantErr { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } + }) + } +} + +func TestGetDLQSize(t *testing.T) { + tests := []struct { + name string + wantSize int64 + wantErr bool + setupMock func(m *persistence.MockQueueManager) + }{ + { + name: "returns correct size for non-empty DLQ", + wantSize: 10, + wantErr: false, + setupMock: func(m *persistence.MockQueueManager) { + m.EXPECT().GetDLQSize(gomock.Any()).Return(int64(10), nil) + }, + }, + { + name: "returns zero for empty DLQ", + wantSize: 0, + wantErr: false, + setupMock: func(m *persistence.MockQueueManager) { + m.EXPECT().GetDLQSize(gomock.Any()).Return(int64(0), nil) + }, + }, + { + name: "propagates error from underlying queue", + wantErr: true, + setupMock: func(m *persistence.MockQueueManager) { + m.EXPECT().GetDLQSize(gomock.Any()).Return(int64(0), errors.New("database error")) + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctrl := gomock.NewController(t) + mockQueueManager := persistence.NewMockQueueManager(ctrl) + tt.setupMock(mockQueueManager) + q := &replicationQueueImpl{queue: mockQueueManager} + size, err := q.GetDLQSize(context.Background()) + if tt.wantErr { + assert.Error(t, err) + } else { + assert.NoError(t, err) + assert.Equal(t, tt.wantSize, size) + } + }) + } +}