Skip to content

Commit

Permalink
server: Make gc_worker's service safepoint special (tikv#3146)
Browse files Browse the repository at this point in the history
* server: Make gc_worker's service safepoint special

Signed-off-by: MyonKeminta <[email protected]>

* Address comments

Signed-off-by: MyonKeminta <[email protected]>

Co-authored-by: MyonKeminta <[email protected]>
Signed-off-by: MyonKeminta <[email protected]>
  • Loading branch information
MyonKeminta and MyonKeminta committed Nov 12, 2020
1 parent 465b03c commit 1ed1aac
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 16 deletions.
43 changes: 34 additions & 9 deletions server/core/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,15 @@ import (
)

const (
clusterPath = "raft"
configPath = "config"
schedulePath = "schedule"
gcPath = "gc"
rulesPath = "rules"
replicationPath = "replication_mode"
componentPath = "component"
customScheduleConfigPath = "scheduler_config"
clusterPath = "raft"
configPath = "config"
schedulePath = "schedule"
gcPath = "gc"
rulesPath = "rules"
replicationPath = "replication_mode"
componentPath = "component"
customScheduleConfigPath = "scheduler_config"
gcWorkerServiceSafePointID = "gc_worker"
)

const (
Expand Down Expand Up @@ -418,6 +419,14 @@ type ServiceSafePoint struct {

// SaveServiceGCSafePoint saves a GC safepoint for the service
func (s *Storage) SaveServiceGCSafePoint(ssp *ServiceSafePoint) error {
if ssp.ServiceID == "" {
return errors.New("service id of service safepoint cannot be empty")
}

if ssp.ServiceID == gcWorkerServiceSafePointID && ssp.ExpiredAt != math.MaxInt64 {
return errors.New("TTL of gc_worker's service safe point must be infinity")
}

key := path.Join(gcPath, "safe_point", "service", ssp.ServiceID)
value, err := json.Marshal(ssp)
if err != nil {
Expand All @@ -429,10 +438,25 @@ func (s *Storage) SaveServiceGCSafePoint(ssp *ServiceSafePoint) error {

// RemoveServiceGCSafePoint removes a GC safepoint for the service
func (s *Storage) RemoveServiceGCSafePoint(serviceID string) error {
if serviceID == gcWorkerServiceSafePointID {
return errors.New("cannot remove service safe point of gc_worker")
}
key := path.Join(gcPath, "safe_point", "service", serviceID)
return s.Remove(key)
}

func (s *Storage) initServiceGCSafePointForGCWorker() (*ServiceSafePoint, error) {
ssp := &ServiceSafePoint{
ServiceID: gcWorkerServiceSafePointID,
SafePoint: 0,
ExpiredAt: math.MaxInt64,
}
if err := s.SaveServiceGCSafePoint(ssp); err != nil {
return nil, err
}
return ssp, nil
}

// LoadMinServiceGCSafePoint returns the minimum safepoint across all services
func (s *Storage) LoadMinServiceGCSafePoint(now time.Time) (*ServiceSafePoint, error) {
prefix := path.Join(gcPath, "safe_point", "service")
Expand All @@ -443,7 +467,8 @@ func (s *Storage) LoadMinServiceGCSafePoint(now time.Time) (*ServiceSafePoint, e
return nil, err
}
if len(keys) == 0 {
return &ServiceSafePoint{}, nil
// There's no service safepoint. Store an initial value for GC worker.
return s.initServiceGCSafePointForGCWorker()
}

min := &ServiceSafePoint{SafePoint: math.MaxUint64}
Expand Down
4 changes: 0 additions & 4 deletions server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -825,10 +825,6 @@ func (s *Server) UpdateServiceGCSafePoint(ctx context.Context, request *pdpb.Upd
return nil, err
}
}
// If ssp is the first safepoint, it is the min value now
if min.SafePoint == 0 {
min = ssp
}
}

return &pdpb.UpdateServiceGCSafePointResponse{
Expand Down
39 changes: 36 additions & 3 deletions tests/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -634,17 +634,24 @@ func (s *testClientSuite) TestUpdateServiceGCSafePoint(c *C) {
TTL int64
SafePoint uint64
}{
{"a", 1000, 1},
{"b", 1000, 2},
{"a", 1000, 1},
{"c", 1000, 3},
}
for _, ssp := range serviceSafePoints {
min, err := s.client.UpdateServiceGCSafePoint(context.Background(),
ssp.ServiceID, 1000, ssp.SafePoint)
c.Assert(err, IsNil)
c.Assert(min, Equals, uint64(1))
// An service safepoint of ID "gc_worker" is automatically initialized as 0
c.Assert(min, Equals, uint64(0))
}

min, err := s.client.UpdateServiceGCSafePoint(context.Background(),
"gc_worker", math.MaxInt64, 10)
c.Assert(err, IsNil)
c.Assert(min, Equals, uint64(1))

min, err = s.client.UpdateServiceGCSafePoint(context.Background(),
"a", 1000, 4)
c.Assert(err, IsNil)
c.Assert(min, Equals, uint64(2))
Expand Down Expand Up @@ -685,7 +692,7 @@ func (s *testClientSuite) TestUpdateServiceGCSafePoint(c *C) {
c.Assert(minSsp.ServiceID, Equals, "c")
c.Assert(minSsp.ExpiredAt, Less, oldMinSsp.ExpiredAt)

// TTL can be infinite
// TTL can be infinite (represented by math.MaxInt64)
min, err = s.client.UpdateServiceGCSafePoint(context.Background(),
"c", math.MaxInt64, 3)
c.Assert(err, IsNil)
Expand All @@ -694,6 +701,32 @@ func (s *testClientSuite) TestUpdateServiceGCSafePoint(c *C) {
c.Assert(err, IsNil)
c.Assert(minSsp.ServiceID, Equals, "c")
c.Assert(minSsp.ExpiredAt, Equals, int64(math.MaxInt64))

// Delete "a" and "c"
min, err = s.client.UpdateServiceGCSafePoint(context.Background(),
"c", -1, 3)
c.Assert(err, IsNil)
c.Assert(min, Equals, uint64(4))
min, err = s.client.UpdateServiceGCSafePoint(context.Background(),
"a", -1, 4)
c.Assert(err, IsNil)
// Now gc_worker is the only remaining service safe point.
c.Assert(min, Equals, uint64(10))

// gc_worker cannot be deleted.
_, err = s.client.UpdateServiceGCSafePoint(context.Background(),
"gc_worker", -1, 10)
c.Assert(err, NotNil)

// Cannot set non-infinity TTL for gc_worker
_, err = s.client.UpdateServiceGCSafePoint(context.Background(),
"gc_worker", 10000000, 10)
c.Assert(err, NotNil)

// Service safepoint must have a non-empty ID
_, err = s.client.UpdateServiceGCSafePoint(context.Background(),
"", 1000, 15)
c.Assert(err, NotNil)
}

func (s *testClientSuite) TestScatterRegion(c *C) {
Expand Down

0 comments on commit 1ed1aac

Please sign in to comment.