From a3b85db5f7052a22ba10c55466496a06cb3222bb Mon Sep 17 00:00:00 2001
From: Nikhil Benesch
Date: Wed, 24 Oct 2018 17:45:22 -0400
Subject: [PATCH] storage: fix disappearing RHS merge bug

The strategy used by the replica GC queue to determine whether a
subsumed range can be GC'd is flawed. If a replica of the LHS was
uninitialized at the time the merge committed, there is a small window
where the replica GC queue can think that it's safe to clean up an RHS
replica when in fact the uninitialized LHS replica could still
initialize and apply a merge trigger that required that RHS to be
present.

Make the replica GC queue's strategy valid by requiring that all
replicas of the LHS are initialized before beginning a merge
transaction. This closes the window during which a replica of the RHS
could be incorrectly GC'd with a patch that is small enough to be
backported to v2.1.1.

Fix #31719.

Release note: None
---
 pkg/storage/api.pb.go            | 350 +++++++++++++++++++++++++++----
 pkg/storage/api.proto            |  10 +
 pkg/storage/client_merge_test.go | 233 +++++++++++++++++++-
 pkg/storage/client_raft_test.go  |  74 +++++++
 pkg/storage/replica_command.go   |  43 +++-
 pkg/storage/stores_server.go     |  28 +++
 6 files changed, 694 insertions(+), 44 deletions(-)

diff --git a/pkg/storage/api.pb.go b/pkg/storage/api.pb.go
index 1639b33fce9f..b1f7193194e5 100644
--- a/pkg/storage/api.pb.go
+++ b/pkg/storage/api.pb.go
@@ -14,6 +14,8 @@
 	CollectChecksumResponse
 	WaitForApplicationRequest
 	WaitForApplicationResponse
+	WaitForReplicaInitRequest
+	WaitForReplicaInitResponse
 	RaftHeartbeat
 	RaftMessageRequest
 	RaftMessageRequestBatch
@@ -114,12 +116,32 @@ func (m *WaitForApplicationResponse) String() string { return proto.C
 func (*WaitForApplicationResponse) ProtoMessage() {}
 func (*WaitForApplicationResponse) Descriptor() ([]byte, []int) { return fileDescriptorApi, []int{4} }
 
+type WaitForReplicaInitRequest struct {
+	StoreRequestHeader `protobuf:"bytes,1,opt,name=header,embedded=header" json:"header"`
+	RangeID github_com_cockroachdb_cockroach_pkg_roachpb.RangeID `protobuf:"varint,2,opt,name=range_id,json=rangeId,proto3,casttype=github.com/cockroachdb/cockroach/pkg/roachpb.RangeID" json:"range_id,omitempty"`
+}
+
+func (m *WaitForReplicaInitRequest) Reset()         { *m = WaitForReplicaInitRequest{} }
+func (m *WaitForReplicaInitRequest) String() string { return proto.CompactTextString(m) }
+func (*WaitForReplicaInitRequest) ProtoMessage()    {}
+func (*WaitForReplicaInitRequest) Descriptor() ([]byte, []int) { return fileDescriptorApi, []int{5} }
+
+type WaitForReplicaInitResponse struct {
+}
+
+func (m *WaitForReplicaInitResponse) Reset()         { *m = WaitForReplicaInitResponse{} }
+func (m *WaitForReplicaInitResponse) String() string { return proto.CompactTextString(m) }
+func (*WaitForReplicaInitResponse) ProtoMessage()    {}
+func (*WaitForReplicaInitResponse) Descriptor() ([]byte, []int) { return fileDescriptorApi, []int{6} }
+
 func init() {
 	proto.RegisterType((*StoreRequestHeader)(nil), "cockroach.storage.StoreRequestHeader")
 	proto.RegisterType((*CollectChecksumRequest)(nil), "cockroach.storage.CollectChecksumRequest")
 	proto.RegisterType((*CollectChecksumResponse)(nil), "cockroach.storage.CollectChecksumResponse")
 	proto.RegisterType((*WaitForApplicationRequest)(nil), "cockroach.storage.WaitForApplicationRequest")
 	proto.RegisterType((*WaitForApplicationResponse)(nil), "cockroach.storage.WaitForApplicationResponse")
+	proto.RegisterType((*WaitForReplicaInitRequest)(nil), "cockroach.storage.WaitForReplicaInitRequest")
+	
proto.RegisterType((*WaitForReplicaInitResponse)(nil), "cockroach.storage.WaitForReplicaInitResponse") } // Reference imports to suppress errors if they are not otherwise used. @@ -135,6 +157,7 @@ const _ = grpc.SupportPackageIsVersion4 type PerReplicaClient interface { CollectChecksum(ctx context.Context, in *CollectChecksumRequest, opts ...grpc.CallOption) (*CollectChecksumResponse, error) WaitForApplication(ctx context.Context, in *WaitForApplicationRequest, opts ...grpc.CallOption) (*WaitForApplicationResponse, error) + WaitForReplicaInit(ctx context.Context, in *WaitForReplicaInitRequest, opts ...grpc.CallOption) (*WaitForReplicaInitResponse, error) } type perReplicaClient struct { @@ -163,11 +186,21 @@ func (c *perReplicaClient) WaitForApplication(ctx context.Context, in *WaitForAp return out, nil } +func (c *perReplicaClient) WaitForReplicaInit(ctx context.Context, in *WaitForReplicaInitRequest, opts ...grpc.CallOption) (*WaitForReplicaInitResponse, error) { + out := new(WaitForReplicaInitResponse) + err := grpc.Invoke(ctx, "/cockroach.storage.PerReplica/WaitForReplicaInit", in, out, c.cc, opts...) + if err != nil { + return nil, err + } + return out, nil +} + // Server API for PerReplica service type PerReplicaServer interface { CollectChecksum(context.Context, *CollectChecksumRequest) (*CollectChecksumResponse, error) WaitForApplication(context.Context, *WaitForApplicationRequest) (*WaitForApplicationResponse, error) + WaitForReplicaInit(context.Context, *WaitForReplicaInitRequest) (*WaitForReplicaInitResponse, error) } func RegisterPerReplicaServer(s *grpc.Server, srv PerReplicaServer) { @@ -210,6 +243,24 @@ func _PerReplica_WaitForApplication_Handler(srv interface{}, ctx context.Context return interceptor(ctx, in, info, handler) } +func _PerReplica_WaitForReplicaInit_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(WaitForReplicaInitRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(PerReplicaServer).WaitForReplicaInit(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/cockroach.storage.PerReplica/WaitForReplicaInit", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(PerReplicaServer).WaitForReplicaInit(ctx, req.(*WaitForReplicaInitRequest)) + } + return interceptor(ctx, in, info, handler) +} + var _PerReplica_serviceDesc = grpc.ServiceDesc{ ServiceName: "cockroach.storage.PerReplica", HandlerType: (*PerReplicaServer)(nil), @@ -222,6 +273,10 @@ var _PerReplica_serviceDesc = grpc.ServiceDesc{ MethodName: "WaitForApplication", Handler: _PerReplica_WaitForApplication_Handler, }, + { + MethodName: "WaitForReplicaInit", + Handler: _PerReplica_WaitForReplicaInit_Handler, + }, }, Streams: []grpc.StreamDesc{}, Metadata: "storage/api.proto", @@ -396,6 +451,55 @@ func (m *WaitForApplicationResponse) MarshalTo(dAtA []byte) (int, error) { return i, nil } +func (m *WaitForReplicaInitRequest) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *WaitForReplicaInitRequest) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + dAtA[i] = 0xa + i++ + i = encodeVarintApi(dAtA, i, uint64(m.StoreRequestHeader.Size())) + n6, err := m.StoreRequestHeader.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i 
+= n6 + if m.RangeID != 0 { + dAtA[i] = 0x10 + i++ + i = encodeVarintApi(dAtA, i, uint64(m.RangeID)) + } + return i, nil +} + +func (m *WaitForReplicaInitResponse) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *WaitForReplicaInitResponse) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + return i, nil +} + func encodeVarintApi(dAtA []byte, offset int, v uint64) int { for v >= 1<<7 { dAtA[offset] = uint8(v&0x7f | 0x80) @@ -470,6 +574,23 @@ func (m *WaitForApplicationResponse) Size() (n int) { return n } +func (m *WaitForReplicaInitRequest) Size() (n int) { + var l int + _ = l + l = m.StoreRequestHeader.Size() + n += 1 + l + sovApi(uint64(l)) + if m.RangeID != 0 { + n += 1 + sovApi(uint64(m.RangeID)) + } + return n +} + +func (m *WaitForReplicaInitResponse) Size() (n int) { + var l int + _ = l + return n +} + func sovApi(x uint64) (n int) { for { n++ @@ -1043,6 +1164,155 @@ func (m *WaitForApplicationResponse) Unmarshal(dAtA []byte) error { } return nil } +func (m *WaitForReplicaInitRequest) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowApi + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: WaitForReplicaInitRequest: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: WaitForReplicaInitRequest: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field StoreRequestHeader", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowApi + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthApi + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + if err := m.StoreRequestHeader.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field RangeID", wireType) + } + m.RangeID = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowApi + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.RangeID |= (github_com_cockroachdb_cockroach_pkg_roachpb.RangeID(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + default: + iNdEx = preIndex + skippy, err := skipApi(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthApi + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *WaitForReplicaInitResponse) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowApi + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= 
(uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: WaitForReplicaInitResponse: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: WaitForReplicaInitResponse: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + default: + iNdEx = preIndex + skippy, err := skipApi(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthApi + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} func skipApi(dAtA []byte) (n int, err error) { l := len(dAtA) iNdEx := 0 @@ -1151,43 +1421,45 @@ var ( func init() { proto.RegisterFile("storage/api.proto", fileDescriptorApi) } var fileDescriptorApi = []byte{ - // 597 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xd4, 0x54, 0x4f, 0x6f, 0xd3, 0x4e, - 0x10, 0xcd, 0xf6, 0x4f, 0x12, 0x6d, 0x7e, 0xd2, 0x4f, 0x5d, 0xa1, 0x52, 0x0c, 0xb2, 0x8b, 0x2b, - 0xa4, 0x82, 0xc0, 0x16, 0x09, 0x77, 0xd4, 0x24, 0x02, 0x7c, 0x28, 0x42, 0x1b, 0x15, 0xa4, 0x1e, - 0x88, 0x36, 0xde, 0xad, 0xe3, 0xc6, 0xf1, 0x1a, 0x7b, 0x8d, 0xf8, 0x18, 0x7c, 0x29, 0xa4, 0x70, - 0xcb, 0x11, 0x71, 0xb0, 0xc0, 0x88, 0x23, 0x5f, 0x80, 0x13, 0xda, 0xb5, 0x9d, 0x52, 0x12, 0xa4, - 0xc0, 0x8d, 0x53, 0x76, 0x67, 0xe7, 0xbd, 0x99, 0xf7, 0x66, 0x62, 0xb8, 0x93, 0x08, 0x1e, 0x13, - 0x8f, 0xd9, 0x24, 0xf2, 0xad, 0x28, 0xe6, 0x82, 0xa3, 0x1d, 0x97, 0xbb, 0x93, 0x98, 0x13, 0x77, - 0x6c, 0x95, 0x8f, 0xda, 0x75, 0x75, 0x8d, 0x46, 0xb6, 0x1f, 0x0a, 0x16, 0x87, 0x24, 0x18, 0xc6, - 0xe4, 0x4c, 0x14, 0xf9, 0xda, 0x6e, 0xf5, 0x38, 0x65, 0x82, 0x50, 0x22, 0x48, 0x19, 0x3f, 0xa8, - 0xa8, 0x59, 0xe8, 0xf9, 0x61, 0xf5, 0x23, 0xf3, 0x5e, 0xbb, 0x6e, 0xa7, 0x4c, 0xba, 0xe2, 0x71, - 0x8f, 0xab, 0xa3, 0x2d, 0x4f, 0x45, 0xd4, 0x9c, 0x03, 0x88, 0x06, 0x82, 0xc7, 0x0c, 0xb3, 0x57, - 0x29, 0x4b, 0xc4, 0x13, 0x46, 0x28, 0x8b, 0xd1, 0x29, 0x6c, 0x84, 0x9c, 0xb2, 0xa1, 0x4f, 0xf7, - 0xc0, 0x3e, 0x38, 0xdc, 0xee, 0x1e, 0xe5, 0x99, 0x51, 0x7f, 0xca, 0x29, 0x73, 0xfa, 0xdf, 0x33, - 0xa3, 0xe3, 0xf9, 0x62, 0x9c, 0x8e, 0x2c, 0x97, 0x4f, 0xed, 0x85, 0x06, 0x3a, 0xba, 0x38, 0xdb, - 0xd1, 0xc4, 0xb3, 0xcb, 0x6e, 0xad, 0x02, 0x86, 0xeb, 0x92, 0xd1, 0xa1, 0xe8, 0x25, 0x6c, 0xca, - 0x7e, 0x15, 0xf9, 0x86, 0x22, 0xef, 0xe5, 0x99, 0xd1, 0x50, 0x5d, 0x28, 0xf6, 0x07, 0x7f, 0xc4, - 0x5e, 0xe2, 0x70, 0x43, 0x91, 0x3a, 0xd4, 0x7c, 0xbf, 0x01, 0x77, 0x7b, 0x3c, 0x08, 0x98, 0x2b, - 0x7a, 0x63, 0xe6, 0x4e, 0x92, 0x74, 0x5a, 0x8a, 0x43, 0x8f, 0x61, 0x7d, 0xac, 0x04, 0x2a, 0x55, - 0xad, 0xf6, 0x2d, 0x6b, 0x69, 0x02, 0xd6, 0xb2, 0x1b, 0xdd, 0xe6, 0x2c, 0x33, 0x6a, 0xf3, 0xcc, - 0x00, 0xb8, 0x84, 0x4b, 0x0d, 0x31, 0x09, 0xbd, 0x85, 0x86, 0xcd, 0x42, 0x03, 0x96, 0xb1, 0xbf, - 0xd0, 0x50, 0xe2, 0x70, 0x43, 0x91, 0x3a, 0x14, 0x9d, 0xc3, 0x96, 0x5b, 0xf6, 0x2e, 0x4b, 0x6c, - 0xee, 0x83, 0xc3, 0xff, 0xba, 0x8e, 0x6c, 0xe3, 0xe3, 0xba, 0xee, 0xa7, 0xc2, 0x0f, 0xec, 0x34, - 0xf5, 0xa9, 0x75, 0x72, 0xe2, 0xf4, 0xf3, 0xcc, 0x80, 0x95, 0x1b, 0x4e, 0x1f, 0xc3, 0x8a, 0xdd, - 0xa1, 0x48, 0x83, 0xcd, 0xea, 0xb6, 0xb7, 0x25, 0x0b, 0xe1, 0xc5, 0xdd, 0x7c, 0x07, 0xe0, 0xd5, - 0x25, 0x2f, 0x93, 0x88, 0x87, 0x09, 0xbb, 0x84, 0x03, 0x97, 0x71, 0xe8, 0x21, 0x6c, 0x26, 0x21, - 0x89, 0x92, 0x31, 0x17, 0xca, 0x9f, 0x56, 0xfb, 0xe0, 0x27, 0xab, 0x2f, 0x64, 0x9f, 0x89, 0x41, - 0x99, 0xd6, 0x27, 0x82, 0xe0, 0x05, 0x08, 0x1d, 
0xc3, 0x6d, 0xca, 0x02, 0x41, 0x94, 0xf4, 0x56, - 0xfb, 0xfe, 0x8a, 0x41, 0x15, 0x5b, 0x6e, 0x55, 0xcb, 0x6e, 0x1d, 0x3f, 0xef, 0xf5, 0x06, 0x82, - 0x88, 0xa4, 0x2f, 0x81, 0xdd, 0x2d, 0xe9, 0x16, 0x2e, 0x58, 0xcc, 0xaf, 0x00, 0x5e, 0x7b, 0x41, - 0x7c, 0xf1, 0x88, 0xc7, 0x47, 0x51, 0x14, 0xf8, 0x2e, 0x11, 0x3e, 0x0f, 0xff, 0xb9, 0xb5, 0x30, - 0x60, 0x2b, 0x60, 0x24, 0x61, 0x43, 0x3f, 0xa4, 0xec, 0x8d, 0xf2, 0x66, 0x0b, 0x43, 0x15, 0x72, - 0x64, 0xc4, 0xbc, 0x01, 0xb5, 0x55, 0x32, 0x8b, 0x89, 0xb5, 0xbf, 0x01, 0x08, 0x9f, 0xb1, 0x18, - 0x33, 0xf5, 0x84, 0xce, 0xe1, 0xff, 0xbf, 0xcc, 0x16, 0xdd, 0x5e, 0xa1, 0x7c, 0xf5, 0x7f, 0x49, - 0xbb, 0xb3, 0x4e, 0x6a, 0x51, 0xd8, 0xac, 0xa1, 0x04, 0xa2, 0xe5, 0xc6, 0xd0, 0xdd, 0x15, 0x1c, - 0xbf, 0x1d, 0x93, 0x76, 0x6f, 0xcd, 0xec, 0xaa, 0x68, 0xf7, 0xe6, 0xec, 0xb3, 0x5e, 0x9b, 0xe5, - 0x3a, 0x98, 0xe7, 0x3a, 0xf8, 0x90, 0xeb, 0xe0, 0x53, 0xae, 0x83, 0xb7, 0x5f, 0xf4, 0xda, 0x69, - 0xa3, 0xc4, 0x8f, 0xea, 0xea, 0x33, 0xd8, 0xf9, 0x11, 0x00, 0x00, 0xff, 0xff, 0x18, 0x2e, 0x39, - 0x5f, 0x9e, 0x05, 0x00, 0x00, + // 628 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xd4, 0x55, 0x5f, 0x6f, 0xd3, 0x3e, + 0x14, 0xad, 0xf7, 0xa7, 0xad, 0xdc, 0x9f, 0xf4, 0xd3, 0x2c, 0x34, 0x46, 0x41, 0xcd, 0xc8, 0x84, + 0x34, 0x10, 0x24, 0xa2, 0xe3, 0x1d, 0xad, 0xad, 0x80, 0x3c, 0x0c, 0x21, 0x4f, 0x03, 0x69, 0x0f, + 0x4c, 0x6e, 0xec, 0xa5, 0xde, 0xb2, 0x38, 0x24, 0x0e, 0xe2, 0x63, 0xf0, 0xa1, 0x40, 0x1a, 0x6f, + 0x7b, 0x44, 0x3c, 0x44, 0x10, 0xc4, 0x97, 0xe0, 0x09, 0xd9, 0x71, 0xba, 0x8d, 0x86, 0x69, 0xf0, + 0xb6, 0xa7, 0xda, 0xd7, 0xf7, 0x9c, 0x7b, 0xcf, 0xb9, 0x57, 0x0d, 0x5c, 0x4a, 0xa5, 0x48, 0x48, + 0xc0, 0x5c, 0x12, 0x73, 0x27, 0x4e, 0x84, 0x14, 0x68, 0xc9, 0x17, 0xfe, 0x61, 0x22, 0x88, 0x3f, + 0x71, 0xcc, 0x63, 0xf7, 0xa6, 0xbe, 0xc6, 0x63, 0x97, 0x47, 0x92, 0x25, 0x11, 0x09, 0xf7, 0x12, + 0xb2, 0x2f, 0xcb, 0xfc, 0xee, 0x72, 0xf5, 0x78, 0xc4, 0x24, 0xa1, 0x44, 0x12, 0x13, 0x5f, 0xab, + 0xa8, 0x59, 0x14, 0xf0, 0xa8, 0xfa, 0x51, 0x79, 0x6f, 0x7d, 0x7f, 0xc3, 0x24, 0x5d, 0x0b, 0x44, + 0x20, 0xf4, 0xd1, 0x55, 0xa7, 0x32, 0x6a, 0x9f, 0x00, 0x88, 0xb6, 0xa5, 0x48, 0x18, 0x66, 0x6f, + 0x32, 0x96, 0xca, 0x67, 0x8c, 0x50, 0x96, 0xa0, 0x5d, 0xd8, 0x8a, 0x04, 0x65, 0x7b, 0x9c, 0xae, + 0x80, 0x55, 0xb0, 0xbe, 0x38, 0xd8, 0x2c, 0x72, 0xab, 0xf9, 0x5c, 0x50, 0xe6, 0x8d, 0x7e, 0xe6, + 0xd6, 0x46, 0xc0, 0xe5, 0x24, 0x1b, 0x3b, 0xbe, 0x38, 0x72, 0xa7, 0x1a, 0xe8, 0xf8, 0xf4, 0xec, + 0xc6, 0x87, 0x81, 0x6b, 0xba, 0x75, 0x4a, 0x18, 0x6e, 0x2a, 0x46, 0x8f, 0xa2, 0xd7, 0xb0, 0xad, + 0xfa, 0xd5, 0xe4, 0x73, 0x9a, 0x7c, 0x58, 0xe4, 0x56, 0x4b, 0x77, 0xa1, 0xd9, 0x1f, 0xfd, 0x15, + 0xbb, 0xc1, 0xe1, 0x96, 0x26, 0xf5, 0xa8, 0xfd, 0x69, 0x0e, 0x2e, 0x0f, 0x45, 0x18, 0x32, 0x5f, + 0x0e, 0x27, 0xcc, 0x3f, 0x4c, 0xb3, 0x23, 0x23, 0x0e, 0x3d, 0x85, 0xcd, 0x89, 0x16, 0xa8, 0x55, + 0x75, 0xfa, 0x77, 0x9c, 0x99, 0x09, 0x38, 0xb3, 0x6e, 0x0c, 0xda, 0xc7, 0xb9, 0xd5, 0x38, 0xc9, + 0x2d, 0x80, 0x0d, 0x5c, 0x69, 0x48, 0x48, 0x14, 0x4c, 0x35, 0xcc, 0x97, 0x1a, 0xb0, 0x8a, 0xfd, + 0x83, 0x06, 0x83, 0xc3, 0x2d, 0x4d, 0xea, 0x51, 0x74, 0x00, 0x3b, 0xbe, 0xe9, 0x5d, 0x95, 0x98, + 0x5f, 0x05, 0xeb, 0xff, 0x0d, 0x3c, 0xd5, 0xc6, 0x97, 0xcb, 0xba, 0x9f, 0x49, 0x1e, 0xba, 0x59, + 0xc6, 0xa9, 0xb3, 0xb3, 0xe3, 0x8d, 0x8a, 0xdc, 0x82, 0x95, 0x1b, 0xde, 0x08, 0xc3, 0x8a, 0xdd, + 0xa3, 0xa8, 0x0b, 0xdb, 0xd5, 0x6d, 0x65, 0x41, 0x15, 0xc2, 0xd3, 0xbb, 0xfd, 0x11, 0xc0, 0xeb, + 0x33, 0x5e, 0xa6, 0xb1, 0x88, 0x52, 0x76, 0x0e, 0x07, 0xce, 0xe3, 0xd0, 0x63, 0xd8, 0x4e, 
0x23, + 0x12, 0xa7, 0x13, 0x21, 0xb5, 0x3f, 0x9d, 0xfe, 0xda, 0x19, 0xab, 0x4f, 0x65, 0xef, 0xcb, 0x6d, + 0x93, 0x36, 0x22, 0x92, 0xe0, 0x29, 0x08, 0x6d, 0xc1, 0x45, 0xca, 0x42, 0x49, 0xb4, 0xf4, 0x4e, + 0xff, 0x61, 0xcd, 0xa0, 0xca, 0x2d, 0x77, 0xaa, 0x65, 0x77, 0xb6, 0x5e, 0x0e, 0x87, 0xdb, 0x92, + 0xc8, 0x74, 0xa4, 0x80, 0x83, 0x05, 0xe5, 0x16, 0x2e, 0x59, 0xec, 0x1f, 0x00, 0xde, 0x78, 0x45, + 0xb8, 0x7c, 0x22, 0x92, 0xcd, 0x38, 0x0e, 0xb9, 0x4f, 0x24, 0x17, 0xd1, 0x95, 0x5b, 0x0b, 0x0b, + 0x76, 0x42, 0x46, 0x52, 0xb6, 0xc7, 0x23, 0xca, 0xde, 0x69, 0x6f, 0x16, 0x30, 0xd4, 0x21, 0x4f, + 0x45, 0xec, 0x5b, 0xb0, 0x5b, 0x27, 0xb3, 0x9c, 0x98, 0xfd, 0xe1, 0xd4, 0x05, 0xcc, 0xf4, 0xb3, + 0x17, 0x71, 0x79, 0xd5, 0x5c, 0x38, 0x23, 0xf2, 0x9c, 0x8a, 0x52, 0x64, 0xbf, 0x98, 0x83, 0xf0, + 0x05, 0xab, 0x9e, 0xd0, 0x01, 0xfc, 0xff, 0xb7, 0x05, 0x46, 0x77, 0x6b, 0x84, 0xd5, 0xff, 0x61, + 0x74, 0xef, 0x5d, 0x26, 0xd5, 0xb8, 0xdb, 0x40, 0x29, 0x44, 0xb3, 0xee, 0xa3, 0xfb, 0x35, 0x1c, + 0x7f, 0xdc, 0xc5, 0xee, 0x83, 0x4b, 0x66, 0xd7, 0x14, 0x3d, 0xe3, 0xc6, 0x45, 0x45, 0x67, 0x47, + 0x7f, 0x51, 0xd1, 0x1a, 0x8b, 0xed, 0xc6, 0xe0, 0xf6, 0xf1, 0xb7, 0x5e, 0xe3, 0xb8, 0xe8, 0x81, + 0x93, 0xa2, 0x07, 0x3e, 0x17, 0x3d, 0xf0, 0xb5, 0xe8, 0x81, 0xf7, 0xdf, 0x7b, 0x8d, 0xdd, 0x96, + 0xc1, 0x8f, 0x9b, 0xfa, 0x03, 0xb3, 0xf1, 0x2b, 0x00, 0x00, 0xff, 0xff, 0x9d, 0xba, 0x22, 0x72, + 0xf8, 0x06, 0x00, 0x00, } diff --git a/pkg/storage/api.proto b/pkg/storage/api.proto index e8b978fdda73..c753e2455637 100644 --- a/pkg/storage/api.proto +++ b/pkg/storage/api.proto @@ -66,7 +66,17 @@ message WaitForApplicationRequest { message WaitForApplicationResponse { } +message WaitForReplicaInitRequest { + StoreRequestHeader header = 1 [(gogoproto.nullable) = false, (gogoproto.embed) = true]; + int64 range_id = 2 [(gogoproto.customname) = "RangeID", + (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.RangeID"]; +} + +message WaitForReplicaInitResponse { +} + service PerReplica { rpc CollectChecksum(CollectChecksumRequest) returns (CollectChecksumResponse) {} rpc WaitForApplication(WaitForApplicationRequest) returns (WaitForApplicationResponse) {} + rpc WaitForReplicaInit(WaitForReplicaInitRequest) returns (WaitForReplicaInitResponse) {} } diff --git a/pkg/storage/client_merge_test.go b/pkg/storage/client_merge_test.go index 21f16ae47567..1507480fcc91 100644 --- a/pkg/storage/client_merge_test.go +++ b/pkg/storage/client_merge_test.go @@ -53,6 +53,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/randutil" "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" ) func adminMergeArgs(key roachpb.Key) *roachpb.AdminMergeRequest { @@ -1961,13 +1962,14 @@ func TestStoreRangeMergeAbandonedFollowers(t *testing.T) { } } -func TestStoreRangeMergeDeadFollower(t *testing.T) { +func TestStoreRangeMergeDeadFollowerBeforeTxn(t *testing.T) { defer leaktest.AfterTest(t)() ctx := context.Background() + var mtc *multiTestContext storeCfg := storage.TestStoreConfig(nil) storeCfg.TestingKnobs.DisableMergeQueue = true - mtc := &multiTestContext{storeConfig: &storeCfg} + mtc = &multiTestContext{storeConfig: &storeCfg} mtc.Start(t, 3) defer mtc.Stop() store0 := mtc.Store(0) @@ -1980,6 +1982,38 @@ func TestStoreRangeMergeDeadFollower(t *testing.T) { mtc.stopStore(2) + args := adminMergeArgs(lhsDesc.StartKey.AsRawKey()) + _, pErr := client.SendWrapped(ctx, store0.TestSender(), args) + expErr := "waiting for all 
left-hand replicas to initialize"
+	if !testutils.IsPError(pErr, expErr) {
+		t.Fatalf("expected %q error, but got %v", expErr, pErr)
+	}
+}
+
+func TestStoreRangeMergeDeadFollowerDuringTxn(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	ctx := context.Background()
+	var mtc *multiTestContext
+	storeCfg := storage.TestStoreConfig(nil)
+	storeCfg.TestingKnobs.DisableMergeQueue = true
+	storeCfg.TestingKnobs.TestingRequestFilter = func(ba roachpb.BatchRequest) *roachpb.Error {
+		if ba.IsSingleSubsumeRequest() && mtc.Store(2) != nil {
+			mtc.stopStore(2)
+		}
+		return nil
+	}
+	mtc = &multiTestContext{storeConfig: &storeCfg}
+	mtc.Start(t, 3)
+	defer mtc.Stop()
+	store0 := mtc.Store(0)
+
+	mtc.replicateRange(roachpb.RangeID(1), 1, 2)
+	lhsDesc, _, err := createSplitRanges(ctx, store0)
+	if err != nil {
+		t.Fatal(err)
+	}
+
 	args := adminMergeArgs(lhsDesc.StartKey.AsRawKey())
 	_, pErr := client.SendWrapped(ctx, store0.TestSender(), args)
 	expErr := "merge of range into 1 failed: waiting for all right-hand replicas to catch up"
@@ -2150,6 +2184,201 @@ func TestStoreRangeMergeReadoptedLHSFollower(t *testing.T) {
 	mtc.transferLease(ctx, lhsDesc.RangeID, 0, 2)
 }
 
+// slowSnapRaftHandler delays any snapshots to rangeID until waitCh is closed.
+type slowSnapRaftHandler struct {
+	rangeID roachpb.RangeID
+	waitCh  chan struct{}
+	storage.RaftMessageHandler
+	syncutil.Mutex
+}
+
+func (h *slowSnapRaftHandler) unblock() {
+	h.Lock()
+	if h.waitCh != nil {
+		close(h.waitCh)
+		h.waitCh = nil
+	}
+	h.Unlock()
+}
+
+func (h *slowSnapRaftHandler) HandleSnapshot(
+	header *storage.SnapshotRequest_Header, respStream storage.SnapshotResponseStream,
+) error {
+	if header.RaftMessageRequest.RangeID == h.rangeID {
+		h.Lock()
+		waitCh := h.waitCh
+		h.Unlock()
+		if waitCh != nil {
+			<-waitCh
+		}
+	}
+	return h.RaftMessageHandler.HandleSnapshot(header, respStream)
+}
+
+// TestStoreRangeMergeUninitializedLHSFollower reproduces a rare bug in which a
+// replica of the RHS of a merge could be garbage collected too soon.
+//
+// Consider two adjacent ranges, A and B. Suppose the replica of
+// A on the last store, S3, is uninitialized, e.g. because A was recently
+// created by a split and S3 has neither processed the split trigger nor
+// received a snapshot. The leaseholder for A will attempt to send a Raft
+// snapshot to bring S3's replica up to date, but this Raft snapshot may be
+// delayed due to a busy Raft snapshot queue or a slow network.
+//
+// Now suppose a merge of A and B commits before S3 receives a Raft snapshot for
+// A. There is a small window of time in which S3 can garbage collect its
+// replica of B! When S3 looks up B's meta2 descriptor, it will find that B has
+// been merged away. S3 will then try to prove that B's local left neighbor is
+// generationally up-to-date; if it is, it is safe to GC B. Usually, S3 would
+// determine A to be B's left neighbor, realize that A has not yet processed the
+// merge, and correctly refuse to GC its replica of B. In this case, however,
+// S3's replica of A is uninitialized and thus doesn't know its start and end
+// key, so S3 will instead discover some more-distant left neighbor of B. This
+// distant neighbor might very well be up-to-date, and S3 will incorrectly
+// conclude that it can GC its replica of B!
+//
+// So say S3 GCs its replica of B. There are now two paths that A might take.
+// The happy case is that A receives a Raft snapshot that postdates the merge.
+// The unhappy case is that A receives a Raft snapshot that predates the merge,
+// and is then required to apply the merge via a MsgApp. Since there is no
+// longer a replica of B on S3, applying the merge trigger will explode.
+//
+// The solution was to require that all LHS replicas are initialized before
+// beginning a merge transaction. This ensures that the replica GC queue will
+// always discover the correct left neighbor when considering whether a subsumed
+// range can be GC'd.
+func TestStoreRangeMergeUninitializedLHSFollower(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	ctx := context.Background()
+	storeCfg := storage.TestStoreConfig(nil)
+	storeCfg.TestingKnobs.DisableReplicateQueue = true
+	mtc := &multiTestContext{storeConfig: &storeCfg}
+	mtc.Start(t, 3)
+	defer mtc.Stop()
+	store0, store2 := mtc.Store(0), mtc.Store(2)
+	distSender := mtc.distSenders[0]
+
+	split := func(key roachpb.RKey) roachpb.RangeID {
+		t.Helper()
+		if _, pErr := client.SendWrapped(ctx, distSender, adminSplitArgs(key.AsRawKey())); pErr != nil {
+			t.Fatal(pErr)
+		}
+		return store0.LookupReplica(key).RangeID
+	}
+
+	// We'll create two ranges, A and B, as described in the comment on this test
+	// function.
+	aKey, bKey := roachpb.RKey("a"), roachpb.RKey("b")
+
+	// Put range 1 on all three stores.
+	mtc.replicateRange(roachpb.RangeID(1), 1, 2)
+
+	// Create range B and wait for store2 to process the split.
+	bRangeID := split(bKey)
+	var bRepl2 *storage.Replica
+	testutils.SucceedsSoon(t, func() (err error) {
+		if bRepl2, err = store2.GetReplica(bRangeID); err != nil || !bRepl2.IsInitialized() {
+			return errors.New("store2 has not yet processed split of b")
+		}
+		return nil
+	})
+
+	// Now we want to create range A, but we need to make sure store2's replica of
+	// A is not initialized. This requires dropping all Raft traffic to store2
+	// from range 1, which will be the LHS of the split, so that store2's replica
+	// of range 1 never processes the split trigger, which would create an
+	// initialized replica of A.
+	unreliableHandler := &unreliableRaftHandler{
+		rangeID:            roachpb.RangeID(1),
+		RaftMessageHandler: store2,
+	}
+	mtc.transport.Listen(store2.Ident.StoreID, unreliableHandler)
+
+	// Perform the split of A, now that store2 won't be able to initialize its
+	// replica of A.
+	aRangeID := split(aKey)
+
+	// Wedge a Raft snapshot that's destined for A. This allows us to capture a
+	// pre-merge Raft snapshot, which we'll let loose after the merge commits.
+	slowSnapHandler := &slowSnapRaftHandler{
+		rangeID:            aRangeID,
+		waitCh:             make(chan struct{}),
+		RaftMessageHandler: unreliableHandler,
+	}
+	defer slowSnapHandler.unblock()
+	mtc.transport.Listen(store2.Ident.StoreID, slowSnapHandler)
+
+	// Remove the replica of range 1 on store2. If we were to leave it in place,
+	// store2 would refuse to GC its replica of B after the merge commits, because
+	// the left neighbor of B would be this out-of-date replica of range 1.
+	// (Remember that we refused to let it process the split of A.) So we need to
+	// remove it so that B has no left neighbor and thus will be eligible for GC.
+	{
+		r1Repl2, err := store2.GetReplica(roachpb.RangeID(1))
+		if err != nil {
+			t.Fatal(err)
+		}
+		mtc.unreplicateRange(roachpb.RangeID(1), 2)
+		testutils.SucceedsSoon(t, func() error {
+			if err := store2.ManualReplicaGC(r1Repl2); err != nil {
+				return err
+			}
+			if _, err := store2.GetReplica(roachpb.RangeID(1)); err == nil {
+				return errors.New("r1Repl2 still exists")
+			}
+			return nil
+		})
+	}
+
+	// Launch the merge of A and B.
+	mergeErr := make(chan error)
+	go func() {
+		_, pErr := client.SendWrapped(ctx, distSender, adminMergeArgs(aKey.AsRawKey()))
+		mergeErr <- pErr.GoError()
+	}()
+
+	// We want to assert that the merge does not complete until we allow store2's
+	// replica of B to be initialized (by releasing the blocked Raft snapshot). A
+	// happens-before assertion is nearly impossible to express, though, so
+	// instead we just wait in the hope that, if the merge is buggy, it will
+	// commit while we wait. Before the bug was fixed, this caused the test
+	// to fail reliably.
+	start := timeutil.Now()
+	for timeutil.Since(start) < 50*time.Millisecond {
+		if _, err := store2.GetReplica(bRangeID); err == nil {
+			// Attempt to reproduce the exact fatal error described in the comment on
+			// the test by running range B through the GC queue. If the bug is
+			// present, GC will be successful and so the application of the merge
+			// trigger on A will fail once we allow the Raft snapshot through. If the
+			// bug is not present, we'll be unable to GC range B because it won't get
+			// subsumed until after we allow the Raft snapshot through.
+			_ = store2.ManualReplicaGC(bRepl2)
+		}
+		time.Sleep(5 * time.Millisecond) // don't spin too hot to give the merge CPU time to complete
+	}
+
+	select {
+	case err := <-mergeErr:
+		t.Errorf("merge completed early (err: %v)", err)
+		close(mergeErr)
+	default:
+	}
+
+	// Allow store2's replica of A to initialize with a Raft snapshot that
+	// predates the merge.
+	slowSnapHandler.unblock()
+
+	// Assert that the merge completes successfully.
+	if err := <-mergeErr; err != nil {
+		t.Fatal(err)
+	}
+
+	// Give store2 the lease on the merged range to force all commands to be
+	// applied, including the merge trigger.
+	mtc.transferLease(ctx, aRangeID, 0, 2)
+}
+
 // TestStoreRangeMergeWatcher verifies that the watcher goroutine for a merge's
 // RHS does not erroneously permit traffic after the merge commits.
 func TestStoreRangeMergeWatcher(t *testing.T) {
diff --git a/pkg/storage/client_raft_test.go b/pkg/storage/client_raft_test.go
index d4fc57ea7f7c..41408a84534e 100644
--- a/pkg/storage/client_raft_test.go
+++ b/pkg/storage/client_raft_test.go
@@ -4158,3 +4158,77 @@ func TestStoreRangeWaitForApplication(t *testing.T) {
 		}
 	}
 }
+
+func TestStoreWaitForReplicaInit(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	ctx := context.Background()
+	sc := storage.TestStoreConfig(nil)
+	mtc := &multiTestContext{storeConfig: &sc}
+	mtc.Start(t, 1)
+	defer mtc.Stop()
+	store := mtc.Store(0)
+
+	conn, err := mtc.nodeDialer.Dial(ctx, store.Ident.NodeID)
+	if err != nil {
+		t.Fatal(err)
+	}
+	client := storage.NewPerReplicaClient(conn)
+	storeHeader := storage.StoreRequestHeader{NodeID: store.Ident.NodeID, StoreID: store.Ident.StoreID}
+
+	// Test that WaitForReplicaInit returns successfully if the replica exists.
+	_, err = client.WaitForReplicaInit(ctx, &storage.WaitForReplicaInitRequest{
+		StoreRequestHeader: storeHeader,
+		RangeID:            roachpb.RangeID(1),
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// Test that WaitForReplicaInit times out if the replica does not exist.
+	{
+		timeoutCtx, cancel := context.WithTimeout(ctx, 50*time.Millisecond)
+		defer cancel()
+		_, err = client.WaitForReplicaInit(timeoutCtx, &storage.WaitForReplicaInitRequest{
+			StoreRequestHeader: storeHeader,
+			RangeID:            roachpb.RangeID(2),
+		})
+		if exp := "context deadline exceeded"; !testutils.IsError(err, exp) {
+			t.Fatalf("expected %q error, but got %v", exp, err)
+		}
+	}
+
+	// Test that WaitForReplicaInit times out if the replica exists but is not
+	// initialized.
+	{
+		// Constructing a permanently-uninitialized replica is somewhat difficult.
+		// Sending a fake Raft heartbeat for a range ID that the store hasn't seen
+		// before does the trick.
+		var repl42 *storage.Replica
+		testutils.SucceedsSoon(t, func() (err error) {
+			// Try several times, as the message may be dropped (see #18355).
+			mtc.transport.SendAsync(&storage.RaftMessageRequest{
+				ToReplica: roachpb.ReplicaDescriptor{
+					NodeID:  store.Ident.NodeID,
+					StoreID: store.Ident.StoreID,
+				},
+				Heartbeats: []storage.RaftHeartbeat{{RangeID: 42, ToReplicaID: 1}},
+			})
+			repl42, err = store.GetReplica(42)
+			return err
+		})
+		if repl42.IsInitialized() {
+			t.Fatalf("test bug: repl42 is initialized")
+		}
+
+		timeoutCtx, cancel := context.WithTimeout(ctx, 50*time.Millisecond)
+		defer cancel()
+		_, err = client.WaitForReplicaInit(timeoutCtx, &storage.WaitForReplicaInitRequest{
+			StoreRequestHeader: storeHeader,
+			RangeID:            roachpb.RangeID(42),
+		})
+		if exp := "context deadline exceeded"; !testutils.IsError(err, exp) {
+			t.Fatalf("expected %q error, but got %v", exp, err)
+		}
+	}
+}
diff --git a/pkg/storage/replica_command.go b/pkg/storage/replica_command.go
index 9809313a62d9..04e67aa0484b 100644
--- a/pkg/storage/replica_command.go
+++ b/pkg/storage/replica_command.go
@@ -22,8 +22,6 @@ import (
 	"strings"
 	"time"
 
-	"github.com/cockroachdb/cockroach/pkg/storage/storagepb"
-
 	"github.com/pkg/errors"
 	"go.etcd.io/etcd/raft/raftpb"
 
@@ -38,6 +36,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/storage/engine"
 	"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
 	"github.com/cockroachdb/cockroach/pkg/storage/storagebase"
+	"github.com/cockroachdb/cockroach/pkg/storage/storagepb"
 	"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/protoutil"
@@ -411,6 +410,15 @@ func (r *Replica) AdminMerge(
 		return reply, roachpb.NewErrorf("cannot merge final range")
 	}
 
+	// Ensure that every current replica of the LHS has been initialized.
+	// Otherwise there is a rare race where the replica GC queue can GC a
+	// replica of the RHS too early. The comment on
+	// TestStoreRangeMergeUninitializedLHSFollower explains the situation in full.
+ if err := waitForReplicasInit(ctx, r.store.cfg.NodeDialer, *origLeftDesc); err != nil { + return reply, roachpb.NewError(errors.Wrap( + err, "waiting for all left-hand replicas to initialize")) + } + updatedLeftDesc := *origLeftDesc rightDescKey := keys.RangeDescriptorKey(origLeftDesc.EndKey) @@ -592,7 +600,7 @@ func waitForApplication( g := ctxgroup.WithContext(ctx) for _, repl := range desc.Replicas { - repl := repl + repl := repl // copy for goroutine g.GoCtx(func(ctx context.Context) error { conn, err := dialer.Dial(ctx, repl.NodeID) if err != nil { @@ -609,6 +617,35 @@ func waitForApplication( return g.Wait() } +// waitForReplicasInit blocks until it has proof that the replicas listed in +// desc are initialized on their respective stores. It may return a false +// negative, i.e., claim that a replica is uninitialized when it is, in fact, +// initialized, but it will never return a false positive. +func waitForReplicasInit( + ctx context.Context, dialer *nodedialer.Dialer, desc roachpb.RangeDescriptor, +) error { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, 5*time.Second) + defer cancel() + + g := ctxgroup.WithContext(ctx) + for _, repl := range desc.Replicas { + repl := repl // copy for goroutine + g.GoCtx(func(ctx context.Context) error { + conn, err := dialer.Dial(ctx, repl.NodeID) + if err != nil { + return errors.Wrapf(err, "could not dial n%d", repl.NodeID) + } + _, err = NewPerReplicaClient(conn).WaitForReplicaInit(ctx, &WaitForReplicaInitRequest{ + StoreRequestHeader: StoreRequestHeader{NodeID: repl.NodeID, StoreID: repl.StoreID}, + RangeID: desc.RangeID, + }) + return err + }) + } + return g.Wait() +} + type snapshotError struct { cause error } diff --git a/pkg/storage/stores_server.go b/pkg/storage/stores_server.go index 2c90da1d4869..275f057fa1b1 100644 --- a/pkg/storage/stores_server.go +++ b/pkg/storage/stores_server.go @@ -80,6 +80,9 @@ func (is Server) CollectChecksum( } // WaitForApplication implements PerReplicaServer. +// +// It is the caller's responsibility to cancel or set a timeout on the context. +// If the context is never canceled, WaitForApplication will retry forever. func (is Server) WaitForApplication( ctx context.Context, req *WaitForApplicationRequest, ) (*WaitForApplicationResponse, error) { @@ -109,3 +112,28 @@ func (is Server) WaitForApplication( }) return resp, err } + +// WaitForReplicaInit implements PerReplicaServer. +// +// It is the caller's responsibility to cancel or set a timeout on the context. +// If the context is never canceled, WaitForReplicaInit will retry forever. +func (is Server) WaitForReplicaInit( + ctx context.Context, req *WaitForReplicaInitRequest, +) (*WaitForReplicaInitResponse, error) { + resp := &WaitForReplicaInitResponse{} + err := is.execStoreCommand(req.StoreRequestHeader, func(s *Store) error { + retryOpts := retry.Options{InitialBackoff: 10 * time.Millisecond} + for r := retry.StartWithCtx(ctx, retryOpts); r.Next(); { + // Long-lived references to replicas are frowned upon, so re-fetch the + // replica on every turn of the loop. + if repl, err := s.GetReplica(req.RangeID); err == nil && repl.IsInitialized() { + return nil + } + } + if ctx.Err() == nil { + log.Fatal(ctx, "infinite retry loop exited but context has no error") + } + return ctx.Err() + }) + return resp, err +}
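Usage note (not part of the patch): because the server-side WaitForReplicaInit retries until the replica initializes, any caller must bound the RPC with a context deadline, as waitForReplicasInit does with its 5-second timeout. The sketch below illustrates that calling pattern from outside the storage package; the surrounding function, and the `dialer`, `repl`, and `rangeID` variables, are assumed to be in scope and are illustrative only.

	// Sketch: bound the wait so an unresponsive or missing replica cannot block forever.
	ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
	defer cancel()

	conn, err := dialer.Dial(ctx, repl.NodeID)
	if err != nil {
		return errors.Wrapf(err, "could not dial n%d", repl.NodeID)
	}
	_, err = storage.NewPerReplicaClient(conn).WaitForReplicaInit(ctx, &storage.WaitForReplicaInitRequest{
		StoreRequestHeader: storage.StoreRequestHeader{NodeID: repl.NodeID, StoreID: repl.StoreID},
		RangeID:            rangeID,
	})
	// err is nil once the replica is initialized; otherwise the context
	// deadline surfaces as the error.
	return err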