From 1ed1aacac1b82cf30ee75a8e31c614add4cf5cf4 Mon Sep 17 00:00:00 2001 From: MyonKeminta <9948422+MyonKeminta@users.noreply.github.com> Date: Wed, 11 Nov 2020 11:49:44 +0800 Subject: [PATCH] server: Make gc_worker's service safepoint special (#3146) * server: Make gc_worker's service safepoint special Signed-off-by: MyonKeminta * Address comments Signed-off-by: MyonKeminta Co-authored-by: MyonKeminta Signed-off-by: MyonKeminta --- server/core/storage.go | 43 +++++++++++++++++++++++++++++-------- server/grpc_service.go | 4 ---- tests/client/client_test.go | 39 ++++++++++++++++++++++++++++++--- 3 files changed, 70 insertions(+), 16 deletions(-) diff --git a/server/core/storage.go b/server/core/storage.go index 45ad270aaf4..24df5c49928 100644 --- a/server/core/storage.go +++ b/server/core/storage.go @@ -33,14 +33,15 @@ import ( ) const ( - clusterPath = "raft" - configPath = "config" - schedulePath = "schedule" - gcPath = "gc" - rulesPath = "rules" - replicationPath = "replication_mode" - componentPath = "component" - customScheduleConfigPath = "scheduler_config" + clusterPath = "raft" + configPath = "config" + schedulePath = "schedule" + gcPath = "gc" + rulesPath = "rules" + replicationPath = "replication_mode" + componentPath = "component" + customScheduleConfigPath = "scheduler_config" + gcWorkerServiceSafePointID = "gc_worker" ) const ( @@ -418,6 +419,14 @@ type ServiceSafePoint struct { // SaveServiceGCSafePoint saves a GC safepoint for the service func (s *Storage) SaveServiceGCSafePoint(ssp *ServiceSafePoint) error { + if ssp.ServiceID == "" { + return errors.New("service id of service safepoint cannot be empty") + } + + if ssp.ServiceID == gcWorkerServiceSafePointID && ssp.ExpiredAt != math.MaxInt64 { + return errors.New("TTL of gc_worker's service safe point must be infinity") + } + key := path.Join(gcPath, "safe_point", "service", ssp.ServiceID) value, err := json.Marshal(ssp) if err != nil { @@ -429,10 +438,25 @@ func (s *Storage) SaveServiceGCSafePoint(ssp *ServiceSafePoint) error { // RemoveServiceGCSafePoint removes a GC safepoint for the service func (s *Storage) RemoveServiceGCSafePoint(serviceID string) error { + if serviceID == gcWorkerServiceSafePointID { + return errors.New("cannot remove service safe point of gc_worker") + } key := path.Join(gcPath, "safe_point", "service", serviceID) return s.Remove(key) } +func (s *Storage) initServiceGCSafePointForGCWorker() (*ServiceSafePoint, error) { + ssp := &ServiceSafePoint{ + ServiceID: gcWorkerServiceSafePointID, + SafePoint: 0, + ExpiredAt: math.MaxInt64, + } + if err := s.SaveServiceGCSafePoint(ssp); err != nil { + return nil, err + } + return ssp, nil +} + // LoadMinServiceGCSafePoint returns the minimum safepoint across all services func (s *Storage) LoadMinServiceGCSafePoint(now time.Time) (*ServiceSafePoint, error) { prefix := path.Join(gcPath, "safe_point", "service") @@ -443,7 +467,8 @@ func (s *Storage) LoadMinServiceGCSafePoint(now time.Time) (*ServiceSafePoint, e return nil, err } if len(keys) == 0 { - return &ServiceSafePoint{}, nil + // There's no service safepoint. Store an initial value for GC worker. + return s.initServiceGCSafePointForGCWorker() } min := &ServiceSafePoint{SafePoint: math.MaxUint64} diff --git a/server/grpc_service.go b/server/grpc_service.go index 1eff43c4b55..25fdbfeb05a 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -825,10 +825,6 @@ func (s *Server) UpdateServiceGCSafePoint(ctx context.Context, request *pdpb.Upd return nil, err } } - // If ssp is the first safepoint, it is the min value now - if min.SafePoint == 0 { - min = ssp - } } return &pdpb.UpdateServiceGCSafePointResponse{ diff --git a/tests/client/client_test.go b/tests/client/client_test.go index 556f36ceb95..65eb785156c 100644 --- a/tests/client/client_test.go +++ b/tests/client/client_test.go @@ -634,17 +634,24 @@ func (s *testClientSuite) TestUpdateServiceGCSafePoint(c *C) { TTL int64 SafePoint uint64 }{ - {"a", 1000, 1}, {"b", 1000, 2}, + {"a", 1000, 1}, {"c", 1000, 3}, } for _, ssp := range serviceSafePoints { min, err := s.client.UpdateServiceGCSafePoint(context.Background(), ssp.ServiceID, 1000, ssp.SafePoint) c.Assert(err, IsNil) - c.Assert(min, Equals, uint64(1)) + // An service safepoint of ID "gc_worker" is automatically initialized as 0 + c.Assert(min, Equals, uint64(0)) } + min, err := s.client.UpdateServiceGCSafePoint(context.Background(), + "gc_worker", math.MaxInt64, 10) + c.Assert(err, IsNil) + c.Assert(min, Equals, uint64(1)) + + min, err = s.client.UpdateServiceGCSafePoint(context.Background(), "a", 1000, 4) c.Assert(err, IsNil) c.Assert(min, Equals, uint64(2)) @@ -685,7 +692,7 @@ func (s *testClientSuite) TestUpdateServiceGCSafePoint(c *C) { c.Assert(minSsp.ServiceID, Equals, "c") c.Assert(minSsp.ExpiredAt, Less, oldMinSsp.ExpiredAt) - // TTL can be infinite + // TTL can be infinite (represented by math.MaxInt64) min, err = s.client.UpdateServiceGCSafePoint(context.Background(), "c", math.MaxInt64, 3) c.Assert(err, IsNil) @@ -694,6 +701,32 @@ func (s *testClientSuite) TestUpdateServiceGCSafePoint(c *C) { c.Assert(err, IsNil) c.Assert(minSsp.ServiceID, Equals, "c") c.Assert(minSsp.ExpiredAt, Equals, int64(math.MaxInt64)) + + // Delete "a" and "c" + min, err = s.client.UpdateServiceGCSafePoint(context.Background(), + "c", -1, 3) + c.Assert(err, IsNil) + c.Assert(min, Equals, uint64(4)) + min, err = s.client.UpdateServiceGCSafePoint(context.Background(), + "a", -1, 4) + c.Assert(err, IsNil) + // Now gc_worker is the only remaining service safe point. + c.Assert(min, Equals, uint64(10)) + + // gc_worker cannot be deleted. + _, err = s.client.UpdateServiceGCSafePoint(context.Background(), + "gc_worker", -1, 10) + c.Assert(err, NotNil) + + // Cannot set non-infinity TTL for gc_worker + _, err = s.client.UpdateServiceGCSafePoint(context.Background(), + "gc_worker", 10000000, 10) + c.Assert(err, NotNil) + + // Service safepoint must have a non-empty ID + _, err = s.client.UpdateServiceGCSafePoint(context.Background(), + "", 1000, 15) + c.Assert(err, NotNil) } func (s *testClientSuite) TestScatterRegion(c *C) {