Skip to content
/ etcd Public
forked from etcd-io/etcd

Commit

Permalink
Merge pull request etcd-io#4140 from xiang90/storage
Browse files Browse the repository at this point in the history
*: make backend outside kv
  • Loading branch information
xiang90 committed Jan 6, 2016
2 parents 70d120e + 5dd3f91 commit 82f2cd6
Show file tree
Hide file tree
Showing 18 changed files with 204 additions and 132 deletions.
7 changes: 5 additions & 2 deletions etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (
"github.com/coreos/etcd/rafthttp"
"github.com/coreos/etcd/snap"
dstorage "github.com/coreos/etcd/storage"
"github.com/coreos/etcd/storage/backend"
"github.com/coreos/etcd/store"
"github.com/coreos/etcd/version"
"github.com/coreos/etcd/wal"
Expand Down Expand Up @@ -358,7 +359,8 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
}

if cfg.V3demo {
srv.kv = dstorage.New(path.Join(cfg.SnapDir(), databaseFilename), &srv.consistIndex)
be := backend.NewDefaultBackend(path.Join(cfg.SnapDir(), databaseFilename))
srv.kv = dstorage.New(be, &srv.consistIndex)
if err := srv.kv.Restore(); err != nil {
plog.Fatalf("v3 storage restore error: %v", err)
}
Expand Down Expand Up @@ -583,7 +585,8 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
plog.Panicf("rename snapshot file error: %v", err)
}

newKV := dstorage.New(fn, &s.consistIndex)
newbe := backend.NewDefaultBackend(fn)
newKV := dstorage.New(newbe, &s.consistIndex)
if err := newKV.Restore(); err != nil {
plog.Panicf("restore KV error: %v", err)
}
Expand Down
10 changes: 7 additions & 3 deletions etcdserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/rafthttp"
dstorage "github.com/coreos/etcd/storage"
"github.com/coreos/etcd/storage/backend"
"github.com/coreos/etcd/store"
)

Expand Down Expand Up @@ -864,9 +865,12 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) {
msgSnapC: make(chan raftpb.Message, maxInFlightMsgSnap),
}

s.kv = dstorage.New(
path.Join(testdir, "testdb.db"),
&s.consistIndex)
be, tmpPath := backend.NewDefaultTmpBackend()
defer func() {
be.Close()
os.RemoveAll(tmpPath)
}()
s.kv = dstorage.New(be, &s.consistIndex)

s.start()
defer s.Stop()
Expand Down
26 changes: 26 additions & 0 deletions storage/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,21 @@ import (
"fmt"
"hash/crc32"
"io"
"io/ioutil"
"log"
"os"
"path"
"sync/atomic"
"time"

"github.com/coreos/etcd/Godeps/_workspace/src/github.com/boltdb/bolt"
)

var (
defaultBatchLimit = 10000
defaultBatchInterval = 100 * time.Millisecond
)

type Backend interface {
BatchTx() BatchTx
Snapshot() Snapshot
Expand Down Expand Up @@ -60,6 +68,10 @@ func New(path string, d time.Duration, limit int) Backend {
return newBackend(path, d, limit)
}

func NewDefaultBackend(path string) Backend {
return newBackend(path, defaultBatchInterval, defaultBatchLimit)
}

func newBackend(path string, d time.Duration, limit int) *backend {
db, err := bolt.Open(path, 0600, boltOpenOptions)
if err != nil {
Expand Down Expand Up @@ -151,6 +163,20 @@ func (b *backend) Close() error {
return b.db.Close()
}

// NewTmpBackend creates a backend implementation for testing.
func NewTmpBackend(batchInterval time.Duration, batchLimit int) (*backend, string) {
dir, err := ioutil.TempDir(os.TempDir(), "etcd_backend_test")
if err != nil {
log.Fatal(err)
}
tmpPath := path.Join(dir, "database")
return newBackend(tmpPath, batchInterval, batchLimit), tmpPath
}

func NewDefaultTmpBackend() (*backend, string) {
return NewTmpBackend(defaultBatchInterval, defaultBatchLimit)
}

type snapshot struct {
*bolt.Tx
}
Expand Down
21 changes: 5 additions & 16 deletions storage/backend/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,28 +16,16 @@ package backend

import (
"io/ioutil"
"log"
"os"
"path"
"testing"
"time"

"github.com/coreos/etcd/Godeps/_workspace/src/github.com/boltdb/bolt"
"github.com/coreos/etcd/pkg/testutil"
)

var tmpPath string

func init() {
dir, err := ioutil.TempDir(os.TempDir(), "etcd_backend_test")
if err != nil {
log.Fatal(err)
}
tmpPath = path.Join(dir, "database")
}

func TestBackendClose(t *testing.T) {
b := newBackend(tmpPath, time.Hour, 10000)
b, tmpPath := NewTmpBackend(time.Hour, 10000)
defer os.Remove(tmpPath)

// check close could work
Expand All @@ -57,7 +45,7 @@ func TestBackendClose(t *testing.T) {
}

func TestBackendSnapshot(t *testing.T) {
b := New(tmpPath, time.Hour, 10000)
b, tmpPath := NewTmpBackend(time.Hour, 10000)
defer cleanup(b, tmpPath)

tx := b.BatchTx()
Expand Down Expand Up @@ -93,8 +81,9 @@ func TestBackendSnapshot(t *testing.T) {
}

func TestBackendBatchIntervalCommit(t *testing.T) {
// start backend with super short batch interval
b := newBackend(tmpPath, time.Nanosecond, 10000)
// start backend with super short batch interval so
// we do not need to wait long before commit to happen.
b, tmpPath := NewTmpBackend(time.Nanosecond, 10000)
defer cleanup(b, tmpPath)

tx := b.BatchTx()
Expand Down
13 changes: 7 additions & 6 deletions storage/backend/batch_tx_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
)

func TestBatchTxPut(t *testing.T) {
b := newBackend(tmpPath, time.Hour, 10000)
b, tmpPath := NewTmpBackend(time.Hour, 10000)
defer cleanup(b, tmpPath)

tx := b.batchTx
Expand All @@ -48,7 +48,7 @@ func TestBatchTxPut(t *testing.T) {
}

func TestBatchTxRange(t *testing.T) {
b := newBackend(tmpPath, time.Hour, 10000)
b, tmpPath := NewTmpBackend(time.Hour, 10000)
defer cleanup(b, tmpPath)

tx := b.batchTx
Expand Down Expand Up @@ -119,7 +119,7 @@ func TestBatchTxRange(t *testing.T) {
}

func TestBatchTxDelete(t *testing.T) {
b := newBackend(tmpPath, time.Hour, 10000)
b, tmpPath := NewTmpBackend(time.Hour, 10000)
defer cleanup(b, tmpPath)

tx := b.batchTx
Expand All @@ -142,7 +142,7 @@ func TestBatchTxDelete(t *testing.T) {
}

func TestBatchTxCommit(t *testing.T) {
b := newBackend(tmpPath, time.Hour, 10000)
b, tmpPath := NewTmpBackend(time.Hour, 10000)
defer cleanup(b, tmpPath)

tx := b.batchTx
Expand All @@ -169,8 +169,9 @@ func TestBatchTxCommit(t *testing.T) {
}

func TestBatchTxBatchLimitCommit(t *testing.T) {
// start backend with batch limit 1
b := newBackend(tmpPath, time.Hour, 1)
// start backend with batch limit 1 so one write can
// trigger a commit
b, tmpPath := NewTmpBackend(time.Hour, 1)
defer cleanup(b, tmpPath)

tx := b.batchTx
Expand Down
14 changes: 7 additions & 7 deletions storage/consistent_watchable_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"log"

"github.com/coreos/etcd/lease"
"github.com/coreos/etcd/storage/backend"
"github.com/coreos/etcd/storage/storagepb"
)

Expand Down Expand Up @@ -46,16 +47,15 @@ type consistentWatchableStore struct {
skip bool // indicate whether or not to skip an operation
}

func New(path string, ig ConsistentIndexGetter) ConsistentWatchableKV {
return newConsistentWatchableStore(path, ig)
func New(b backend.Backend, ig ConsistentIndexGetter) ConsistentWatchableKV {
return newConsistentWatchableStore(b, ig)
}

// newConsistentWatchableStore creates a new consistentWatchableStore
// using the file at the given path.
// If the file at the given path does not exist then it will be created automatically.
func newConsistentWatchableStore(path string, ig ConsistentIndexGetter) *consistentWatchableStore {
// newConsistentWatchableStore creates a new consistentWatchableStore with the give
// backend.
func newConsistentWatchableStore(b backend.Backend, ig ConsistentIndexGetter) *consistentWatchableStore {
return &consistentWatchableStore{
watchableStore: newWatchableStore(path),
watchableStore: newWatchableStore(b),
ig: ig,
}
}
Expand Down
16 changes: 11 additions & 5 deletions storage/consistent_watchable_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,21 @@

package storage

import "testing"
import (
"testing"

"github.com/coreos/etcd/storage/backend"
)

type indexVal uint64

func (v *indexVal) ConsistentIndex() uint64 { return uint64(*v) }

func TestConsistentWatchableStoreConsistentIndex(t *testing.T) {
var idx indexVal
s := newConsistentWatchableStore(tmpPath, &idx)
defer cleanup(s, tmpPath)
b, tmpPath := backend.NewDefaultTmpBackend()
s := newConsistentWatchableStore(b, &idx)
defer cleanup(s, b, tmpPath)

tests := []uint64{1, 2, 3, 5, 10}
for i, tt := range tests {
Expand All @@ -41,8 +46,9 @@ func TestConsistentWatchableStoreConsistentIndex(t *testing.T) {

func TestConsistentWatchableStoreSkip(t *testing.T) {
idx := indexVal(5)
s := newConsistentWatchableStore(tmpPath, &idx)
defer cleanup(s, tmpPath)
b, tmpPath := backend.NewDefaultTmpBackend()
s := newConsistentWatchableStore(b, &idx)
defer cleanup(s, b, tmpPath)

s.Put([]byte("foo"), []byte("bar"), NoLease)

Expand Down
Loading

0 comments on commit 82f2cd6

Please sign in to comment.