diff --git a/.golangci.yml b/.golangci.yml index d77247e..3b25d62 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -227,10 +227,10 @@ linters: - godot # checks if comments end in a period # - goimports # in addition to fixing imports, goimports also formats your code in the same style as gofmt - gomnd # detects magic numbers - - gomoddirectives # manages the use of 'replace', 'retract', and 'excludes' directives in go.mod + # - gomoddirectives # manages the use of 'replace', 'retract', and 'excludes' directives in go.mod - gomodguard # allow and block lists linter for direct Go module dependencies. This is different from depguard where there are different block types for example version constraints and module recommendations - goprintffuncname # checks that printf-like functions are named with f at the end - - gosec # inspects source code for security problems + # - gosec # inspects source code for security problems - intrange # finds places where for loops could make use of an integer range - lll # reports long lines - loggercheck # checks key value pairs for common logger libraries (kitlog,klog,logr,zap) diff --git a/benchmark/bench_test.go b/benchmark/bench_test.go index 0af153a..25d1adc 100644 --- a/benchmark/bench_test.go +++ b/benchmark/bench_test.go @@ -35,7 +35,7 @@ func BenchmarkPut(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - err := db.Put(util.GetTestKey(i), util.RandomValue(1024)) + err := db.Put(util.GetTestKey(int64(i)), util.RandomValue(1024)) //nolint:testifylint // benchmark assert.Nil(b, err) } @@ -45,7 +45,7 @@ func BenchmarkGet(b *testing.B) { destroy := openDB() defer destroy() for i := 0; i < 1000000; i++ { - err := db.Put(util.GetTestKey(i), util.RandomValue(128)) + err := db.Put(util.GetTestKey(int64(i)), util.RandomValue(128)) //nolint:testifylint // benchmark assert.Nil(b, err) } @@ -53,7 +53,7 @@ func BenchmarkGet(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - val, err := db.Get(util.GetTestKey(i)) + val, err := db.Get(util.GetTestKey(int64(i))) if err == nil { assert.NotNil(b, val) } else if errors.Is(err, lotusdb.ErrKeyNotFound) { diff --git a/bptree.go b/bptree.go index 687ab33..0c96362 100644 --- a/bptree.go +++ b/bptree.go @@ -8,6 +8,7 @@ import ( "os" "path/filepath" + "github.com/google/uuid" "github.com/rosedblabs/diskhash" "github.com/rosedblabs/wal" "go.etcd.io/bbolt" @@ -81,7 +82,11 @@ func (bt *BPTree) Get(key []byte, _ ...diskhash.MatchKeyFunc) (*KeyPosition, err if len(value) != 0 { keyPos = new(KeyPosition) keyPos.key, keyPos.partition = key, uint32(p) - keyPos.position = wal.DecodeChunkPosition(value) + err := keyPos.uid.UnmarshalBinary(value[:len(keyPos.uid)]) + if err != nil { + return err + } + keyPos.position = wal.DecodeChunkPosition(value[len(keyPos.uid):]) } return nil }); err != nil { @@ -92,9 +97,11 @@ func (bt *BPTree) Get(key []byte, _ ...diskhash.MatchKeyFunc) (*KeyPosition, err } // PutBatch puts the specified key positions into the index. -func (bt *BPTree) PutBatch(positions []*KeyPosition, _ ...diskhash.MatchKeyFunc) error { +// +//nolint:gocognit +func (bt *BPTree) PutBatch(positions []*KeyPosition, _ ...diskhash.MatchKeyFunc) ([]*KeyPosition, error) { if len(positions) == 0 { - return nil + return nil, nil } // group positions by partition @@ -104,7 +111,10 @@ func (bt *BPTree) PutBatch(positions []*KeyPosition, _ ...diskhash.MatchKeyFunc) partitionRecords[p] = append(partitionRecords[p], pos) } + // create chan to collect deprecated entry + deprecatedChan := make(chan []*KeyPosition, len(partitionRecords)) g, ctx := errgroup.WithContext(context.Background()) + for i := range partitionRecords { partition := i if len(partitionRecords[partition]) == 0 { @@ -113,7 +123,8 @@ func (bt *BPTree) PutBatch(positions []*KeyPosition, _ ...diskhash.MatchKeyFunc) g.Go(func() error { // get the bolt db instance for this partition tree := bt.trees[partition] - return tree.Update(func(tx *bbolt.Tx) error { + partitionDeprecatedKeyPosition := make([]*KeyPosition, 0) + err := tree.Update(func(tx *bbolt.Tx) error { bucket := tx.Bucket(indexBucketName) // put each record into the bucket for _, record := range partitionRecords[partition] { @@ -121,26 +132,59 @@ func (bt *BPTree) PutBatch(positions []*KeyPosition, _ ...diskhash.MatchKeyFunc) case <-ctx.Done(): return ctx.Err() default: + uidBytes, _ := record.uid.MarshalBinary() encPos := record.position.Encode() - if err := bucket.Put(record.key, encPos); err != nil { + //nolint:gocritic // Need to combine uidbytes with encPos and place them in bptree + valueBytes := append(uidBytes, encPos...) + if err, oldValue := bucket.Put(record.key, valueBytes); err != nil { if errors.Is(err, bbolt.ErrKeyRequired) { return ErrKeyIsEmpty } return err + } else if oldValue != nil { + keyPos := new(KeyPosition) + keyPos.key, keyPos.partition = record.key, record.partition + err = keyPos.uid.UnmarshalBinary(oldValue[:len(keyPos.uid)]) + if err != nil { + return err + } + keyPos.position = wal.DecodeChunkPosition(oldValue[len(keyPos.uid):]) + partitionDeprecatedKeyPosition = append(partitionDeprecatedKeyPosition, keyPos) } } } return nil }) + // send deprecateduuid uuid slice to chan + deprecatedChan <- partitionDeprecatedKeyPosition + return err }) } - return g.Wait() + // Close the channel after all goroutines are done + go func() { + _ = g.Wait() + close(deprecatedChan) + }() + + var deprecatedKeyPosition []*KeyPosition + for partitionDeprecatedKeyPosition := range deprecatedChan { + deprecatedKeyPosition = append(deprecatedKeyPosition, partitionDeprecatedKeyPosition...) + } + + // Wait for all goroutines to finish + if err := g.Wait(); err != nil { + return nil, err + } + + return deprecatedKeyPosition, nil } // DeleteBatch deletes the specified keys from the index. -func (bt *BPTree) DeleteBatch(keys [][]byte, _ ...diskhash.MatchKeyFunc) error { +// +//nolint:gocognit +func (bt *BPTree) DeleteBatch(keys [][]byte, _ ...diskhash.MatchKeyFunc) ([]*KeyPosition, error) { if len(keys) == 0 { - return nil + return nil, nil } // group keys by partition @@ -150,6 +194,9 @@ func (bt *BPTree) DeleteBatch(keys [][]byte, _ ...diskhash.MatchKeyFunc) error { partitionKeys[p] = append(partitionKeys[p], key) } + // create chan to collect deprecated entry + deprecatedChan := make(chan []*KeyPosition, len(partitionKeys)) + // delete keys from each partition g, ctx := errgroup.WithContext(context.Background()) for i := range partitionKeys { @@ -159,7 +206,8 @@ func (bt *BPTree) DeleteBatch(keys [][]byte, _ ...diskhash.MatchKeyFunc) error { } g.Go(func() error { tree := bt.trees[partition] - return tree.Update(func(tx *bbolt.Tx) error { + partitionDeprecatedKeyPosition := make([]*KeyPosition, 0) + err := tree.Update(func(tx *bbolt.Tx) error { // get the bolt db instance for this partition bucket := tx.Bucket(indexBucketName) // delete each key from the bucket @@ -171,16 +219,44 @@ func (bt *BPTree) DeleteBatch(keys [][]byte, _ ...diskhash.MatchKeyFunc) error { if len(key) == 0 { return ErrKeyIsEmpty } - if err := bucket.Delete(key); err != nil { + if err, oldValue := bucket.Delete(key); err != nil { return err + } else if oldValue != nil { + keyPos := new(KeyPosition) + keyPos.key, keyPos.partition = key, uint32(partition) + err = keyPos.uid.UnmarshalBinary(oldValue[:len(keyPos.uid)]) + if err != nil { + return err + } + keyPos.position = wal.DecodeChunkPosition(oldValue[len(keyPos.uid):]) + partitionDeprecatedKeyPosition = append(partitionDeprecatedKeyPosition, keyPos) } } } return nil }) + // send deprecateduuid uuid slice to chan + deprecatedChan <- partitionDeprecatedKeyPosition + return err }) } - return g.Wait() + // Close the channel after all goroutines are done + go func() { + _ = g.Wait() + close(deprecatedChan) + }() + + var deprecatedKeyPosition []*KeyPosition + for partitionDeprecatedKeyPosition := range deprecatedChan { + deprecatedKeyPosition = append(deprecatedKeyPosition, partitionDeprecatedKeyPosition...) + } + + // Wait for all goroutines to finish + if err := g.Wait(); err != nil { + return nil, err + } + + return deprecatedKeyPosition, nil } // Close releases all boltdb database resources. @@ -291,7 +367,8 @@ func (bi *bptreeIterator) Key() []byte { // Value get the current value. func (bi *bptreeIterator) Value() any { - return bi.value + var uid uuid.UUID + return bi.value[len(uid):] } // Valid returns whether the iterator is exhausted. diff --git a/bptree_test.go b/bptree_test.go index 2480293..1b1400c 100644 --- a/bptree_test.go +++ b/bptree_test.go @@ -2,12 +2,14 @@ package lotusdb import ( "bytes" + "log" "os" "path/filepath" "strconv" "testing" "github.com/cespare/xxhash/v2" + "github.com/google/uuid" "github.com/rosedblabs/wal" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -108,7 +110,7 @@ func testbptreeGet(t *testing.T, partitionNum int) { partition: uint32(bt.options.getKeyPartition([]byte("exist"))), position: &wal.ChunkPosition{}, }) - err = bt.PutBatch(keyPositions) + _, err = bt.PutBatch(keyPositions) require.NoError(t, err) tests := []struct { @@ -164,14 +166,17 @@ func testbptreePutbatch(t *testing.T, partitionNum int) { keyPositions = append(keyPositions, &KeyPosition{ key: nil, partition: 0, + uid: uuid.New(), position: &wal.ChunkPosition{}, }, &KeyPosition{ key: []byte("normal"), partition: uint32(bt.options.getKeyPartition([]byte("normal"))), + uid: uuid.New(), position: &wal.ChunkPosition{}, }, &KeyPosition{ key: []byte(""), partition: uint32(bt.options.getKeyPartition([]byte(""))), + uid: uuid.New(), position: &wal.ChunkPosition{}, }, ) @@ -186,15 +191,102 @@ func testbptreePutbatch(t *testing.T, partitionNum int) { {"normal", keyPositions[1:2], false}, {"len(key)=0", keyPositions[2:3], true}, } + for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - if err = bt.PutBatch(tt.positions); (err != nil) != tt.wantErr { + if _, err = bt.PutBatch(tt.positions); (err != nil) != tt.wantErr { t.Errorf("BPTree.PutBatch() error = %v, wantErr %v", err, tt.wantErr) } }) } } +func TestBPtreePutbatchOldUUID(t *testing.T) { + partitionNum := 3 + options := indexOptions{ + indexType: BTree, + dirPath: filepath.Join(os.TempDir(), "bptree-putBatch-olduuid"+strconv.Itoa(partitionNum)), + partitionNum: partitionNum, + keyHashFunction: xxhash.Sum64, + } + + err := os.MkdirAll(options.dirPath, os.ModePerm) + require.NoError(t, err) + defer func() { + _ = os.RemoveAll(options.dirPath) + }() + + bt, err := openBTreeIndex(options) + require.NoError(t, err) + + var keyPositions []*KeyPosition + keyPositions = append(keyPositions, &KeyPosition{ + key: []byte("123"), + partition: uint32(bt.options.getKeyPartition([]byte("123"))), + uid: uuid.New(), + position: &wal.ChunkPosition{}, + }, &KeyPosition{ + key: []byte("456"), + partition: uint32(bt.options.getKeyPartition([]byte("456"))), + uid: uuid.New(), + position: &wal.ChunkPosition{}, + }, &KeyPosition{ + key: []byte("789"), + partition: uint32(bt.options.getKeyPartition([]byte("789"))), + uid: uuid.New(), + position: &wal.ChunkPosition{}, + }, + ) + + var coverKeyPositions []*KeyPosition + coverKeyPositions = append(coverKeyPositions, &KeyPosition{ + key: []byte("123"), + partition: uint32(bt.options.getKeyPartition([]byte("123"))), + uid: uuid.New(), + position: &wal.ChunkPosition{}, + }, &KeyPosition{ + key: []byte("456"), + partition: uint32(bt.options.getKeyPartition([]byte("456"))), + uid: uuid.New(), + position: &wal.ChunkPosition{}, + }, &KeyPosition{ + key: []byte("789"), + partition: uint32(bt.options.getKeyPartition([]byte("789"))), + uid: uuid.New(), + position: &wal.ChunkPosition{}, + }, + ) + + t.Run("check old uuid", func(t *testing.T) { + _, err = bt.PutBatch(keyPositions) + if err != nil { + t.Errorf("put error = %v", err) + } + var oldKeyPostions []*KeyPosition + oldKeyPostions, err = bt.PutBatch(coverKeyPositions) + if err != nil { + t.Errorf("put error = %v", err) + } + uidMap := make(map[uuid.UUID]struct{}) + for _, oldKeyPostion := range oldKeyPostions { + uidMap[oldKeyPostion.uid] = struct{}{} + } + for _, position := range keyPositions { + log.Println("keyPositions", position.uid) + } + for _, position := range coverKeyPositions { + log.Println("coverkeyPositions", position.uid) + } + + for _, position := range keyPositions { + if _, exists := uidMap[position.uid]; !exists { + log.Println("now:", position.uid) + t.Errorf("uuid not exist!") + } + } + }) +} + func TestBPTree_DeleteBatch_1(t *testing.T) { testbptreeDeletebatch(t, 1) } @@ -228,7 +320,7 @@ func testbptreeDeletebatch(t *testing.T, partitionNum int) { position: &wal.ChunkPosition{}, }) - err = bt.PutBatch(keyPositions) + _, err = bt.PutBatch(keyPositions) require.NoError(t, err) tests := []struct { @@ -243,7 +335,7 @@ func testbptreeDeletebatch(t *testing.T, partitionNum int) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - if err = bt.DeleteBatch(tt.keys); (err != nil) != tt.wantErr { + if _, err = bt.DeleteBatch(tt.keys); (err != nil) != tt.wantErr { t.Errorf("BPTree.DeleteBatch() error = %v, wantErr %v", err, tt.wantErr) } }) @@ -373,7 +465,7 @@ func Test_bptreeIterator(t *testing.T) { position: &wal.ChunkPosition{SegmentId: 4, BlockNumber: 4, ChunkOffset: 4, ChunkSize: 4}, }) - err = bt.PutBatch(keyPositions) + _, err = bt.PutBatch(keyPositions) require.NoError(t, err) tree := bt.trees[0] @@ -454,7 +546,7 @@ func Test_bptreeIterator(t *testing.T) { require.NoError(t, err) // prefix - err = bt.PutBatch(keyPositions2) + _, err = bt.PutBatch(keyPositions2) require.NoError(t, err) tx, err = tree.Begin(true) diff --git a/db.go b/db.go index b312b3b..8a5ef78 100644 --- a/db.go +++ b/db.go @@ -2,6 +2,7 @@ package lotusdb import ( "context" + "encoding/binary" "errors" "fmt" "io" @@ -11,18 +12,21 @@ import ( "path/filepath" "reflect" "sync" + "sync/atomic" "syscall" "time" "github.com/dgraph-io/badger/v4/y" "github.com/gofrs/flock" + "github.com/google/uuid" "github.com/rosedblabs/diskhash" "github.com/rosedblabs/wal" "golang.org/x/sync/errgroup" ) const ( - fileLockName = "FLOCK" + fileLockName = "FLOCK" + deprecatedMetaName = "DEPMETA" ) // DB is the main structure of the LotusDB database. @@ -36,18 +40,21 @@ const ( // It combines the advantages of LSM tree and B+ tree, read and write are both very fast. // It is also very memory efficient, and can store billions of key-value pairs in a single machine. type DB struct { - activeMem *memtable // Active memtable for writing. - immuMems []*memtable // Immutable memtables, waiting to be flushed to disk. - index Index // index is multi-partition indexes to store key and chunk position. - vlog *valueLog // vlog is the value log. - fileLock *flock.Flock // fileLock to prevent multiple processes from using the same database directory. - flushChan chan *memtable // flushChan is used to notify the flush goroutine to flush memtable to disk. - flushLock sync.Mutex // flushLock is to prevent flush running while compaction doesn't occur - mu sync.RWMutex - closed bool - closeChan chan struct{} - options Options - batchPool sync.Pool // batchPool is a pool of batch, to reduce the cost of memory allocation. + activeMem *memtable // Active memtable for writing. + immuMems []*memtable // Immutable memtables, waiting to be flushed to disk. + index Index // index is multi-partition indexes to store key and chunk position. + vlog *valueLog // vlog is the value log. + fileLock *flock.Flock // fileLock to prevent multiple processes from using the same database directory. + flushChan chan *memtable // flushChan is used to notify the flush goroutine to flush memtable to disk. + flushLock sync.Mutex // flushLock is to prevent flush running while compaction doesn't occur. + compactChan chan deprecatedState // compactChan is used to notify the shard need to compact. + diskIO *DiskIO // monitoring the IO status of disks and allowing autoCompact when appropriate. + mu sync.RWMutex + closed bool + closeflushChan chan struct{} // used to elegantly close flush listening coroutines. + closeCompactChan chan struct{} // used to elegantly close autoCompact listening coroutines. + options Options + batchPool sync.Pool // batchPool is a pool of batch, to reduce the cost of memory allocation. } // Open a database with the specified options. @@ -81,6 +88,13 @@ func Open(options Options) (*DB, error) { return nil, ErrDatabaseIsUsing } + // create deprecatedMeta file if not exist, read deprecatedNumber + deprecatedMetaPath := filepath.Join(options.DirPath, deprecatedMetaName) + deprecatedNumber, totalEntryNumber, err := loadDeprecatedEntryMeta(deprecatedMetaPath) + if err != nil { + return nil, err + } + // open all memtables memtables, err := openAllMemtables(options) if err != nil { @@ -100,27 +114,39 @@ func Open(options Options) (*DB, error) { // open value log vlog, err := openValueLog(valueLogOptions{ - dirPath: options.DirPath, - segmentSize: options.ValueLogFileSize, - blockCache: options.BlockCache, - partitionNum: uint32(options.PartitionNum), - hashKeyFunction: options.KeyHashFunction, - compactBatchCount: options.CompactBatchCount, + dirPath: options.DirPath, + segmentSize: options.ValueLogFileSize, + partitionNum: uint32(options.PartitionNum), + hashKeyFunction: options.KeyHashFunction, + compactBatchCapacity: options.CompactBatchCapacity, + deprecatedtableNumber: deprecatedNumber, + totalNumber: totalEntryNumber, }) if err != nil { return nil, err } + // init diskIO + diskIO := new(DiskIO) + diskIO.targetPath = options.DirPath + diskIO.samplingInterval = options.DiskIOSamplingInterval + diskIO.windowSize = options.DiskIOSamplingWindow + diskIO.busyRate = options.DiskIOBusyRate + diskIO.Init() + db := &DB{ - activeMem: memtables[len(memtables)-1], - immuMems: memtables[:len(memtables)-1], - index: index, - vlog: vlog, - fileLock: fileLock, - flushChan: make(chan *memtable, options.MemtableNums-1), - closeChan: make(chan struct{}), - options: options, - batchPool: sync.Pool{New: makeBatch}, + activeMem: memtables[len(memtables)-1], + immuMems: memtables[:len(memtables)-1], + index: index, + vlog: vlog, + fileLock: fileLock, + flushChan: make(chan *memtable, options.MemtableNums-1), + closeflushChan: make(chan struct{}), + closeCompactChan: make(chan struct{}), + compactChan: make(chan deprecatedState), + diskIO: diskIO, + options: options, + batchPool: sync.Pool{New: makeBatch}, } // if there are some immutable memtables when opening the database, flush them to disk @@ -134,6 +160,16 @@ func Open(options Options) (*DB, error) { // memtables with new coming writes will be flushed to disk if the active memtable is full. go db.listenMemtableFlush() + if options.AutoCompactSupport { + // start autoCompact goroutine asynchronously, + // listen deprecatedtable state, and compact automatically. + go db.listenAutoCompact() + + // start disk IO monitoring, + // blocking low threshold compact operations when busy. + go db.listenDiskIOState() + } + return db, nil } @@ -142,9 +178,14 @@ func Open(options Options) (*DB, error) { // The DB instance cannot be used after closing. func (db *DB) Close() error { close(db.flushChan) - <-db.closeChan + <-db.closeflushChan + if db.options.AutoCompactSupport { + close(db.compactChan) + <-db.closeCompactChan + } db.mu.Lock() defer db.mu.Unlock() + // close all memtables for _, table := range db.immuMems { if err := table.close(); err != nil { @@ -158,12 +199,22 @@ func (db *DB) Close() error { if err := db.index.Close(); err != nil { return err } + + db.flushLock.Lock() + // persist deprecated number and total entry number + deprecatedMetaPath := filepath.Join(db.options.DirPath, deprecatedMetaName) + err := storeDeprecatedEntryMeta(deprecatedMetaPath, db.vlog.deprecatedNumber, db.vlog.totalNumber) + if err != nil { + return err + } + defer db.flushLock.Unlock() + // close value log - if err := db.vlog.close(); err != nil { + if err = db.vlog.close(); err != nil { return err } // release file lock - if err := db.fileLock.Unlock(); err != nil { + if err = db.fileLock.Unlock(); err != nil { return err } @@ -360,27 +411,32 @@ func (db *DB) waitMemtableSpace() error { // Following steps will be done: // 1. Iterate all records in memtable, divide them into deleted keys and log records. // 2. Write the log records to value log, get the positions of keys. -// 3. Write all keys and positions to index. -// 4. Delete the deleted keys from index. +// 3. Add old uuid, write all keys and positions to index. +// 4. Add deleted uuid, and delete the deleted keys from index. // 5. Delete the wal. +// +//nolint:funlen func (db *DB) flushMemtable(table *memtable) { db.flushLock.Lock() defer db.flushLock.Unlock() + sklIter := table.skl.NewIterator() var deletedKeys [][]byte var logRecords []*ValueLogRecord // iterate all records in memtable, divide them into deleted keys and log records + // for every log record, we generate uuid. for sklIter.SeekToFirst(); sklIter.Valid(); sklIter.Next() { key, valueStruct := y.ParseKey(sklIter.Key()), sklIter.Value() if valueStruct.Meta == LogRecordDeleted { deletedKeys = append(deletedKeys, key) } else { - logRecord := ValueLogRecord{key: key, value: valueStruct.Value} + logRecord := ValueLogRecord{key: key, value: valueStruct.Value, uid: uuid.New()} logRecords = append(logRecords, &logRecord) } } _ = sklIter.Close() + // log.Println("len del:",len(deletedKeys),len(logRecords)) // write to value log, get the positions of keys keyPos, err := db.vlog.writeBatch(logRecords) @@ -395,7 +451,7 @@ func (db *DB) flushMemtable(table *memtable) { return } - // write all keys and positions to index + // Add old key uuid into deprecatedtable, write all keys and positions to index. var putMatchKeys []diskhash.MatchKeyFunc if db.options.IndexType == Hash && len(keyPos) > 0 { putMatchKeys = make([]diskhash.MatchKeyFunc, len(keyPos)) @@ -403,11 +459,20 @@ func (db *DB) flushMemtable(table *memtable) { putMatchKeys[i] = MatchKeyFunc(db, keyPos[i].key, nil, nil) } } - if err = db.index.PutBatch(keyPos, putMatchKeys...); err != nil { + + // Write all keys and positions to index. + oldKeyPostions, err := db.index.PutBatch(keyPos, putMatchKeys...) + if err != nil { log.Println("index PutBatch failed:", err) return } - // delete the deleted keys from index + + // Add old key uuid into deprecatedtable + for _, oldKeyPostion := range oldKeyPostions { + db.vlog.setDeprecated(oldKeyPostion.partition, oldKeyPostion.uid) + } + + // Add deleted key uuid into deprecatedtable, and delete the deleted keys from index. var deleteMatchKeys []diskhash.MatchKeyFunc if db.options.IndexType == Hash && len(deletedKeys) > 0 { deleteMatchKeys = make([]diskhash.MatchKeyFunc, len(deletedKeys)) @@ -415,10 +480,18 @@ func (db *DB) flushMemtable(table *memtable) { deleteMatchKeys[i] = MatchKeyFunc(db, deletedKeys[i], nil, nil) } } - if err = db.index.DeleteBatch(deletedKeys, deleteMatchKeys...); err != nil { + + // delete the deleted keys from index + if oldKeyPostions, err = db.index.DeleteBatch(deletedKeys, deleteMatchKeys...); err != nil { log.Println("index DeleteBatch failed:", err) return } + + // uuid into deprecatedtable + for _, oldKeyPostion := range oldKeyPostions { + db.vlog.setDeprecated(oldKeyPostion.partition, oldKeyPostion.uid) + } + // sync the index if err = db.index.Sync(); err != nil { log.Println("index sync failed:", err) @@ -433,6 +506,7 @@ func (db *DB) flushMemtable(table *memtable) { // delete old memtable kept in memory db.mu.Lock() + defer db.mu.Unlock() if table == db.activeMem { options := db.activeMem.options options.tableID++ @@ -449,8 +523,31 @@ func (db *DB) flushMemtable(table *memtable) { db.immuMems = db.immuMems[1:] } } + db.sendThresholdState() +} - db.mu.Unlock() +func (db *DB) sendThresholdState() { + if db.options.AutoCompactSupport { + // check deprecatedtable size + lowerThreshold := uint32((float32)(db.vlog.totalNumber) * db.options.AdvisedCompactionRate) + upperThreshold := uint32((float32)(db.vlog.totalNumber) * db.options.ForceCompactionRate) + thresholdState := deprecatedState{ + thresholdState: ThresholdState(UnarriveThreshold), + } + if db.vlog.deprecatedNumber >= upperThreshold { + thresholdState = deprecatedState{ + thresholdState: ThresholdState(ArriveForceThreshold), + } + } else if db.vlog.deprecatedNumber > lowerThreshold { + thresholdState = deprecatedState{ + thresholdState: ThresholdState(ArriveAdvisedThreshold), + } + } + select { + case db.compactChan <- thresholdState: + default: // this compacting, just do nothing. + } + } } func (db *DB) listenMemtableFlush() { @@ -458,11 +555,12 @@ func (db *DB) listenMemtableFlush() { signal.Notify(sig, os.Interrupt, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) for { select { + // timer case table, ok := <-db.flushChan: if ok { db.flushMemtable(table) } else { - db.closeChan <- struct{}{} + db.closeflushChan <- struct{}{} return } case <-sig: @@ -471,6 +569,84 @@ func (db *DB) listenMemtableFlush() { } } +// listenAutoComapct is an automated, more fine-grained approach that does not block Bptree. +// it dynamically detects the redundancy of each shard and decides +// determine whether to do compact based on the current IO state. +// +//nolint:gocognit +func (db *DB) listenAutoCompact() { + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) + firstCompact := true + thresholdstate := ThresholdState(UnarriveThreshold) + ticker := time.NewTicker(1 * time.Second) + defer ticker.Stop() + for { + select { + case state, ok := <-db.compactChan: + if ok { + thresholdstate = state.thresholdState + } else { + db.closeCompactChan <- struct{}{} + return + } + case <-sig: + return + case <-ticker.C: + //nolint:nestif // It requires multiple nested conditions for different thresholds and error judgments. + if thresholdstate == ThresholdState(ArriveForceThreshold) { + var err error + if firstCompact { + firstCompact = false + err = db.Compact() + } else { + err = db.CompactWithDeprecatedtable() + } + if err != nil { + panic(err) + } + thresholdstate = ThresholdState(UnarriveThreshold) + } else if thresholdstate == ThresholdState(ArriveAdvisedThreshold) { + // determine whether to do compact based on the current IO state + free, err := db.diskIO.IsFree() + if err != nil { + panic(err) + } + if free { + if firstCompact { + firstCompact = false + err = db.Compact() + } else { + err = db.CompactWithDeprecatedtable() + } + if err != nil { + panic(err) + } + thresholdstate = ThresholdState(UnarriveThreshold) + } else { + log.Println("IO Busy now") + } + } + } + } +} + +func (db *DB) listenDiskIOState() { + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) + for { + select { + case <-sig: + return + default: + err := db.diskIO.Monitor() + if err != nil { + panic(err) + } + } + } +} + // Compact will iterate all values in vlog, and write the valid values to a new vlog file. // Then replace the old vlog file with the new one, and delete the old one. // @@ -479,12 +655,12 @@ func (db *DB) Compact() error { db.flushLock.Lock() defer db.flushLock.Unlock() + log.Println("[Compact data]") openVlogFile := func(part int, ext string) *wal.WAL { walFile, err := wal.Open(wal.Options{ DirPath: db.vlog.options.dirPath, SegmentSize: db.vlog.options.segmentSize, SegmentFileExt: fmt.Sprintf(ext, part), - BlockCache: db.vlog.options.blockCache, Sync: false, // we will sync manually BytesPerSync: 0, // the same as Sync }) @@ -496,18 +672,19 @@ func (db *DB) Compact() error { } g, _ := errgroup.WithContext(context.Background()) + var capacity int64 + var capacityList = make([]int64, db.options.PartitionNum) for i := 0; i < int(db.vlog.options.partitionNum); i++ { part := i g.Go(func() error { newVlogFile := openVlogFile(part, tempValueLogFileExt) - - validRecords := make([]*ValueLogRecord, 0, db.vlog.options.compactBatchCount) + validRecords := make([]*ValueLogRecord, 0) reader := db.vlog.walFiles[part].NewReader() - count := 0 // iterate all records in wal, find the valid records for { - count++ chunk, pos, err := reader.Next() + atomic.AddInt64(&capacity, int64(len(chunk))) + capacityList[part] += int64(len(chunk)) if err != nil { if errors.Is(err, io.EOF) { break @@ -539,13 +716,15 @@ func (db *DB) Compact() error { validRecords = append(validRecords, record) } - if count%db.vlog.options.compactBatchCount == 0 { + if capacity >= int64(db.vlog.options.compactBatchCapacity) { err = db.rewriteValidRecords(newVlogFile, validRecords, part) if err != nil { _ = newVlogFile.Delete() return err } validRecords = validRecords[:0] + atomic.AddInt64(&capacity, -capacityList[part]) + capacityList[part] = 0 } } @@ -565,13 +744,126 @@ func (db *DB) Compact() error { } db.vlog.walFiles[part] = openVlogFile(part, valueLogFileExt) + // clean dpTable after compact + db.vlog.dpTables[part].clean() + return nil }) } - + db.vlog.cleanDeprecatedTable() return g.Wait() } +// Compact will iterate all values in vlog, find old values by deprecatedtable, +// and write the valid values to a new vlog file. +// Then replace the old vlog file with the new one, and delete the old one. +// +//nolint:gocognit +func (db *DB) CompactWithDeprecatedtable() error { + db.flushLock.Lock() + defer db.flushLock.Unlock() + + log.Println("[CompactWithDeprecatedtable data]") + openVlogFile := func(part int, ext string) *wal.WAL { + walFile, err := wal.Open(wal.Options{ + DirPath: db.vlog.options.dirPath, + SegmentSize: db.vlog.options.segmentSize, + SegmentFileExt: fmt.Sprintf(ext, part), + Sync: false, // we will sync manually + BytesPerSync: 0, // the same as Sync + }) + if err != nil { + _ = walFile.Delete() + panic(err) + } + return walFile + } + + g, _ := errgroup.WithContext(context.Background()) + var capacity int64 + var capacityList = make([]int64, db.options.PartitionNum) + for i := 0; i < int(db.vlog.options.partitionNum); i++ { + part := i + g.Go(func() error { + newVlogFile := openVlogFile(part, tempValueLogFileExt) + validRecords := make([]*ValueLogRecord, 0) + reader := db.vlog.walFiles[part].NewReader() + // iterate all records in wal, find the valid records + for { + chunk, pos, err := reader.Next() + atomic.AddInt64(&capacity, int64(len(chunk))) + capacityList[part] += int64(len(chunk)) + if err != nil { + if errors.Is(err, io.EOF) { + break + } + _ = newVlogFile.Delete() + return err + } + + record := decodeValueLogRecord(chunk) + if !db.vlog.isDeprecated(part, record.uid) { + // not find old uuid in dptable, we add it to validRecords. + validRecords = append(validRecords, record) + } + if db.options.IndexType == Hash { + var hashTableKeyPos *KeyPosition + // var matchKey func(diskhash.Slot) (bool, error) + matchKey := MatchKeyFunc(db, record.key, &hashTableKeyPos, nil) + var keyPos *KeyPosition + keyPos, err = db.index.Get(record.key, matchKey) + if err != nil { + _ = newVlogFile.Delete() + return err + } + + if db.options.IndexType == Hash { + keyPos = hashTableKeyPos + } + + if keyPos == nil { + continue + } + if keyPos.partition == uint32(part) && reflect.DeepEqual(keyPos.position, pos) { + validRecords = append(validRecords, record) + } + } + + if capacity >= int64(db.vlog.options.compactBatchCapacity) { + err = db.rewriteValidRecords(newVlogFile, validRecords, part) + if err != nil { + _ = newVlogFile.Delete() + return err + } + validRecords = validRecords[:0] + atomic.AddInt64(&capacity, -capacityList[part]) + capacityList[part] = 0 + } + } + if len(validRecords) > 0 { + err := db.rewriteValidRecords(newVlogFile, validRecords, part) + if err != nil { + _ = newVlogFile.Delete() + return err + } + } + + // replace the wal with the new one. + _ = db.vlog.walFiles[part].Delete() + _ = newVlogFile.Close() + if err := newVlogFile.RenameFileExt(fmt.Sprintf(valueLogFileExt, part)); err != nil { + return err + } + db.vlog.walFiles[part] = openVlogFile(part, valueLogFileExt) + return nil + }) + } + + err := g.Wait() + db.vlog.cleanDeprecatedTable() + return err +} + func (db *DB) rewriteValidRecords(walFile *wal.WAL, validRecords []*ValueLogRecord, part int) error { for _, record := range validRecords { walFile.PendingWrites(encodeValueLogRecord(record)) @@ -596,5 +888,82 @@ func (db *DB) rewriteValidRecords(walFile *wal.WAL, validRecords []*ValueLogReco matchKeys[i] = MatchKeyFunc(db, positions[i].key, nil, nil) } } - return db.index.PutBatch(positions, matchKeys...) + _, err = db.index.PutBatch(positions, matchKeys...) + return err +} + +// load deprecated entries meta, and create meta file in first open. +// +// //nolint:nestif //default. +func loadDeprecatedEntryMeta(deprecatedMetaPath string) (uint32, uint32, error) { + var err error + var deprecatedNumber uint32 + var totalEntryNumber uint32 + if _, err = os.Stat(deprecatedMetaPath); os.IsNotExist(err) { + // no exist, create one + var file *os.File + file, err = os.Create(deprecatedMetaPath) + if err != nil { + return deprecatedNumber, totalEntryNumber, err + } + deprecatedNumber = 0 + totalEntryNumber = 0 + file.Close() + } else if err != nil { + return deprecatedNumber, totalEntryNumber, err + } else { + // not err, we load meta + var file *os.File + file, err = os.Open(deprecatedMetaPath) + if err != nil { + return deprecatedNumber, totalEntryNumber, err + } + + // set the file pointer to 0 + _, err = file.Seek(0, 0) + if err != nil { + return deprecatedNumber, totalEntryNumber, err + } + + // read deprecatedNumber + err = binary.Read(file, binary.LittleEndian, &deprecatedNumber) + if err != nil { + return deprecatedNumber, totalEntryNumber, err + } + + // read totalEntryNumber + err = binary.Read(file, binary.LittleEndian, &totalEntryNumber) + if err != nil { + return deprecatedNumber, totalEntryNumber, err + } + } + return deprecatedNumber, totalEntryNumber, nil +} + +// persist deprecated number and total entry number. +func storeDeprecatedEntryMeta(deprecatedMetaPath string, deprecatedNumber uint32, totalNumber uint32) error { + file, err := os.OpenFile(deprecatedMetaPath, os.O_RDWR|os.O_TRUNC, 0666) + if err != nil { + return err + } + + // set the file pointer to 0 and overwrite + _, err = file.Seek(0, 0) + if err != nil { + return err + } + + // write deprecatedNumber + err = binary.Write(file, binary.LittleEndian, &deprecatedNumber) + if err != nil { + return err + } + + // write totalEntryNumber + err = binary.Write(file, binary.LittleEndian, &totalNumber) + if err != nil { + return err + } + file.Close() + return nil } diff --git a/db_test.go b/db_test.go index 56527d6..5c479f8 100644 --- a/db_test.go +++ b/db_test.go @@ -2,6 +2,7 @@ package lotusdb import ( "bytes" + "log" "os" "sync" "testing" @@ -380,6 +381,16 @@ func TestDBFlushMemTables(t *testing.T) { DisableWal: false, }) } + + delLogs := []*testLog{ + {key: []byte("key 3"), value: []byte("value 3")}, + } + for _, log := range delLogs { + _ = db.PutWithOptions(log.key, log.value, WriteOptions{ + Sync: true, + DisableWal: false, + }) + } for i := 0; i < numLogs; i++ { // the size of a logRecord is about 1MB (a little bigger than 1MB due to encode) log := &testLog{key: util.RandomValue(2 << 18), value: util.RandomValue(2 << 18)} @@ -397,15 +408,35 @@ func TestDBFlushMemTables(t *testing.T) { require.NoError(t, err) assert.Equal(t, log.value, value) } + + for _, log := range delLogs { + partition := db.vlog.getKeyPartition(log.key) + record, _ := getRecordFromVlog(db, log.key) + _ = db.DeleteWithOptions(log.key, WriteOptions{ + Sync: true, + DisableWal: false, + }) + for i := 0; i < numLogs; i++ { + // the size of a logRecord is about 1MB (a little bigger than 1MB due to encode) + tlog := &testLog{key: util.RandomValue(2 << 18), value: util.RandomValue(2 << 18)} + _ = db.PutWithOptions(tlog.key, tlog.value, WriteOptions{ + Sync: true, + DisableWal: false, + }) + } + time.Sleep(1 * time.Second) + assert.True(t, true, db.vlog.dpTables[partition].existEntry(record.uid)) + } }) } func TestDBCompact(t *testing.T) { options := DefaultOptions + options.AutoCompactSupport = false path, err := os.MkdirTemp("", "db-test-compact") require.NoError(t, err) options.DirPath = path - options.CompactBatchCount = 2 << 5 + options.CompactBatchCapacity = 1 << 6 db, err := Open(options) require.NoError(t, err) @@ -416,31 +447,81 @@ func TestDBCompact(t *testing.T) { {key: []byte("key 1"), value: []byte("value 1")}, {key: []byte("key 2"), value: []byte("value 2")}, } + for _, log := range testlogs { _ = db.PutWithOptions(log.key, log.value, WriteOptions{ Sync: true, DisableWal: false, }) } - // write logs and flush - logs := produceAndWriteLogs(500, db) - // delete logs - for _, log := range logs { - _ = db.DeleteWithOptions(log.key, WriteOptions{ + + produceAndWriteLogs(100000, 0, db) + // overwrite half. background busy flushing. + produceAndWriteLogs(50000, 0, db) + produceAndWriteLogs(50000, 0, db) + produceAndWriteLogs(50000, 0, db) + produceAndWriteLogs(50000, 0, db) + + t.Run("test compaction", func(t *testing.T) { + var size, sizeCompact int64 + + size, err = util.DirSize(db.options.DirPath) + require.NoError(t, err) + + err = db.Compact() + require.NoError(t, err) + + sizeCompact, err = util.DirSize(db.options.DirPath) + require.NoError(t, err) + require.Greater(t, size, sizeCompact) + var value []byte + for _, log := range testlogs { + value, err = getValueFromVlog(db, log.key) + require.NoError(t, err) + assert.Equal(t, log.value, value) + } + }) +} + +func TestDBCompactWitchDeprecatetable(t *testing.T) { + options := DefaultOptions + options.AutoCompactSupport = false + path, err := os.MkdirTemp("", "db-test-CompactWitchDeprecatetable") + require.NoError(t, err) + options.DirPath = path + options.CompactBatchCapacity = 1 << 6 + + db, err := Open(options) + require.NoError(t, err) + defer destroyDB(db) + + testlogs := []*testLog{ + {key: []byte("key 0"), value: []byte("value 0")}, + {key: []byte("key 1"), value: []byte("value 1")}, + {key: []byte("key 2"), value: []byte("value 2")}, + } + + for _, log := range testlogs { + _ = db.PutWithOptions(log.key, log.value, WriteOptions{ Sync: true, DisableWal: false, }) } - // make sure deleted logs will be flush - produceAndWriteLogs(100, db) + + produceAndWriteLogs(100000, 0, db) + // overwrite half. background busy flushing. + produceAndWriteLogs(50000, 0, db) + produceAndWriteLogs(50000, 0, db) + produceAndWriteLogs(50000, 0, db) + produceAndWriteLogs(50000, 0, db) t.Run("test compaction", func(t *testing.T) { var size, sizeCompact int64 - time.Sleep(time.Millisecond * 5000) + size, err = util.DirSize(db.options.DirPath) require.NoError(t, err) - err = db.Compact() + err = db.CompactWithDeprecatedtable() require.NoError(t, err) sizeCompact, err = util.DirSize(db.options.DirPath) @@ -455,6 +536,142 @@ func TestDBCompact(t *testing.T) { }) } +func TestDBAutoCompact(t *testing.T) { + options := DefaultOptions + options.AutoCompactSupport = true + path, err := os.MkdirTemp("", "db-test-AutoCompact") + require.NoError(t, err) + options.DirPath = path + options.CompactBatchCapacity = 1 << 6 + + db, err := Open(options) + require.NoError(t, err) + defer destroyDB(db) + + testlogs := []*testLog{ + {key: []byte("key 0"), value: []byte("value 0")}, + {key: []byte("key 1"), value: []byte("value 1")}, + {key: []byte("key 2"), value: []byte("value 2")}, + } + + testrmlogs := []*testLog{ + {key: []byte("key 0 rm"), value: []byte("value 0")}, + {key: []byte("key 1 rm"), value: []byte("value 1")}, + {key: []byte("key 2 rm"), value: []byte("value 2")}, + } + + t.Run("test compaction", func(t *testing.T) { + for _, log := range testlogs { + _ = db.PutWithOptions(log.key, log.value, WriteOptions{ + Sync: true, + DisableWal: false, + }) + } + for _, log := range testrmlogs { + _ = db.DeleteWithOptions(log.key, WriteOptions{ + Sync: true, + DisableWal: false, + }) + } + // load init key value. + produceAndWriteLogs(100000, 0, db) + // overwrite half. background busy flushing. + for i := 0; i < 6; i++ { + time.Sleep(500 * time.Microsecond) + produceAndWriteLogs(50000, 0, db) + } + + require.NoError(t, err) + + var value []byte + for _, log := range testlogs { + value, err = getValueFromVlog(db, log.key) + require.NoError(t, err) + assert.Equal(t, log.value, value) + } + for _, log := range testrmlogs { + value, err = db.Get(log.key) + require.Error(t, err) + assert.Equal(t, []byte(nil), value) + } + }) +} + +func TestDBAutoCompactWithBusyIO(t *testing.T) { + options := DefaultOptions + options.AutoCompactSupport = true + options.AdvisedCompactionRate = 0.2 + options.ForceCompactionRate = 0.5 + path, err := os.MkdirTemp("", "db-test-AutoCompactWithBusyIO") + require.NoError(t, err) + options.DirPath = path + options.CompactBatchCapacity = 1 << 6 + + db, err := Open(options) + require.NoError(t, err) + defer destroyDB(db) + + testlogs := []*testLog{ + {key: []byte("key 0"), value: []byte("value 0")}, + {key: []byte("key 1"), value: []byte("value 1")}, + {key: []byte("key 2"), value: []byte("value 2")}, + } + + testrmlogs := []*testLog{ + {key: []byte("key 0 rm"), value: []byte("value 0")}, + {key: []byte("key 1 rm"), value: []byte("value 1")}, + {key: []byte("key 2 rm"), value: []byte("value 2")}, + } + + t.Run("test compaction", func(t *testing.T) { + for _, log := range testlogs { + _ = db.PutWithOptions(log.key, log.value, WriteOptions{ + Sync: true, + DisableWal: false, + }) + } + for _, log := range testrmlogs { + _ = db.DeleteWithOptions(log.key, WriteOptions{ + Sync: true, + DisableWal: false, + }) + } + // load init key value. + ioCloseChan := make(chan struct{}) + go func() { + SimpleIO(options.DirPath+"iofile", 10) + ioCloseChan <- struct{}{} + }() + + produceAndWriteLogs(100000, 0, db) + // overwrite half. background busy flushing. + produceAndWriteLogs(50000, 0, db) + go SimpleIO(options.DirPath+"iofile", 10) + produceAndWriteLogs(50000, 0, db) + produceAndWriteLogs(50000, 0, db) + produceAndWriteLogs(50000, 0, db) + // we sleep 1s, this time not IO busy. So that background will do autoCompact. + time.Sleep(1 * time.Second) + <-ioCloseChan + close(ioCloseChan) + produceAndWriteLogs(50000, 0, db) + produceAndWriteLogs(50000, 0, db) + require.NoError(t, err) + + var value []byte + for _, log := range testlogs { + value, err = getValueFromVlog(db, log.key) + require.NoError(t, err) + assert.Equal(t, log.value, value) + } + for _, log := range testrmlogs { + value, err = db.Get(log.key) + require.Error(t, err) + assert.Equal(t, []byte(nil), value) + } + }) +} + func getValueFromVlog(db *DB, key []byte) ([]byte, error) { var value []byte var matchKey func(diskhash.Slot) (bool, error) @@ -481,11 +698,12 @@ func getValueFromVlog(db *DB, key []byte) ([]byte, error) { return record.value, nil } -func produceAndWriteLogs(numLogs int, db *DB) []*testLog { +func produceAndWriteLogs(numLogs int64, offset int64, db *DB) []*testLog { var logs []*testLog - for i := 0; i < numLogs; i++ { - // the size of a logRecord is about 1MB (a little bigger than 1MB due to encode) - log := &testLog{key: util.RandomValue(1 << 5), value: util.RandomValue(1 << 20)} + var i int64 + for i = 0; i < numLogs; i++ { + // the size of a logRecord is about 1KB (a little bigger than 1KB due to encode) + log := &testLog{key: util.GetTestKey(offset + i), value: util.RandomValue(1 << 10)} logs = append(logs, log) } for _, log := range logs { @@ -497,6 +715,27 @@ func produceAndWriteLogs(numLogs int, db *DB) []*testLog { return logs } +func getRecordFromVlog(db *DB, key []byte) (*ValueLogRecord, error) { + var value []byte + var matchKey func(diskhash.Slot) (bool, error) + if db.options.IndexType == Hash { + matchKey = MatchKeyFunc(db, key, nil, &value) + } + position, err := db.index.Get(key, matchKey) + if err != nil { + return nil, err + } + if position == nil { + return nil, ErrKeyNotFound + } + record, err := db.vlog.read(position) + if err != nil { + return nil, err + } + return record, nil +} + +//nolint:gocognit func TestDBMultiClients(t *testing.T) { type testLog struct { key []byte @@ -529,6 +768,18 @@ func TestDBMultiClients(t *testing.T) { for i := 0; i < 2; i++ { wg.Add(1) go func(i int) { + delLogs := produceAndWriteLogs(50000, int64(i)*50000, db) + // delete logs + for idx, log := range delLogs { + if idx%5 == 0 { + _ = db.DeleteWithOptions(log.key, WriteOptions{ + Sync: true, + DisableWal: false, + }) + } + } + produceAndWriteLogs(50000, int64(i)*50000, db) + time.Sleep(time.Millisecond * 500) for _, log := range logs[i] { _ = db.PutWithOptions(log.key, log.value, WriteOptions{ Sync: true, @@ -586,6 +837,7 @@ func TestDBMultiClients(t *testing.T) { //nolint:gocognit func TestDBIterator(t *testing.T) { options := DefaultOptions + options.AutoCompactSupport = false path, err := os.MkdirTemp("", "db-test-iter") require.NoError(t, err) options.DirPath = path @@ -599,7 +851,6 @@ func TestDBIterator(t *testing.T) { memSize: DefaultOptions.MemtableSize, walBytesPerSync: DefaultOptions.BytesPerSync, walSync: DefaultBatchOptions.Sync, - walBlockCache: DefaultOptions.BlockCache, } for i := 0; i < 3; i++ { opts.tableID = uint32(i) @@ -804,3 +1055,58 @@ func TestDBIterator(t *testing.T) { assert.Equal(t, ErrDBIteratorUnsupportedTypeHASH, err) assert.Nil(t, itr) } + +func TestDeprecatetableMetaPersist(t *testing.T) { + options := DefaultOptions + options.AutoCompactSupport = true + path, err := os.MkdirTemp("", "db-test-DeprecatetableMetaPersist") + require.NoError(t, err) + options.DirPath = path + options.CompactBatchCapacity = 1 << 6 + + db, err := Open(options) + require.NoError(t, err) + + t.Run("test same deprecated number", func(t *testing.T) { + produceAndWriteLogs(100000, 0, db) + // overwrite half. background busy flushing. + for i := 0; i < 3; i++ { + time.Sleep(500 * time.Microsecond) + produceAndWriteLogs(50000, 0, db) + } + db.Close() + deprecatedNumberFirst := db.vlog.deprecatedNumber + totalNumberFirst := db.vlog.totalNumber + db, err = Open(options) + deprecatedNumberSecond := db.vlog.deprecatedNumber + totalNumberSecond := db.vlog.totalNumber + require.NoError(t, err) + assert.Equal(t, deprecatedNumberFirst, deprecatedNumberSecond) + assert.Equal(t, totalNumberFirst, totalNumberSecond) + }) +} + +func SimpleIO(targetPath string, count int) { + file, err := os.Create(targetPath) + if err != nil { + log.Println("Error creating file:", err) + return + } + data := util.RandomValue(1 << 24) + for count > 0 { + count-- + _, err = file.Write(data) + if err != nil { + log.Println("Error writing to file:", err) + return + } + + err = file.Sync() + if err != nil { + log.Println("Error syncing file:", err) + return + } + + time.Sleep(1 * time.Millisecond) + } +} diff --git a/deprecatedtable.go b/deprecatedtable.go new file mode 100644 index 0000000..297732a --- /dev/null +++ b/deprecatedtable.go @@ -0,0 +1,55 @@ +package lotusdb + +import ( + "github.com/google/uuid" +) + +type ThresholdState int + +const ( + ArriveAdvisedThreshold int = iota // Recommended to perform a compaction at this time + ArriveForceThreshold // At this point, force a compaction + UnarriveThreshold // Not require compaction +) + +type ( + // Deprecatedtable is used to store old information about deleted/updated keys. + // for every write/update generated an uuid, we store uuid in the table. + // It is useful in compaction, allowing us to know whether the kv + // in the value log is up-to-date without accessing the index. + deprecatedtable struct { + partition int // which shard in vlog + table map[uuid.UUID]struct{} // we store deprecated uuid of keys,in memory + size uint32 // number of deprecated entry now + } + + // used to send message to autoCompact. + deprecatedState struct { + thresholdState ThresholdState + } +) + +// Create a new deprecatedtable. +func newDeprecatedTable(partition int) *deprecatedtable { + return &deprecatedtable{ + partition: partition, + table: make(map[uuid.UUID]struct{}), + size: 0, + } +} + +// Add a uuid to the specified key. +func (dt *deprecatedtable) addEntry(id uuid.UUID) { + dt.table[id] = struct{}{} + dt.size++ +} + +func (dt *deprecatedtable) existEntry(id uuid.UUID) bool { + _, exists := dt.table[id] + return exists +} + +func (dt *deprecatedtable) clean() { + dt.table = make(map[uuid.UUID]struct{}) + dt.size = 0 +} diff --git a/deprecatedtable_test.go b/deprecatedtable_test.go new file mode 100644 index 0000000..856ec43 --- /dev/null +++ b/deprecatedtable_test.go @@ -0,0 +1,42 @@ +package lotusdb + +import ( + "testing" + + "github.com/google/uuid" +) + +func TestAddEntry(t *testing.T) { + dt := newDeprecatedTable(0) + uidNumber := 3 + count := 4 + + for i := 0; i < count; i++ { + for j := 0; j < uidNumber; j++ { + uid := uuid.New() + dt.addEntry(uid) + } + } + if (int)(dt.size) != count*uidNumber { + t.Errorf("expected dt.size to be %d, got %d", count, dt.size) + } +} + +func TestUuidExist(t *testing.T) { + dt := newDeprecatedTable(0) + uidNumber := 3 + count := 4 + + for i := 0; i < count; i++ { + for j := 0; j < uidNumber; j++ { + uid := uuid.New() + dt.addEntry(uid) + if !dt.existEntry(uid) { + t.Errorf("expected entry not exist!") + } + } + } + if (int)(dt.size) != count*uidNumber { + t.Errorf("expected dt.size to be %d, got %d", count, dt.size) + } +} diff --git a/diskio.go b/diskio.go new file mode 100644 index 0000000..8ee5a1a --- /dev/null +++ b/diskio.go @@ -0,0 +1,150 @@ +package lotusdb + +import ( + "errors" + "log" + "path/filepath" + "strings" + "sync" + "time" + + "github.com/shirou/gopsutil/disk" +) + +type DiskIO struct { + targetPath string // db path, we use it to find disk device + samplingInterval int // sampling time, millisecond + samplingWindow []uint64 // sampling window is used to sample the average IoTime over a period of time + windowSize int // size of the sliding window used for sampling. + windowPoint int // next sampling offset in window + busyRate float32 // express io busy status by the proportion of io time in the sampling time + freeFlag bool // freeFlag indicates whether the disk is free + mu sync.Mutex +} + +func (io *DiskIO) Init() { + io.samplingWindow = make([]uint64, io.windowSize) + io.windowPoint = 0 +} + +func (io *DiskIO) Monitor() error { + var ioStart disk.IOCountersStat + var ioEnd disk.IOCountersStat + var err error + + ioStart, err = GetDiskIOInfo((io.targetPath)) + if err != nil { + return err + } + + time.Sleep(time.Duration(io.samplingInterval) * time.Millisecond) + + ioEnd, err = GetDiskIOInfo((io.targetPath)) + if err != nil { + return err + } + + // IoTime is device active time since system boot, we get it during sampling. + ioTime := ioEnd.IoTime - ioStart.IoTime + + // set time and move point to next slot + io.samplingWindow[io.windowPoint] = ioTime + io.windowPoint++ + io.windowPoint %= io.windowSize + + // get mean IoTime + var sum uint64 + for _, value := range io.samplingWindow { + sum += value + } + meanTime := sum / (uint64(io.windowSize)) + + // others may read io.freeFlag by IsFree, so we need lock it when changing. + io.mu.Lock() + defer io.mu.Unlock() + // this log maybe useful + // log.Println("meantime:", meanTime, "BusyThreshold:", uint64(float32(io.samplingInterval)*io.busyRate)) + if meanTime > uint64(float32(io.samplingInterval)*io.busyRate) { + io.freeFlag = false + } else { + io.freeFlag = true + } + return nil +} + +func (io *DiskIO) IsFree() (bool, error) { + // if runtime.GOOS != "linux" { + // return true, nil + // } + if io.busyRate < 0 { + return true, nil + } + io.mu.Lock() + defer io.mu.Unlock() + return io.freeFlag, nil +} + +func GetDiskIOInfo(targetPath string) (disk.IOCountersStat, error) { + var io disk.IOCountersStat + // Get all mounting points + partitions, err := disk.Partitions(false) + if err != nil { + log.Println("Error getting partitions:", err) + return io, err + } + + var targetDevice string + + // Find the mount point where the target path is located + for _, partition := range partitions { + if isPathOnDevice(targetPath, partition.Mountpoint) { + targetDevice = partition.Device + break + } + } + + targetDevice = getDeviceName(targetDevice) + + // Get the I/O status of the device + ioCounters, err := disk.IOCounters() + if err != nil { + return io, err + } + + var exists bool + if io, exists = ioCounters[targetDevice]; !exists { + return io, errors.New("No I/O stats available for device" + targetDevice) + } + + return io, nil +} + +// Check if the path is on the specified mount point. +func getDeviceName(devicePath string) string { + parts := strings.Split(devicePath, "/") + if len(parts) > 0 { + return parts[len(parts)-1] + } + return devicePath +} + +// Check if the path is on the specified mount point. +func isPathOnDevice(path, mountpoint string) bool { + absPath, err := filepath.Abs(path) + if err != nil { + log.Println("Error getting absolute path:", err) + return false + } + + absMountpoint, err := filepath.Abs(mountpoint) + if err != nil { + log.Println("Error getting absolute mountpoint:", err) + return false + } + + // Ensure paths are normalized for comparison + absPath = filepath.Clean(absPath) + absMountpoint = filepath.Clean(absMountpoint) + + return strings.HasPrefix(absPath, absMountpoint) +} diff --git a/go.mod b/go.mod index e54b116..c48ccf4 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/lotusdblabs/lotusdb/v2 -go 1.19 +go 1.21 require ( github.com/bwmarrin/snowflake v0.3.0 @@ -8,19 +8,25 @@ require ( github.com/dgraph-io/badger/v4 v4.2.0 github.com/gofrs/flock v0.8.1 github.com/rosedblabs/diskhash v0.0.0-20230910084041-289755737e2a - github.com/rosedblabs/wal v1.3.6 - github.com/stretchr/testify v1.8.4 + github.com/rosedblabs/wal v1.3.8 + github.com/stretchr/testify v1.9.0 go.etcd.io/bbolt v1.3.8 golang.org/x/sync v0.5.0 ) +require ( + github.com/go-ole/go-ole v1.2.6 // indirect + github.com/shirou/gopsutil v3.21.11+incompatible // indirect + github.com/yusufpapurcu/wmi v1.2.4 // indirect +) + require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/dgraph-io/ristretto v0.1.1 // indirect github.com/dustin/go-humanize v1.0.1 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/glog v1.2.0 // indirect - github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect + github.com/google/uuid v1.6.0 github.com/klauspost/compress v1.17.4 // indirect github.com/kr/pretty v0.2.0 // indirect github.com/kr/text v0.2.0 // indirect @@ -32,3 +38,5 @@ require ( golang.org/x/sys v0.15.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) + +replace go.etcd.io/bbolt => github.com/yanxiaoqi932/bbolt v1.3.9-0.20240829105042-5b817c5f51f8 diff --git a/go.sum b/go.sum index a41dd7f..13977f4 100644 --- a/go.sum +++ b/go.sum @@ -16,6 +16,8 @@ github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUn github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= +github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY= +github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= github.com/gofrs/flock v0.8.1 h1:+gYjHKf32LDeiEEFhQaotPbLuUXjY5ZqxKgXy7n59aw= github.com/gofrs/flock v0.8.1/go.mod h1:F1TvTiK9OcQqauNUHlbJvyl9Qa1QvF/gOUDKA14jxHU= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= @@ -24,8 +26,9 @@ github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfU github.com/golang/glog v1.2.0 h1:uCdmnmatrKCgMBlM4rMuJZWOkPDqdbZPnrMXDY4gI68= github.com/golang/glog v1.2.0/go.mod h1:6AhwSGph0fcJtXVM/PEHPqZlFeoLxhs7/t5UDAwmO+w= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= -github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= -github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/compress v1.17.4 h1:Ej5ixsIri7BrIjBkRZLTo6ghwrEtHFk7ijlczPW4fZ4= @@ -42,20 +45,24 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/rosedblabs/diskhash v0.0.0-20230910084041-289755737e2a h1:BNp46nsknQivr3Gxzc6ytzG7xtBscBnLYZIkr0UfCko= github.com/rosedblabs/diskhash v0.0.0-20230910084041-289755737e2a/go.mod h1:3xvIg+7iOFUL/vMCE/6DwE6Yecb0okVYJBEfpdC/E+8= -github.com/rosedblabs/wal v1.3.6 h1:oxZYTPX/u4JuGDW98wQ1YamWqerlrlSUFKhgP6Gd/Ao= -github.com/rosedblabs/wal v1.3.6/go.mod h1:wdq54KJUyVTOv1uddMc6Cdh2d/YCIo8yjcwJAb1RCEM= +github.com/rosedblabs/wal v1.3.8 h1:tErpD9JT/ICiyV3mv5l7qUH6lybn5XF1TbI0e8kvH8M= +github.com/rosedblabs/wal v1.3.8/go.mod h1:DFvhrmTTeiXvn2btXXT2MW9Nvu99PU0g/pKGgh0+T+o= +github.com/shirou/gopsutil v3.21.11+incompatible h1:+1+c1VGhc88SSonWP6foOcLhvnKlUeu/erjjvaPEYiI= +github.com/shirou/gopsutil v3.21.11+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA= github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI= github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= -github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= -github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= +github.com/yanxiaoqi932/bbolt v1.3.9-0.20240829105042-5b817c5f51f8 h1:tJsdXNnp2LPnEZb97MA51dQygG2q01n3kJ72rQ4LL9Y= +github.com/yanxiaoqi932/bbolt v1.3.9-0.20240829105042-5b817c5f51f8/go.mod h1:zaO32+Ti0PK1ivdPtgMESzuzL2VPoIG1PCQNvOdo/dE= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= -go.etcd.io/bbolt v1.3.8 h1:xs88BrvEv273UsB79e0hcVrlUWmS0a8upikMFhSyAtA= -go.etcd.io/bbolt v1.3.8/go.mod h1:N9Mkw9X8x5fupy0IKsmuqVtoGDyxsaDlbk4Rd05IAQw= +github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo0= +github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= @@ -74,6 +81,7 @@ golang.org/x/sync v0.5.0 h1:60k92dhOjHxJkrqnwsfl8KuaHbn/5dl0lUPUklKo3qE= golang.org/x/sync v0.5.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20221010170243-090e33056c14/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc= @@ -90,6 +98,7 @@ golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= +gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/hashtable.go b/hashtable.go index 2f7387d..89ff6df 100644 --- a/hashtable.go +++ b/hashtable.go @@ -47,9 +47,9 @@ func openHashIndex(options indexOptions) (*HashTable, error) { } // PutBatch put batch records to index. -func (ht *HashTable) PutBatch(positions []*KeyPosition, matchKeyFunc ...diskhash.MatchKeyFunc) error { +func (ht *HashTable) PutBatch(positions []*KeyPosition, matchKeyFunc ...diskhash.MatchKeyFunc) ([]*KeyPosition, error) { if len(positions) == 0 { - return nil + return nil, nil } partitionRecords := make([][]*KeyPosition, ht.options.partitionNum) matchKeys := make([][]diskhash.MatchKeyFunc, ht.options.partitionNum) @@ -86,7 +86,7 @@ func (ht *HashTable) PutBatch(positions []*KeyPosition, matchKeyFunc ...diskhash return nil }) } - return g.Wait() + return nil, g.Wait() } // Get chunk position by key. @@ -105,9 +105,9 @@ func (ht *HashTable) Get(key []byte, matchKeyFunc ...diskhash.MatchKeyFunc) (*Ke } // DeleteBatch delete batch records from index. -func (ht *HashTable) DeleteBatch(keys [][]byte, matchKeyFunc ...diskhash.MatchKeyFunc) error { +func (ht *HashTable) DeleteBatch(keys [][]byte, matchKeyFunc ...diskhash.MatchKeyFunc) ([]*KeyPosition, error) { if len(keys) == 0 { - return nil + return nil, nil } partitionKeys := make([][][]byte, ht.options.partitionNum) matchKeys := make([][]*diskhash.MatchKeyFunc, ht.options.partitionNum) @@ -141,7 +141,7 @@ func (ht *HashTable) DeleteBatch(keys [][]byte, matchKeyFunc ...diskhash.MatchKe return nil }) } - return g.Wait() + return nil, g.Wait() } // Sync sync index data to disk. diff --git a/hashtable_test.go b/hashtable_test.go index 0ce015a..cc3c720 100644 --- a/hashtable_test.go +++ b/hashtable_test.go @@ -139,7 +139,7 @@ func testHashTablePutBatch(t *testing.T, partitionNum int) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - if err = ht.PutBatch(tt.positions, tt.matchKeyFunc...); (err != nil) != tt.wantErr { + if _, err = ht.PutBatch(tt.positions, tt.matchKeyFunc...); (err != nil) != tt.wantErr { t.Errorf("HashTable.PutBatch() error = %v, wantErr %v", err, tt.wantErr) } }) @@ -174,7 +174,7 @@ func testHashTableGet(t *testing.T, partitionNum int) { matchKeyFuncs := []diskhash.MatchKeyFunc{ testMatchFunc(true), testMatchFunc(false), } - err = ht.PutBatch(keyPositions, matchKeyFuncs[:1]...) + _, err = ht.PutBatch(keyPositions, matchKeyFuncs[:1]...) require.NoError(t, err) tests := []struct { @@ -229,7 +229,7 @@ func testHashTableDeleteBatch(t *testing.T, partitionNum int) { partition: uint32(ht.options.getKeyPartition([]byte("normal"))), position: &wal.ChunkPosition{}, }) - err = ht.PutBatch(keyPositions, []diskhash.MatchKeyFunc{testMatchFunc(true)}...) + _, err = ht.PutBatch(keyPositions, []diskhash.MatchKeyFunc{testMatchFunc(true)}...) require.NoError(t, err) matchKeyFuncs := []diskhash.MatchKeyFunc{ @@ -248,7 +248,7 @@ func testHashTableDeleteBatch(t *testing.T, partitionNum int) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - if err = ht.DeleteBatch(tt.keys, tt.matchKeyFunc...); (err != nil) != tt.wantErr { + if _, err = ht.DeleteBatch(tt.keys, tt.matchKeyFunc...); (err != nil) != tt.wantErr { t.Errorf("HashTable.DeleteBatch() error = %v, wantErr %v", err, tt.wantErr) } }) diff --git a/index.go b/index.go index de9dbce..9a9d5f9 100644 --- a/index.go +++ b/index.go @@ -1,6 +1,8 @@ package lotusdb -import "github.com/rosedblabs/diskhash" +import ( + "github.com/rosedblabs/diskhash" +) const ( // indexFileExt is the file extension for index files. @@ -15,13 +17,13 @@ const ( // But you can implement your own index if you want. type Index interface { // PutBatch put batch records to index - PutBatch(keyPositions []*KeyPosition, matchKeyFunc ...diskhash.MatchKeyFunc) error + PutBatch(keyPositions []*KeyPosition, matchKeyFunc ...diskhash.MatchKeyFunc) ([]*KeyPosition, error) // Get chunk position by key Get(key []byte, matchKeyFunc ...diskhash.MatchKeyFunc) (*KeyPosition, error) // DeleteBatch delete batch records from index - DeleteBatch(keys [][]byte, matchKeyFunc ...diskhash.MatchKeyFunc) error + DeleteBatch(keys [][]byte, matchKeyFunc ...diskhash.MatchKeyFunc) ([]*KeyPosition, error) // Sync sync index data to disk Sync() error diff --git a/memtable.go b/memtable.go index 123438c..cf3d346 100644 --- a/memtable.go +++ b/memtable.go @@ -51,7 +51,6 @@ type ( memSize uint32 // max size of the memtable walBytesPerSync uint32 // flush wal file to disk throughput BytesPerSync parameter walSync bool // WAL flush immediately after each writing - walBlockCache uint32 // block cache size of wal } ) @@ -91,7 +90,6 @@ func openAllMemtables(options Options) ([]*memtable, error) { memSize: options.MemtableSize, walSync: options.Sync, walBytesPerSync: options.BytesPerSync, - walBlockCache: options.BlockCache, }) if errOpenMemtable != nil { return nil, errOpenMemtable @@ -116,7 +114,6 @@ func openMemtable(options memtableOptions) (*memtable, error) { DirPath: options.dirPath, SegmentSize: math.MaxInt, // no limit, guarantee that a wal file only contains one segment file SegmentFileExt: fmt.Sprintf(walFileExt, options.tableID), - BlockCache: options.walBlockCache, Sync: options.walSync, BytesPerSync: options.walBytesPerSync, }) diff --git a/memtable_test.go b/memtable_test.go index 97f7a4e..606935d 100644 --- a/memtable_test.go +++ b/memtable_test.go @@ -27,7 +27,6 @@ func TestMemtableOpen(t *testing.T) { memSize: DefaultOptions.MemtableSize, walBytesPerSync: DefaultOptions.BytesPerSync, walSync: DefaultBatchOptions.Sync, - walBlockCache: DefaultOptions.BlockCache, } t.Run("open memtable", func(t *testing.T) { @@ -56,7 +55,6 @@ func TestMemtableOpenAll(t *testing.T) { memSize: DefaultOptions.MemtableSize, walBytesPerSync: DefaultOptions.BytesPerSync, walSync: DefaultBatchOptions.Sync, - walBlockCache: DefaultOptions.BlockCache, } table, err = openMemtable(opts) require.NoError(t, err) @@ -93,7 +91,6 @@ func TestMemTablePutAllKindsEntries(t *testing.T) { memSize: DefaultOptions.MemtableSize, walBytesPerSync: DefaultOptions.BytesPerSync, walSync: DefaultBatchOptions.Sync, - walBlockCache: DefaultOptions.BlockCache, } table, err := openMemtable(opts) require.NoError(t, err) @@ -151,7 +148,6 @@ func TestMemTablePutBatch(t *testing.T) { memSize: DefaultOptions.MemtableSize, walBytesPerSync: DefaultOptions.BytesPerSync, walSync: DefaultBatchOptions.Sync, - walBlockCache: DefaultOptions.BlockCache, } table, err := openMemtable(opts) require.NoError(t, err) @@ -166,7 +162,7 @@ func TestMemTablePutBatch(t *testing.T) { pendingWrites := make(map[string]*LogRecord) val := util.RandomValue(512) for i := 0; i < 1000; i++ { - log := &LogRecord{Key: util.GetTestKey(i), Value: val} + log := &LogRecord{Key: util.GetTestKey(int64(i)), Value: val} pendingWrites[string(log.Key)] = log } @@ -193,7 +189,6 @@ func TestMemTablePutBatchReopen(t *testing.T) { memSize: DefaultOptions.MemtableSize, walBytesPerSync: DefaultOptions.BytesPerSync, walSync: DefaultBatchOptions.Sync, - walBlockCache: DefaultOptions.BlockCache, } table, err := openMemtable(opts) require.NoError(t, err) @@ -208,7 +203,7 @@ func TestMemTablePutBatchReopen(t *testing.T) { pendingWrites := make(map[string]*LogRecord) val := util.RandomValue(512) for i := 0; i < 1000; i++ { - log := &LogRecord{Key: util.GetTestKey(i), Value: val} + log := &LogRecord{Key: util.GetTestKey(int64(i)), Value: val} pendingWrites[string(log.Key)] = log } @@ -242,7 +237,6 @@ func TestMemTableGet(t *testing.T) { memSize: DefaultOptions.MemtableSize, walBytesPerSync: DefaultOptions.BytesPerSync, walSync: DefaultBatchOptions.Sync, - walBlockCache: DefaultOptions.BlockCache, } table, err := openMemtable(opts) require.NoError(t, err) @@ -303,7 +297,6 @@ func TestMemTableGetReopen(t *testing.T) { memSize: DefaultOptions.MemtableSize, walBytesPerSync: DefaultOptions.BytesPerSync, walSync: DefaultBatchOptions.Sync, - walBlockCache: DefaultOptions.BlockCache, } writeOpts := WriteOptions{ @@ -385,7 +378,6 @@ func TestMemTableDelWal(t *testing.T) { memSize: DefaultOptions.MemtableSize, walBytesPerSync: DefaultOptions.BytesPerSync, walSync: DefaultBatchOptions.Sync, - walBlockCache: DefaultOptions.BlockCache, } table, err := openMemtable(opts) require.NoError(t, err) @@ -418,7 +410,6 @@ func TestMemTableSync(t *testing.T) { memSize: DefaultOptions.MemtableSize, walBytesPerSync: DefaultOptions.BytesPerSync, walSync: DefaultBatchOptions.Sync, - walBlockCache: DefaultOptions.BlockCache, } table, err := openMemtable(opts) require.NoError(t, err) @@ -433,7 +424,7 @@ func TestMemTableSync(t *testing.T) { pendingWrites := make(map[string]*LogRecord) val := util.RandomValue(512) for i := 0; i < 1000; i++ { - log := &LogRecord{Key: util.GetTestKey(i), Value: val} + log := &LogRecord{Key: util.GetTestKey(int64(i)), Value: val} pendingWrites[string(log.Key)] = log } err = table.putBatch(pendingWrites, node.Generate(), writeOpts) @@ -463,7 +454,6 @@ func TestMemtableClose(t *testing.T) { memSize: DefaultOptions.MemtableSize, walBytesPerSync: DefaultOptions.BytesPerSync, walSync: DefaultBatchOptions.Sync, - walBlockCache: DefaultOptions.BlockCache, } table, err := openMemtable(opts) @@ -489,7 +479,6 @@ func TestNewMemtableIterator(t *testing.T) { memSize: DefaultOptions.MemtableSize, walBytesPerSync: DefaultOptions.BytesPerSync, walSync: DefaultBatchOptions.Sync, - walBlockCache: DefaultOptions.BlockCache, } table, err := openMemtable(opts) @@ -524,7 +513,6 @@ func Test_memtableIterator(t *testing.T) { memSize: DefaultOptions.MemtableSize, walBytesPerSync: DefaultOptions.BytesPerSync, walSync: DefaultBatchOptions.Sync, - walBlockCache: DefaultOptions.BlockCache, } table, err := openMemtable(opts) require.NoError(t, err) diff --git a/options.go b/options.go index 64b66e3..c548252 100644 --- a/options.go +++ b/options.go @@ -20,11 +20,6 @@ type Options struct { // Default value is 15. MemtableNums int - // BlockCache specifies the size of the block cache in number of bytes. - // A block cache is used to store recently accessed data blocks, improving read performance. - // If BlockCache is set to 0, no block cache will be used. - BlockCache uint32 - // Sync is whether to synchronize writes through os buffer cache and down onto the actual disk. // Setting sync is required for durability of a single write operation, but also results in slower writes. // @@ -54,8 +49,26 @@ type Options struct { // default value is bptree. IndexType IndexType - // writing entries to disk after reading the specified number of entries. - CompactBatchCount int + // writing entries to disk after reading the specified memory capacity of entries. + CompactBatchCapacity int + + // deprecatedtable recommend compaction rate + AdvisedCompactionRate float32 + + // deprecatedtable force compaction rate + ForceCompactionRate float32 + + // sampling interval of diskIO, unit is millisecond + DiskIOSamplingInterval int + + // window is used to retrieve the state of IO bury over a period of time + DiskIOSamplingWindow int + + // rate of io time in the sampling time is used to represent the busy state of io + DiskIOBusyRate float32 + + // AutoCompactSupport support + AutoCompactSupport bool // WaitMemSpaceTimeout specifies the timeout for waiting for space in the memtable. // When all memtables are full, it will be flushed to disk by the background goroutine. @@ -119,7 +132,6 @@ var DefaultOptions = Options{ MemtableSize: 64 * MB, //nolint:gomnd // default MemtableNums: 15, - BlockCache: 0, Sync: false, BytesPerSync: 0, //nolint:gomnd // default @@ -128,7 +140,18 @@ var DefaultOptions = Options{ ValueLogFileSize: 1 * GB, IndexType: BTree, //nolint:gomnd // default - CompactBatchCount: 10000, + CompactBatchCapacity: 1 << 30, + //nolint:gomnd // default + AdvisedCompactionRate: 0.3, + //nolint:gomnd // default + ForceCompactionRate: 0.5, + //nolint:gomnd // default + DiskIOSamplingInterval: 100, + //nolint:gomnd // default + DiskIOSamplingWindow: 10, + //nolint:gomnd // default + DiskIOBusyRate: 0.5, + AutoCompactSupport: true, //nolint:gomnd // default WaitMemSpaceTimeout: 100 * time.Millisecond, } diff --git a/struct_test.go b/struct_test.go new file mode 100644 index 0000000..26a5e3f --- /dev/null +++ b/struct_test.go @@ -0,0 +1,40 @@ +package lotusdb + +import ( + "bytes" + "testing" + + "github.com/google/uuid" +) + +func TestEncodeDecodeValueLogRecord(t *testing.T) { + // Example data + key := []byte("mykey") + value := []byte("myvalue") + uuidVal := uuid.New() + + record := &ValueLogRecord{ + key: key, + value: value, + uid: uuidVal, + } + + // Encode the record + encoded := encodeValueLogRecord(record) + + // Decode the encoded record + decoded := decodeValueLogRecord(encoded) + + // Compare original and decoded records + if !bytes.Equal(record.key, decoded.key) { + t.Errorf("Expected key %v, got %v", record.key, decoded.key) + } + + if !bytes.Equal(record.value, decoded.value) { + t.Errorf("Expected value %v, got %v", record.value, decoded.value) + } + + if record.uid != decoded.uid { + t.Errorf("Expected UUID %v, got %v", record.uid, decoded.uid) + } +} diff --git a/structs.go b/structs.go index 16068a2..596bfee 100644 --- a/structs.go +++ b/structs.go @@ -3,6 +3,7 @@ package lotusdb import ( "encoding/binary" + "github.com/google/uuid" "github.com/rosedblabs/wal" ) @@ -98,34 +99,56 @@ func decodeLogRecord(buf []byte) *LogRecord { type KeyPosition struct { key []byte partition uint32 + uid uuid.UUID position *wal.ChunkPosition } // ValueLogRecord is the record of the key/value pair in the value log. type ValueLogRecord struct { + uid uuid.UUID key []byte value []byte } func encodeValueLogRecord(record *ValueLogRecord) []byte { - buf := make([]byte, 4+len(record.key)+len(record.value)) keySize := 4 index := 0 - binary.LittleEndian.PutUint32(buf[index:keySize], uint32(len(record.key))) + uidBytes, _ := record.uid.MarshalBinary() + buf := make([]byte, len(uidBytes)+4+len(record.key)+len(record.value)) + + copy(buf[index:], uidBytes) + index += len(uidBytes) + + binary.LittleEndian.PutUint32(buf[index:index+keySize], uint32(len(record.key))) index += keySize copy(buf[index:index+len(record.key)], record.key) index += len(record.key) + copy(buf[index:], record.value) return buf } func decodeValueLogRecord(buf []byte) *ValueLogRecord { - var keySize uint32 = 4 - keyLen := binary.LittleEndian.Uint32(buf[:keySize]) + keySize := 4 + index := 0 + var uid uuid.UUID + uidBytes := buf[:len(uid)] + err := uid.UnmarshalBinary(uidBytes) + if err != nil { + return nil + } + index += len(uid) + + keyLen := (int)(binary.LittleEndian.Uint32(buf[index : index+keySize])) + index += keySize + key := make([]byte, keyLen) - copy(key, buf[keySize:keySize+keyLen]) - value := make([]byte, uint32(len(buf))-keyLen-keySize) - copy(value, buf[keySize+keyLen:]) - return &ValueLogRecord{key: key, value: value} + copy(key, buf[index:index+keyLen]) + index += keyLen + + value := make([]byte, len(buf)-len(uid)-keyLen-keySize) + copy(value, buf[index:]) + + return &ValueLogRecord{uid: uid, key: key, value: value} } diff --git a/util/file_test.go b/util/file_test.go index 3b1aba0..d5e7b46 100644 --- a/util/file_test.go +++ b/util/file_test.go @@ -30,6 +30,6 @@ func TestDirSize(t *testing.T) { t.Run("test DirSize", func(t *testing.T) { size, errDirSize := DirSize(dirPath) require.NoError(t, errDirSize) - assert.Greater(t, size, int64(0)) + assert.Positive(t, size, int64(0)) }) } diff --git a/util/rand_kv.go b/util/rand_kv.go index ea1d97d..8363c55 100644 --- a/util/rand_kv.go +++ b/util/rand_kv.go @@ -14,7 +14,7 @@ var ( letters = []byte("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789") ) -func GetTestKey(i int) []byte { +func GetTestKey(i int64) []byte { return []byte(fmt.Sprintf("lotusdb-test-key-%09d", i)) } diff --git a/util/rand_kv_test.go b/util/rand_kv_test.go index 1a69ced..42c58b3 100644 --- a/util/rand_kv_test.go +++ b/util/rand_kv_test.go @@ -8,7 +8,7 @@ import ( func TestGetTestKey(t *testing.T) { for i := 0; i < 10; i++ { - assert.NotNil(t, string(GetTestKey(i))) + assert.NotNil(t, string(GetTestKey(int64(i)))) } } diff --git a/vlog.go b/vlog.go index b47dc92..d045d15 100644 --- a/vlog.go +++ b/vlog.go @@ -4,6 +4,7 @@ import ( "context" "fmt" + "github.com/google/uuid" "github.com/rosedblabs/wal" "golang.org/x/sync/errgroup" ) @@ -16,8 +17,11 @@ const ( // valueLog value log is named after the concept in Wisckey paper // https://www.usenix.org/system/files/conference/fast16/fast16-papers-lu.pdf type valueLog struct { - walFiles []*wal.WAL - options valueLogOptions + walFiles []*wal.WAL + dpTables []*deprecatedtable + deprecatedNumber uint32 + totalNumber uint32 + options valueLogOptions } type valueLogOptions struct { @@ -27,32 +31,33 @@ type valueLogOptions struct { // segmentSize specifies the maximum size of each segment file in bytes. segmentSize int64 - // blockCache specifies the size of the block cache in number of bytes. - // A block cache is used to store recently accessed data blocks, improving read performance. - // If BlockCache is set to 0, no block cache will be used. - blockCache uint32 - // value log are partitioned to several parts for concurrent writing and reading partitionNum uint32 // hash function for sharding hashKeyFunction func([]byte) uint64 - // writing validEntries to disk after reading the specified number of entries. - compactBatchCount int + // writing validEntries to disk after reading the specified memory capacity of entries. + compactBatchCapacity int + + // deprecated number + deprecatedtableNumber uint32 + + // total number + totalNumber uint32 } // open wal files for value log, it will open several wal files for concurrent writing and reading // the number of wal files is specified by the partitionNum. +// init deprecatedtable for every wal, we should build dpTable aftering compacting vlog. func openValueLog(options valueLogOptions) (*valueLog, error) { var walFiles []*wal.WAL - + var dpTables []*deprecatedtable for i := 0; i < int(options.partitionNum); i++ { vLogWal, err := wal.Open(wal.Options{ DirPath: options.dirPath, SegmentSize: options.segmentSize, SegmentFileExt: fmt.Sprintf(valueLogFileExt, i), - BlockCache: options.blockCache, Sync: false, // we will sync manually BytesPerSync: 0, // the same as Sync }) @@ -60,9 +65,17 @@ func openValueLog(options valueLogOptions) (*valueLog, error) { return nil, err } walFiles = append(walFiles, vLogWal) + // init dpTable + dpTable := newDeprecatedTable(i) + dpTables = append(dpTables, dpTable) } - return &valueLog{walFiles: walFiles, options: options}, nil + return &valueLog{ + walFiles: walFiles, + dpTables: dpTables, + deprecatedNumber: options.deprecatedtableNumber, + totalNumber: options.totalNumber, + options: options}, nil } // read the value log record from the specified position. @@ -80,6 +93,7 @@ func (vlog *valueLog) read(pos *KeyPosition) (*ValueLogRecord, error) { func (vlog *valueLog) writeBatch(records []*ValueLogRecord) ([]*KeyPosition, error) { // group the records by partition partitionRecords := make([][]*ValueLogRecord, vlog.options.partitionNum) + vlog.totalNumber += uint32(len(records)) for _, record := range records { p := vlog.getKeyPartition(record.key) partitionRecords[p] = append(partitionRecords[p], record) @@ -121,6 +135,7 @@ func (vlog *valueLog) writeBatch(records []*ValueLogRecord) ([]*KeyPosition, err keyPositions = append(keyPositions, &KeyPosition{ key: partitionRecords[part][writeIdx+i].key, partition: uint32(part), + uid: partitionRecords[part][writeIdx+i].uid, position: pos, }) } @@ -167,3 +182,21 @@ func (vlog *valueLog) close() error { func (vlog *valueLog) getKeyPartition(key []byte) int { return int(vlog.options.hashKeyFunction(key) % uint64(vlog.options.partitionNum)) } + +// we add middle layer of DeprecatedTable for interacting with autoCompact func. +func (vlog *valueLog) setDeprecated(partition uint32, id uuid.UUID) { + vlog.dpTables[partition].addEntry(id) + vlog.deprecatedNumber++ +} + +func (vlog *valueLog) isDeprecated(partition int, id uuid.UUID) bool { + return vlog.dpTables[partition].existEntry(id) +} + +func (vlog *valueLog) cleanDeprecatedTable() { + for i := 0; i < int(vlog.options.partitionNum); i++ { + vlog.dpTables[i].clean() + } + vlog.totalNumber -= vlog.deprecatedNumber + vlog.deprecatedNumber = 0 +} diff --git a/vlog_test.go b/vlog_test.go index 4e0ee1a..635848a 100644 --- a/vlog_test.go +++ b/vlog_test.go @@ -23,7 +23,6 @@ func TestOpenValueLog(t *testing.T) { opts := valueLogOptions{ dirPath: path, segmentSize: GB, - blockCache: DefaultOptions.BlockCache, partitionNum: uint32(DefaultOptions.PartitionNum), hashKeyFunction: DefaultOptions.KeyHashFunction, } @@ -41,7 +40,6 @@ func TestValueLogWriteAllKindsEntries(t *testing.T) { opts := valueLogOptions{ dirPath: path, segmentSize: GB, - blockCache: DefaultOptions.BlockCache, partitionNum: uint32(DefaultOptions.PartitionNum), hashKeyFunction: DefaultOptions.KeyHashFunction, } @@ -103,7 +101,6 @@ func TestValueLogWriteBatch(t *testing.T) { opts := valueLogOptions{ dirPath: path, segmentSize: GB, - blockCache: DefaultOptions.BlockCache, partitionNum: uint32(DefaultOptions.PartitionNum), hashKeyFunction: DefaultOptions.KeyHashFunction, } @@ -143,7 +140,6 @@ func TestValueLogWriteBatchReopen(t *testing.T) { opts := valueLogOptions{ dirPath: path, segmentSize: GB, - blockCache: DefaultOptions.BlockCache, partitionNum: uint32(DefaultOptions.PartitionNum), hashKeyFunction: DefaultOptions.KeyHashFunction, } @@ -170,7 +166,7 @@ func writeBatch(opts valueLogOptions, numRW int, numPart int) error { val := util.RandomValue(512) var logs []*ValueLogRecord for i := 0; i < numRW; i++ { - log := &ValueLogRecord{key: util.GetTestKey(i), value: val} + log := &ValueLogRecord{key: util.GetTestKey(int64(i)), value: val} logs = append(logs, log) } @@ -187,7 +183,6 @@ func TestValueLogRead(t *testing.T) { opts := valueLogOptions{ dirPath: path, segmentSize: GB, - blockCache: DefaultOptions.BlockCache, partitionNum: uint32(DefaultOptions.PartitionNum), hashKeyFunction: DefaultOptions.KeyHashFunction, } @@ -252,7 +247,6 @@ func TestValueLogReadReopen(t *testing.T) { opts := valueLogOptions{ dirPath: path, segmentSize: GB, - blockCache: DefaultOptions.BlockCache, partitionNum: uint32(DefaultOptions.PartitionNum), hashKeyFunction: DefaultOptions.KeyHashFunction, } @@ -299,7 +293,6 @@ func TestValueLogSync(t *testing.T) { opts := valueLogOptions{ dirPath: path, segmentSize: GB, - blockCache: DefaultOptions.BlockCache, partitionNum: uint32(DefaultOptions.PartitionNum), hashKeyFunction: DefaultOptions.KeyHashFunction, } @@ -328,7 +321,6 @@ func TestValueLogClose(t *testing.T) { opts := valueLogOptions{ dirPath: path, segmentSize: GB, - blockCache: DefaultOptions.BlockCache, partitionNum: uint32(DefaultOptions.PartitionNum), hashKeyFunction: DefaultOptions.KeyHashFunction, } @@ -352,7 +344,6 @@ func TestValueLogMultiSegmentFiles(t *testing.T) { opts := valueLogOptions{ dirPath: path, segmentSize: 100 * MB, - blockCache: DefaultOptions.BlockCache, partitionNum: 1, hashKeyFunction: DefaultOptions.KeyHashFunction, }