-
Notifications
You must be signed in to change notification settings - Fork 9.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
*: support auto-compaction with finer granularity #8563
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -26,10 +26,10 @@ import ( | |
) | ||
|
||
// Periodic compacts the log by purging revisions older than | ||
// the configured retention time. Compaction happens hourly. | ||
// the configured retention time. | ||
type Periodic struct { | ||
clock clockwork.Clock | ||
periodInHour int | ||
clock clockwork.Clock | ||
period time.Duration | ||
|
||
rg RevGetter | ||
c Compactable | ||
|
@@ -38,60 +38,60 @@ type Periodic struct { | |
ctx context.Context | ||
cancel context.CancelFunc | ||
|
||
mu sync.Mutex | ||
// mu protects paused | ||
mu sync.RWMutex | ||
paused bool | ||
} | ||
|
||
// NewPeriodic creates a new instance of Periodic compactor that purges | ||
// the log older than h hours. | ||
func NewPeriodic(h int, rg RevGetter, c Compactable) *Periodic { | ||
// the log older than h Duration. | ||
func NewPeriodic(h time.Duration, rg RevGetter, c Compactable) *Periodic { | ||
return &Periodic{ | ||
clock: clockwork.NewRealClock(), | ||
periodInHour: h, | ||
rg: rg, | ||
c: c, | ||
clock: clockwork.NewRealClock(), | ||
period: h, | ||
rg: rg, | ||
c: c, | ||
} | ||
} | ||
|
||
// periodDivisor divides Periodic.period in into checkCompactInterval duration | ||
const periodDivisor = 10 | ||
|
||
func (t *Periodic) Run() { | ||
t.ctx, t.cancel = context.WithCancel(context.Background()) | ||
t.revs = make([]int64, 0) | ||
clock := t.clock | ||
|
||
checkCompactInterval := t.period / time.Duration(periodDivisor) | ||
go func() { | ||
last := clock.Now() | ||
for { | ||
t.revs = append(t.revs, t.rg.Rev()) | ||
select { | ||
case <-t.ctx.Done(): | ||
return | ||
case <-clock.After(checkCompactionInterval): | ||
case <-clock.After(checkCompactInterval): | ||
t.mu.Lock() | ||
p := t.paused | ||
t.mu.Unlock() | ||
if p { | ||
continue | ||
} | ||
} | ||
|
||
if clock.Now().Sub(last) < executeCompactionInterval { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. While we're improving this function. Can move the remaining code in the for loop into the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can we refactor the code in a future pr? |
||
if clock.Now().Sub(last) < t.period { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we simplify the timer logic a bit here? I found the logic a bit difficult to follow with the Could we instead do something like
and then eliminate this (let me know if I'm missing an important subtlety to how the timers are being use here...) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @jpbetz I am trying to minimize any change. we can refactor the code in a future pr if you want. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sounds good. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sounds good. |
||
continue | ||
} | ||
|
||
rev, remaining := t.getRev(t.periodInHour) | ||
rev, remaining := t.getRev() | ||
if rev < 0 { | ||
continue | ||
} | ||
|
||
plog.Noticef("Starting auto-compaction at revision %d (retention: %d hours)", rev, t.periodInHour) | ||
plog.Noticef("Starting auto-compaction at revision %d (retention: %v)", rev, t.period) | ||
_, err := t.c.Compact(t.ctx, &pb.CompactionRequest{Revision: rev}) | ||
if err == nil || err == mvcc.ErrCompacted { | ||
t.revs = remaining | ||
last = clock.Now() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why we delete this line? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. good a catch. I don't think this is meant to be deleted. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yeah, it actually causes a bug as #9443 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Actually, I dig into on why
I think that's cause. |
||
plog.Noticef("Finished auto-compaction at revision %d", rev) | ||
} else { | ||
plog.Noticef("Failed auto-compaction at revision %d (%v)", err, rev) | ||
plog.Noticef("Retry after %v", checkCompactionInterval) | ||
plog.Noticef("Failed auto-compaction at revision %d (%v)", rev, err) | ||
plog.Noticef("Retry after %v", checkCompactInterval) | ||
} | ||
} | ||
}() | ||
|
@@ -113,8 +113,8 @@ func (t *Periodic) Resume() { | |
t.paused = false | ||
} | ||
|
||
func (t *Periodic) getRev(h int) (int64, []int64) { | ||
i := len(t.revs) - int(time.Duration(h)*time.Hour/checkCompactionInterval) | ||
func (t *Periodic) getRev() (int64, []int64) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. pass in n? probably we do not need the global defined const There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ah. ok. makes sense. |
||
i := len(t.revs) - periodDivisor | ||
if i < 0 { | ||
return -1, t.revs | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,6 +23,7 @@ import ( | |
"net" | ||
"net/http" | ||
"net/url" | ||
"strconv" | ||
"sync" | ||
"time" | ||
|
||
|
@@ -127,6 +128,24 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) { | |
} | ||
} | ||
|
||
var ( | ||
autoCompactionRetention time.Duration | ||
h int | ||
) | ||
// AutoCompactionRetention defaults to "0" if not set. | ||
if len(cfg.AutoCompactionRetention) == 0 { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. added a check see if AutoCompactionRetention is not set. |
||
cfg.AutoCompactionRetention = "0" | ||
} | ||
h, err = strconv.Atoi(cfg.AutoCompactionRetention) | ||
if err == nil { | ||
autoCompactionRetention = time.Duration(int64(h)) * time.Hour | ||
} else { | ||
autoCompactionRetention, err = time.ParseDuration(cfg.AutoCompactionRetention) | ||
if err != nil { | ||
return nil, fmt.Errorf("error parsing AutoCompactionRetention: %v", err) | ||
} | ||
} | ||
|
||
srvcfg := etcdserver.ServerConfig{ | ||
Name: cfg.Name, | ||
ClientURLs: cfg.ACUrls, | ||
|
@@ -145,7 +164,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) { | |
PeerTLSInfo: cfg.PeerTLSInfo, | ||
TickMs: cfg.TickMs, | ||
ElectionTicks: cfg.ElectionTicks(), | ||
AutoCompactionRetention: cfg.AutoCompactionRetention, | ||
AutoCompactionRetention: autoCompactionRetention, | ||
AutoCompactionMode: cfg.AutoCompactionMode, | ||
QuotaBackendBytes: cfg.QuotaBackendBytes, | ||
MaxTxnOps: cfg.MaxTxnOps, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this const can also be removed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
checkCompactionInterval
is usedRevision
compactor.https://github.com/coreos/etcd/blob/master/compactor/revision.go#L64
I don't think it should be removed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ack