Skip to content

Commit

Permalink
use default retention period to check user index may have expired chu…
Browse files Browse the repository at this point in the history
…nks when user does not have custom retention (#5261)
  • Loading branch information
sandeepsukhani authored Jan 28, 2022
1 parent cde9a71 commit 37fcbf9
Show file tree
Hide file tree
Showing 2 changed files with 157 additions and 60 deletions.
56 changes: 40 additions & 16 deletions pkg/storage/stores/shipper/compactor/retention/expiration.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,8 @@ type ExpirationChecker interface {
}

// expirationChecker answers retention questions for the compactor, such as whether
// an index interval may contain expired chunks (see IntervalMayHaveExpiredChunks).
type expirationChecker struct {
	// tenantsRetention gives access to the limits from which the retention start
	// times are derived in MarkPhaseStarted.
	tenantsRetention *TenantsRetention
	// latestRetentionStartTime holds the retention start times (overall, defaults
	// and per user) computed by the most recent call to MarkPhaseStarted.
	latestRetentionStartTime latestRetentionStartTime
}

type Limits interface {
Expand Down Expand Up @@ -57,19 +56,26 @@ func (e *expirationChecker) DropFromIndex(ref ChunkEntry, tableEndTime model.Tim
}

// MarkPhaseStarted recomputes the latest retention start times from the current
// limits, using the current time as reference, and stores them on the checker so
// that IntervalMayHaveExpiredChunks can consult them.
func (e *expirationChecker) MarkPhaseStarted() {
	e.latestRetentionStartTime = findLatestRetentionStartTime(model.Now(), e.tenantsRetention.limits)
	// NOTE: the logged values are timestamps (now minus the smallest retention period),
	// even though the message wording says "retention period".
	level.Info(util_log.Logger).Log("msg", fmt.Sprintf("overall smallest retention period %v, default smallest retention period %v",
		e.latestRetentionStartTime.overall, e.latestRetentionStartTime.defaults))
}

// MarkPhaseFailed is a no-op for expirationChecker.
// NOTE(review): presumably present only to satisfy the ExpirationChecker interface — confirm.
func (e *expirationChecker) MarkPhaseFailed() {}
// MarkPhaseFinished is a no-op for expirationChecker.
// NOTE(review): presumably present only to satisfy the ExpirationChecker interface — confirm.
func (e *expirationChecker) MarkPhaseFinished() {}

func (e *expirationChecker) IntervalMayHaveExpiredChunks(interval model.Interval, userID string) bool {
latestRetentionStartTime := e.latestRetentionStartTime
// when userID is empty, it means we are checking for common index table. In this case we use e.latestRetentionStartTime.overall.
latestRetentionStartTime := e.latestRetentionStartTime.overall
if userID != "" {
latestRetentionStartTimeForUser, ok := e.latestRetentionStartTimeByUser[userID]
// when userID is not empty, it means we are checking for user index table.
latestRetentionStartTimeForUser, ok := e.latestRetentionStartTime.byUser[userID]
if ok {
// user has custom retention config, let us use user specific latest retention start time.
latestRetentionStartTime = latestRetentionStartTimeForUser
} else {
// user does not have custom retention config, let us use default latest retention start time.
latestRetentionStartTime = e.latestRetentionStartTime.defaults
}
}
return interval.Start.Before(latestRetentionStartTime)
Expand Down Expand Up @@ -119,17 +125,31 @@ Outer:
return globalRetention
}

// findLatestRetentionStartTime returns the latest retention start time overall and by each user.
func findLatestRetentionStartTime(now model.Time, limits Limits) (model.Time, map[string]model.Time) {
// latestRetentionStartTime groups the retention start times returned by
// findLatestRetentionStartTime. Each value is a timestamp computed as the reference
// time minus a smallest retention period; an interval starting before the relevant
// timestamp is treated as possibly containing expired chunks.
type latestRetentionStartTime struct {
// defaults holds latest retention start time considering only default retention config.
// It is used to determine if user index table may have any expired chunks when the user does not have any custom retention config set.
defaults model.Time
// overall holds latest retention start time for all users considering both default and per user retention config.
// It is used to determine if common index table may have any expired chunks.
overall model.Time
// byUser holds latest retention start time considering only per user retention config.
// It is used to determine if user index table may have any expired chunks.
// It is keyed by user ID; when a user has no entry, callers fall back to defaults.
byUser map[string]model.Time
}

// findLatestRetentionStartTime returns the latest retention start time overall, for just the default config, and per user.
func findLatestRetentionStartTime(now model.Time, limits Limits) latestRetentionStartTime {
// find the smallest retention period from default limits
defaultLimits := limits.DefaultLimits()
smallestRetentionPeriod := defaultLimits.RetentionPeriod
smallestDefaultRetentionPeriod := defaultLimits.RetentionPeriod
for _, streamRetention := range defaultLimits.StreamRetention {
if streamRetention.Period < smallestRetentionPeriod {
smallestRetentionPeriod = streamRetention.Period
if streamRetention.Period < smallestDefaultRetentionPeriod {
smallestDefaultRetentionPeriod = streamRetention.Period
}
}

overallSmallestRetentionPeriod := smallestDefaultRetentionPeriod

// find the smallest retention period by user
limitsByUserID := limits.AllByUserID()
smallestRetentionPeriodByUser := make(map[string]model.Time, len(limitsByUserID))
Expand All @@ -141,12 +161,16 @@ func findLatestRetentionStartTime(now model.Time, limits Limits) (model.Time, ma
}
}

// update the common smallestRetentionPeriod if this user has smaller value
// update the overallSmallestRetentionPeriod if this user has smaller value
smallestRetentionPeriodByUser[userID] = now.Add(time.Duration(-smallestRetentionPeriodForUser))
if smallestRetentionPeriodForUser < smallestRetentionPeriod {
smallestRetentionPeriod = smallestRetentionPeriodForUser
if smallestRetentionPeriodForUser < overallSmallestRetentionPeriod {
overallSmallestRetentionPeriod = smallestRetentionPeriodForUser
}
}

return now.Add(time.Duration(-smallestRetentionPeriod)), smallestRetentionPeriodByUser
return latestRetentionStartTime{
defaults: now.Add(time.Duration(-smallestDefaultRetentionPeriod)),
overall: now.Add(time.Duration(-overallSmallestRetentionPeriod)),
byUser: smallestRetentionPeriodByUser,
}
}
161 changes: 117 additions & 44 deletions pkg/storage/stores/shipper/compactor/retention/expiration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,9 @@ func TestFindLatestRetentionStartTime(t *testing.T) {
const dayDuration = 24 * time.Hour
now := model.Now()
for _, tc := range []struct {
name string
limit fakeLimits
expectedLatestRetentionStartTime model.Time
expectedLatestRetentionStartTimeByUser map[string]model.Time
name string
limit fakeLimits
expectedLatestRetentionStartTime latestRetentionStartTime
}{
{
name: "only default retention set",
Expand All @@ -104,8 +103,11 @@ func TestFindLatestRetentionStartTime(t *testing.T) {
retentionPeriod: 7 * dayDuration,
},
},
expectedLatestRetentionStartTime: now.Add(-7 * dayDuration),
expectedLatestRetentionStartTimeByUser: map[string]model.Time{},
expectedLatestRetentionStartTime: latestRetentionStartTime{
overall: now.Add(-7 * dayDuration),
defaults: now.Add(-7 * dayDuration),
byUser: map[string]model.Time{},
},
},
{
name: "default retention period smallest",
Expand All @@ -123,10 +125,13 @@ func TestFindLatestRetentionStartTime(t *testing.T) {
"1": {retentionPeriod: 15 * dayDuration},
},
},
expectedLatestRetentionStartTime: now.Add(-7 * dayDuration),
expectedLatestRetentionStartTimeByUser: map[string]model.Time{
"0": now.Add(-12 * dayDuration),
"1": now.Add(-15 * dayDuration),
expectedLatestRetentionStartTime: latestRetentionStartTime{
overall: now.Add(-7 * dayDuration),
defaults: now.Add(-7 * dayDuration),
byUser: map[string]model.Time{
"0": now.Add(-12 * dayDuration),
"1": now.Add(-15 * dayDuration),
},
},
},
{
Expand All @@ -145,10 +150,13 @@ func TestFindLatestRetentionStartTime(t *testing.T) {
"1": {retentionPeriod: 5 * dayDuration},
},
},
expectedLatestRetentionStartTime: now.Add(-3 * dayDuration),
expectedLatestRetentionStartTimeByUser: map[string]model.Time{
"0": now.Add(-7 * dayDuration),
"1": now.Add(-5 * dayDuration),
expectedLatestRetentionStartTime: latestRetentionStartTime{
overall: now.Add(-3 * dayDuration),
defaults: now.Add(-3 * dayDuration),
byUser: map[string]model.Time{
"0": now.Add(-7 * dayDuration),
"1": now.Add(-5 * dayDuration),
},
},
},
{
Expand Down Expand Up @@ -181,10 +189,13 @@ func TestFindLatestRetentionStartTime(t *testing.T) {
},
},
},
expectedLatestRetentionStartTime: now.Add(-5 * dayDuration),
expectedLatestRetentionStartTimeByUser: map[string]model.Time{
"0": now.Add(-10 * dayDuration),
"1": now.Add(-5 * dayDuration),
expectedLatestRetentionStartTime: latestRetentionStartTime{
overall: now.Add(-5 * dayDuration),
defaults: now.Add(-7 * dayDuration),
byUser: map[string]model.Time{
"0": now.Add(-10 * dayDuration),
"1": now.Add(-5 * dayDuration),
},
},
},
{
Expand Down Expand Up @@ -217,63 +228,125 @@ func TestFindLatestRetentionStartTime(t *testing.T) {
},
},
},
expectedLatestRetentionStartTime: now.Add(-2 * dayDuration),
expectedLatestRetentionStartTimeByUser: map[string]model.Time{
"0": now.Add(-10 * dayDuration),
"1": now.Add(-2 * dayDuration),
expectedLatestRetentionStartTime: latestRetentionStartTime{
overall: now.Add(-2 * dayDuration),
defaults: now.Add(-7 * dayDuration),
byUser: map[string]model.Time{
"0": now.Add(-10 * dayDuration),
"1": now.Add(-2 * dayDuration),
},
},
},
} {
t.Run(tc.name, func(t *testing.T) {
latestRetentionStartTime, latestRetentionStartTimeByUser := findLatestRetentionStartTime(now, tc.limit)
latestRetentionStartTime := findLatestRetentionStartTime(now, tc.limit)
require.Equal(t, tc.expectedLatestRetentionStartTime, latestRetentionStartTime)
require.Equal(t, tc.expectedLatestRetentionStartTimeByUser, latestRetentionStartTimeByUser)
})
}
}

func TestExpirationChecker_IntervalMayHaveExpiredChunks(t *testing.T) {
now := model.Now()
expirationChecker := expirationChecker{
latestRetentionStartTime: latestRetentionStartTime{
overall: now.Add(-24 * time.Hour),
defaults: now.Add(-48 * time.Hour),
byUser: map[string]model.Time{
"user0": now.Add(-72 * time.Hour),
"user1": now.Add(-24 * time.Hour),
},
},
}

for _, tc := range []struct {
name string
expirationChecker expirationChecker
interval model.Interval
hasExpiredChunks bool
name string
userID string
interval model.Interval
hasExpiredChunks bool
}{
// common index using overallLatestRetentionStartTime
{
name: "not expired",
expirationChecker: expirationChecker{
latestRetentionStartTime: model.Now().Add(-24 * time.Hour),
name: "common index - not expired",
interval: model.Interval{
Start: now.Add(-23 * time.Hour),
End: now,
},
},
{
name: "common index - partially expired",
interval: model.Interval{
Start: model.Now().Add(-time.Hour),
End: model.Now(),
Start: now.Add(-25 * time.Hour),
End: now.Add(-22 * time.Hour),
},
hasExpiredChunks: true,
},
{
name: "partially expired",
expirationChecker: expirationChecker{
latestRetentionStartTime: model.Now().Add(-24 * time.Hour),
name: "common index - fully expired",
interval: model.Interval{
Start: now.Add(-26 * time.Hour),
End: now.Add(-25 * time.Hour),
},
hasExpiredChunks: true,
},

// user0 having custom retention
{
name: "user0 index - not expired",
userID: "user0",
interval: model.Interval{
Start: model.Now().Add(-25 * time.Hour),
End: model.Now().Add(-22 * time.Hour),
Start: now.Add(-71 + time.Hour),
End: now,
},
},
{
name: "user0 index - partially expired",
userID: "user0",
interval: model.Interval{
Start: now.Add(-73 * time.Hour),
End: now.Add(-71 * time.Hour),
},
hasExpiredChunks: true,
},
{
name: "user0 index - fully expired",
userID: "user0",
interval: model.Interval{
Start: now.Add(-74 * time.Hour),
End: now.Add(-73 * time.Hour),
},
hasExpiredChunks: true,
},

// user3 not having custom retention so using defaultLatestRetentionStartTime
{
name: "fully expired",
expirationChecker: expirationChecker{
latestRetentionStartTime: model.Now().Add(-24 * time.Hour),
name: "user3 index - not expired",
userID: "user3",
interval: model.Interval{
Start: now.Add(-47 * time.Hour),
End: now,
},
},
{
name: "user3 index - partially expired",
userID: "user3",
interval: model.Interval{
Start: now.Add(-49 * time.Hour),
End: now.Add(-47 * time.Hour),
},
hasExpiredChunks: true,
},
{
name: "user3 index - fully expired",
userID: "user3",
interval: model.Interval{
Start: model.Now().Add(-26 * time.Hour),
End: model.Now().Add(-25 * time.Hour),
Start: now.Add(-50 * time.Hour),
End: now.Add(-49 * time.Hour),
},
hasExpiredChunks: true,
},
} {
t.Run(tc.name, func(t *testing.T) {
require.Equal(t, tc.hasExpiredChunks, tc.expirationChecker.IntervalMayHaveExpiredChunks(tc.interval, ""))
require.Equal(t, tc.hasExpiredChunks, expirationChecker.IntervalMayHaveExpiredChunks(tc.interval, tc.userID))
})
}
}

0 comments on commit 37fcbf9

Please sign in to comment.