Skip to content

Commit

Permalink
Add Revisional compactor
Browse files Browse the repository at this point in the history
  • Loading branch information
Iwasaki Yudai committed Jun 20, 2017
1 parent 3f36e36 commit 6508c57
Show file tree
Hide file tree
Showing 2 changed files with 223 additions and 0 deletions.
106 changes: 106 additions & 0 deletions compactor/revisional.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
// Copyright 2017 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package compactor

import (
"sync"

"github.com/jonboulle/clockwork"
"golang.org/x/net/context"

pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/mvcc"
)

// Revisional compacts the log by purging revisions older than
// the configured reivison number. Compaction happens every 5 minutes.
type Revisional struct {
clock clockwork.Clock
retention int64

rg RevGetter
c Compactable

ctx context.Context
cancel context.CancelFunc

mu sync.Mutex
paused bool
}

// NewRevisional creates a new instance of Revisonal compactor that purges
// the log older than retention revisions from the current revision.
func NewRevisional(retention int64, rg RevGetter, c Compactable) *Revisional {
return &Revisional{
clock: clockwork.NewRealClock(),
retention: retention,
rg: rg,
c: c,
}
}

func (t *Revisional) Run() {
t.ctx, t.cancel = context.WithCancel(context.Background())
clock := t.clock
previous := int64(0)

go func() {
for {
select {
case <-t.ctx.Done():
return
case <-clock.After(checkCompactionInterval):
t.mu.Lock()
p := t.paused
t.mu.Unlock()
if p {
continue
}
}

rev := t.rg.Rev() - t.retention

if rev <= 0 || rev == previous {
continue
}

plog.Noticef("Starting auto-compaction at revision %d (retention: %d revisions)", rev, t.retention)
_, err := t.c.Compact(t.ctx, &pb.CompactionRequest{Revision: rev})
if err == nil || err == mvcc.ErrCompacted {
previous = rev
plog.Noticef("Finished auto-compaction at revision %d", rev)
} else {
plog.Noticef("Failed auto-compaction at revision %d (%v)", err, rev)
plog.Noticef("Retry after %v", checkCompactionInterval)
}
}
}()
}

func (t *Revisional) Stop() {
t.cancel()
}

func (t *Revisional) Pause() {
t.mu.Lock()
defer t.mu.Unlock()
t.paused = true
}

func (t *Revisional) Resume() {
t.mu.Lock()
defer t.mu.Unlock()
t.paused = false
}
117 changes: 117 additions & 0 deletions compactor/revisional_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
// Copyright 2017 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package compactor

import (
"reflect"
"testing"
"time"

pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/pkg/testutil"
"github.com/jonboulle/clockwork"
)

func TestRevisional(t *testing.T) {
fc := clockwork.NewFakeClock()
rg := &fakeRevGetter{testutil.NewRecorderStream(), 0}
compactable := &fakeCompactable{testutil.NewRecorderStream()}
tb := &Revisional{
clock: fc,
retention: 10,
rg: rg,
c: compactable,
}

tb.Run()
defer tb.Stop()

fc.Advance(checkCompactionInterval)
rg.Wait(1)
// nothing happens

rg.rev = 99 // will be 100
expectedRevision := int64(90)
fc.Advance(checkCompactionInterval)
rg.Wait(1)
a, err := compactable.Wait(1)
if err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) {
t.Errorf("compact request = %v, want %v", a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision})
}

// skip the same revision
rg.rev = 99 // will be 100
expectedRevision = int64(90)
rg.Wait(1)
// nothing happens

rg.rev = 199 // will be 200
expectedRevision = int64(190)
fc.Advance(checkCompactionInterval)
rg.Wait(1)
a, err = compactable.Wait(1)
if err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) {
t.Errorf("compact request = %v, want %v", a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision})
}
}

func TestRevisionalPause(t *testing.T) {
fc := clockwork.NewFakeClock()
compactable := &fakeCompactable{testutil.NewRecorderStream()}
rg := &fakeRevGetter{testutil.NewRecorderStream(), 99} // will be 100
tb := &Revisional{
clock: fc,
retention: 10,
rg: rg,
c: compactable,
}

tb.Run()
tb.Pause()

// tb will collect 3 hours of revisions but not compact since paused
n := int(time.Hour / checkCompactionInterval)
for i := 0; i < 3*n; i++ {
fc.Advance(checkCompactionInterval)
}
// tb ends up waiting for the clock

select {
case a := <-compactable.Chan():
t.Fatalf("unexpected action %v", a)
case <-time.After(10 * time.Millisecond):
}

// tb resumes to being blocked on the clock
tb.Resume()

// unblock clock, will kick off a compaction at hour 3:05
fc.Advance(checkCompactionInterval)
rg.Wait(1)
a, err := compactable.Wait(1)
if err != nil {
t.Fatal(err)
}
wreq := &pb.CompactionRequest{Revision: int64(90)}
if !reflect.DeepEqual(a[0].Params[0], wreq) {
t.Errorf("compact request = %v, want %v", a[0].Params[0], wreq.Revision)
}
}

0 comments on commit 6508c57

Please sign in to comment.