From 7228d90ee4f040fe38eee1cc0879a2a03b44a797 Mon Sep 17 00:00:00 2001 From: Bo Du Date: Thu, 5 Mar 2020 02:35:13 -0500 Subject: [PATCH] [dbnode] Concurrent time series indexing within a single batch (#2146) * Concurrent indexing. --- src/dbnode/storage/index.go | 1 + src/dbnode/storage/index/block.go | 26 ++- src/m3ninx/index/batch.go | 13 ++ src/m3ninx/index/segment/builder/builder.go | 189 ++++++++++++------ .../index/segment/builder/builder_test.go | 6 + .../index/segment/builder/bytes_slice_iter.go | 69 ++++++- src/m3ninx/index/segment/builder/options.go | 24 +++ .../segment/builder/sharded_fields_map.go | 65 ++++++ src/m3ninx/index/segment/segment_mock.go | 139 ++++++++++++- src/m3ninx/index/segment/types.go | 9 +- 10 files changed, 462 insertions(+), 79 deletions(-) create mode 100644 src/m3ninx/index/segment/builder/sharded_fields_map.go diff --git a/src/dbnode/storage/index.go b/src/dbnode/storage/index.go index e2a0d3aafa..97bb398a11 100644 --- a/src/dbnode/storage/index.go +++ b/src/dbnode/storage/index.go @@ -751,6 +751,7 @@ func (i *nsIndex) Flush( if err != nil { return err } + defer builder.Close() var evicted int for _, block := range flushable { diff --git a/src/dbnode/storage/index/block.go b/src/dbnode/storage/index/block.go index 324fc8ab37..b0e90f13d5 100644 --- a/src/dbnode/storage/index/block.go +++ b/src/dbnode/storage/index/block.go @@ -706,18 +706,24 @@ func (b *block) cleanupForegroundCompactWithLock() { b.foregroundSegments = nil // Free compactor resources. - if b.compact.foregroundCompactor == nil { - return + if b.compact.foregroundCompactor != nil { + if err := b.compact.foregroundCompactor.Close(); err != nil { + instrument.EmitAndLogInvariantViolation(b.iopts, func(l *zap.Logger) { + l.Error("error closing index block foreground compactor", zap.Error(err)) + }) + } + b.compact.foregroundCompactor = nil } - if err := b.compact.foregroundCompactor.Close(); err != nil { - instrument.EmitAndLogInvariantViolation(b.iopts, func(l *zap.Logger) { - l.Error("error closing index block foreground compactor", zap.Error(err)) - }) + // Free segment builder resources. + if b.compact.segmentBuilder != nil { + if err := b.compact.segmentBuilder.Close(); err != nil { + instrument.EmitAndLogInvariantViolation(b.iopts, func(l *zap.Logger) { + l.Error("error closing index block segment builder", zap.Error(err)) + }) + } + b.compact.segmentBuilder = nil } - - b.compact.foregroundCompactor = nil - b.compact.segmentBuilder = nil } func (b *block) executorWithRLock() (search.Executor, error) { @@ -1484,7 +1490,7 @@ func (b *block) writeBatchErrorInvalidState(state blockState) error { // blockCompact has several lazily allocated compaction components. type blockCompact struct { - segmentBuilder segment.DocumentsBuilder + segmentBuilder segment.CloseableDocumentsBuilder foregroundCompactor *compaction.Compactor backgroundCompactor *compaction.Compactor compactingForeground bool diff --git a/src/m3ninx/index/batch.go b/src/m3ninx/index/batch.go index 82803a6349..a22145610c 100644 --- a/src/m3ninx/index/batch.go +++ b/src/m3ninx/index/batch.go @@ -24,6 +24,7 @@ import ( "bytes" "errors" "fmt" + "sync" "github.com/m3db/m3/src/m3ninx/doc" ) @@ -79,6 +80,8 @@ func NewBatch(docs []doc.Document, opts ...BatchOption) Batch { // BatchPartialError indicates an error was encountered inserting some documents in a batch. // It is not safe for concurrent use. type BatchPartialError struct { + sync.Mutex + errs []BatchError } @@ -138,6 +141,16 @@ func (e *BatchPartialError) Add(err BatchError) { e.errs = append(e.errs, err) } +// AddWithLock adds an error to e with a lock. Any nil errors are ignored. +func (e *BatchPartialError) AddWithLock(err BatchError) { + if err.Err == nil { + return + } + e.Lock() + e.errs = append(e.errs, err) + e.Unlock() +} + // Errs returns the errors with the indexes of the documents in the batch // which were not indexed. func (e *BatchPartialError) Errs() []BatchError { diff --git a/src/m3ninx/index/segment/builder/builder.go b/src/m3ninx/index/segment/builder/builder.go index 8367c5f6cf..a7895d8808 100644 --- a/src/m3ninx/index/segment/builder/builder.go +++ b/src/m3ninx/index/segment/builder/builder.go @@ -23,18 +23,43 @@ package builder import ( "errors" "fmt" + "sync" "github.com/m3db/m3/src/m3ninx/doc" "github.com/m3db/m3/src/m3ninx/index" "github.com/m3db/m3/src/m3ninx/index/segment" "github.com/m3db/m3/src/m3ninx/postings" "github.com/m3db/m3/src/m3ninx/util" + + "github.com/cespare/xxhash" ) var ( errDocNotFound = errors.New("doc not found") + errClosed = errors.New("builder closed") +) + +const ( + // Slightly buffer the work to avoid blocking main thread. + indexQueueSize = 2 << 9 // 1024 ) +type indexJob struct { + wg *sync.WaitGroup + + id postings.ID + field doc.Field + + shard int + idx int + batchErr *index.BatchPartialError +} + +type builderStatus struct { + sync.RWMutex + closed bool +} + type builder struct { opts Options newUUIDFn util.NewUUIDFn @@ -44,29 +69,47 @@ type builder struct { batchSizeOne index.Batch docs []doc.Document idSet *IDsMap - fields *fieldsMap - uniqueFields [][]byte + fields *shardedFieldsMap + uniqueFields [][][]byte + + indexQueues []chan indexJob + status builderStatus } // NewBuilderFromDocuments returns a builder from documents, it is // not thread safe and is optimized for insertion speed and a // final build step when documents are indexed. -func NewBuilderFromDocuments(opts Options) (segment.DocumentsBuilder, error) { - return &builder{ +func NewBuilderFromDocuments(opts Options) (segment.CloseableDocumentsBuilder, error) { + concurrency := opts.Concurrency() + b := &builder{ opts: opts, newUUIDFn: opts.NewUUIDFn(), batchSizeOne: index.Batch{ - Docs: make([]doc.Document, 1), - AllowPartialUpdates: false, + Docs: make([]doc.Document, 1), }, idSet: NewIDsMap(IDsMapOptions{ InitialSize: opts.InitialCapacity(), }), - fields: newFieldsMap(fieldsMapOptions{ - InitialSize: opts.InitialCapacity(), - }), - uniqueFields: make([][]byte, 0, opts.InitialCapacity()), - }, nil + uniqueFields: make([][][]byte, 0, concurrency), + indexQueues: make([]chan indexJob, 0, concurrency), + } + + for i := 0; i < concurrency; i++ { + indexQueue := make(chan indexJob, indexQueueSize) + b.indexQueues = append(b.indexQueues, indexQueue) + go b.indexWorker(indexQueue) + + // Give each shard a fraction of the configured initial capacity. + shardInitialCapacity := opts.InitialCapacity() + if shardInitialCapacity > 0 { + shardInitialCapacity /= concurrency + } + shardUniqueFields := make([][]byte, 0, shardInitialCapacity) + b.uniqueFields = append(b.uniqueFields, shardUniqueFields) + b.fields = newShardedFieldsMap(concurrency, shardInitialCapacity) + } + + return b, nil } func (b *builder) Reset(offset postings.ID) { @@ -83,15 +126,15 @@ func (b *builder) Reset(offset postings.ID) { b.idSet.Reset() // Keep fields around, just reset the terms set for each one. - for _, entry := range b.fields.Iter() { - entry.Value().reset() - } + b.fields.ResetTermsSets() // Reset the unique fields slice - for i := range b.uniqueFields { - b.uniqueFields[i] = nil + for i, shardUniqueFields := range b.uniqueFields { + for i := range shardUniqueFields { + shardUniqueFields[i] = nil + } + b.uniqueFields[i] = shardUniqueFields[:0] } - b.uniqueFields = b.uniqueFields[:0] } func (b *builder) Insert(d doc.Document) ([]byte, error) { @@ -107,15 +150,20 @@ func (b *builder) Insert(d doc.Document) ([]byte, error) { } func (b *builder) InsertBatch(batch index.Batch) error { + b.status.RLock() + defer b.status.RUnlock() + + if b.status.closed { + return errClosed + } + // NB(r): This is all kept in a single method to make the // insertion path fast. + var wg sync.WaitGroup batchErr := index.NewBatchPartialError() for i, d := range batch.Docs { // Validate doc if err := d.Validate(); err != nil { - if !batch.AllowPartialUpdates { - return err - } batchErr.Add(index.BatchError{Err: err, Idx: i}) continue } @@ -124,9 +172,6 @@ func (b *builder) InsertBatch(batch index.Batch) error { if !d.HasID() { id, err := b.newUUIDFn() if err != nil { - if !batch.AllowPartialUpdates { - return err - } batchErr.Add(index.BatchError{Err: err, Idx: i}) continue } @@ -139,9 +184,6 @@ func (b *builder) InsertBatch(batch index.Batch) error { // Avoid duplicates. if _, ok := b.idSet.Get(d.ID); ok { - if !batch.AllowPartialUpdates { - return index.ErrDuplicateID - } batchErr.Add(index.BatchError{Err: index.ErrDuplicateID, Idx: i}) continue } @@ -158,50 +200,73 @@ func (b *builder) InsertBatch(batch index.Batch) error { // Index the terms. for _, f := range d.Fields { - if err := b.index(postings.ID(postingsListID), f); err != nil { - if !batch.AllowPartialUpdates { - return err - } - batchErr.Add(index.BatchError{Err: err, Idx: i}) - } + b.index(&wg, postings.ID(postingsListID), f, i, batchErr) } - if err := b.index(postings.ID(postingsListID), doc.Field{ + b.index(&wg, postings.ID(postingsListID), doc.Field{ Name: doc.IDReservedFieldName, Value: d.ID, - }); err != nil { - if !batch.AllowPartialUpdates { - return err - } - batchErr.Add(index.BatchError{Err: err, Idx: i}) - } + }, i, batchErr) } + // Wait for all the concurrent indexing jobs to finish. + wg.Wait() + if !batchErr.IsEmpty() { return batchErr } return nil } -func (b *builder) index(id postings.ID, f doc.Field) error { - terms, ok := b.fields.Get(f.Name) - if !ok { - terms = newTerms(b.opts) - b.fields.SetUnsafe(f.Name, terms, fieldsMapSetUnsafeOptions{ - NoCopyKey: true, - NoFinalizeKey: true, - }) +func (b *builder) index( + wg *sync.WaitGroup, + id postings.ID, + f doc.Field, + i int, + batchErr *index.BatchPartialError, +) { + wg.Add(1) + // NB(bodu): To avoid locking inside of the terms, we shard the work + // by field name. + shard := b.calculateShard(f.Name) + b.indexQueues[shard] <- indexJob{ + wg: wg, + id: id, + field: f, + shard: shard, + idx: i, + batchErr: batchErr, } +} - // If empty field, track insertion of this key into the fields - // collection for correct response when retrieving all fields. - newField := terms.size() == 0 - if err := terms.post(f.Value, id); err != nil { - return err - } - if newField { - b.uniqueFields = append(b.uniqueFields, f.Name) +func (b *builder) indexWorker(indexQueue chan indexJob) { + for job := range indexQueue { + terms, ok := b.fields.ShardedGet(job.shard, job.field.Name) + if !ok { + // NB(bodu): Check again within the lock to make sure we aren't making concurrent map writes. + terms = newTerms(b.opts) + b.fields.ShardedSetUnsafe(job.shard, job.field.Name, terms, fieldsMapSetUnsafeOptions{ + NoCopyKey: true, + NoFinalizeKey: true, + }) + } + + // If empty field, track insertion of this key into the fields + // collection for correct response when retrieving all fields. + newField := terms.size() == 0 + // NB(bodu): Bulk of the cpu time during insertion is spent inside of terms.post(). + err := terms.post(job.field.Value, job.id) + if err != nil { + job.batchErr.AddWithLock(index.BatchError{Err: err, Idx: job.idx}) + } + if err == nil && newField { + b.uniqueFields[job.shard] = append(b.uniqueFields[job.shard], job.field.Name) + } + job.wg.Done() } - return nil +} + +func (b *builder) calculateShard(field []byte) int { + return int(xxhash.Sum64(field) % uint64(len(b.indexQueues))) } func (b *builder) AllDocs() (index.IDDocIterator, error) { @@ -236,7 +301,7 @@ func (b *builder) Fields() (segment.FieldsIterator, error) { } func (b *builder) Terms(field []byte) (segment.TermsIterator, error) { - terms, ok := b.fields.Get(field) + terms, ok := b.fields.ShardedGet(b.calculateShard(field), field) if !ok { return nil, fmt.Errorf("field not found: %s", string(field)) } @@ -247,3 +312,13 @@ func (b *builder) Terms(field []byte) (segment.TermsIterator, error) { return newTermsIter(terms.uniqueTerms), nil } + +func (b *builder) Close() error { + b.status.Lock() + defer b.status.Unlock() + for _, q := range b.indexQueues { + close(q) + } + b.status.closed = true + return nil +} diff --git a/src/m3ninx/index/segment/builder/builder_test.go b/src/m3ninx/index/segment/builder/builder_test.go index 2449db5e72..cba14e86d7 100644 --- a/src/m3ninx/index/segment/builder/builder_test.go +++ b/src/m3ninx/index/segment/builder/builder_test.go @@ -79,6 +79,9 @@ var ( func TestBuilderFields(t *testing.T) { builder, err := NewBuilderFromDocuments(testOptions) require.NoError(t, err) + defer func() { + require.NoError(t, builder.Close()) + }() for i := 0; i < 10; i++ { builder.Reset(0) @@ -106,6 +109,9 @@ func TestBuilderFields(t *testing.T) { func TestBuilderTerms(t *testing.T) { builder, err := NewBuilderFromDocuments(testOptions) require.NoError(t, err) + defer func() { + require.NoError(t, builder.Close()) + }() for i := 0; i < 10; i++ { builder.Reset(0) diff --git a/src/m3ninx/index/segment/builder/bytes_slice_iter.go b/src/m3ninx/index/segment/builder/bytes_slice_iter.go index 336f1e0e1a..d516cfc285 100644 --- a/src/m3ninx/index/segment/builder/bytes_slice_iter.go +++ b/src/m3ninx/index/segment/builder/bytes_slice_iter.go @@ -25,6 +25,8 @@ import ( "sort" "github.com/m3db/m3/src/m3ninx/index/segment" + + "github.com/twotwotwo/sorts" ) // OrderedBytesSliceIter is a new ordered bytes slice iterator. @@ -32,9 +34,9 @@ type OrderedBytesSliceIter struct { err error done bool - currentIdx int - current []byte - backingSlice [][]byte + currentIdx int + current []byte + backingSlices *sortableSliceOfSliceOfByteSlicesAsc } var _ segment.FieldsIterator = &OrderedBytesSliceIter{} @@ -42,12 +44,13 @@ var _ segment.FieldsIterator = &OrderedBytesSliceIter{} // NewOrderedBytesSliceIter sorts a slice of bytes and then // returns an iterator over them. func NewOrderedBytesSliceIter( - maybeUnorderedSlice [][]byte, + maybeUnorderedSlices [][][]byte, ) *OrderedBytesSliceIter { - sortSliceOfByteSlices(maybeUnorderedSlice) + sortable := &sortableSliceOfSliceOfByteSlicesAsc{data: maybeUnorderedSlices} + sorts.ByBytes(sortable) return &OrderedBytesSliceIter{ - currentIdx: -1, - backingSlice: maybeUnorderedSlice, + currentIdx: -1, + backingSlices: sortable, } } @@ -57,11 +60,12 @@ func (b *OrderedBytesSliceIter) Next() bool { return false } b.currentIdx++ - if b.currentIdx >= len(b.backingSlice) { + if b.currentIdx >= b.backingSlices.Len() { b.done = true return false } - b.current = b.backingSlice[b.currentIdx] + iOuter, iInner := b.backingSlices.getIndices(b.currentIdx) + b.current = b.backingSlices.data[iOuter][iInner] return true } @@ -77,7 +81,7 @@ func (b *OrderedBytesSliceIter) Err() error { // Len returns the length of the slice. func (b *OrderedBytesSliceIter) Len() int { - return len(b.backingSlice) + return b.backingSlices.Len() } // Close releases resources. @@ -86,6 +90,51 @@ func (b *OrderedBytesSliceIter) Close() error { return nil } +type sortableSliceOfSliceOfByteSlicesAsc struct { + data [][][]byte + length int +} + +func (s *sortableSliceOfSliceOfByteSlicesAsc) Len() int { + if s.length > 0 { + return s.length + } + + totalLen := 0 + for _, innerSlice := range s.data { + totalLen += len(innerSlice) + } + s.length = totalLen + + return s.length +} + +func (s *sortableSliceOfSliceOfByteSlicesAsc) Less(i, j int) bool { + iOuter, iInner := s.getIndices(i) + jOuter, jInner := s.getIndices(j) + return bytes.Compare(s.data[iOuter][iInner], s.data[jOuter][jInner]) < 0 +} + +func (s *sortableSliceOfSliceOfByteSlicesAsc) Swap(i, j int) { + iOuter, iInner := s.getIndices(i) + jOuter, jInner := s.getIndices(j) + s.data[iOuter][iInner], s.data[jOuter][jInner] = s.data[jOuter][jInner], s.data[iOuter][iInner] +} + +func (s *sortableSliceOfSliceOfByteSlicesAsc) Key(i int) []byte { + iOuter, iInner := s.getIndices(i) + return s.data[iOuter][iInner] +} + +func (s *sortableSliceOfSliceOfByteSlicesAsc) getIndices(idx int) (int, int) { + currentSliceIdx := 0 + for idx >= len(s.data[currentSliceIdx]) { + idx -= len(s.data[currentSliceIdx]) + currentSliceIdx++ + } + return currentSliceIdx, idx +} + func sortSliceOfByteSlices(b [][]byte) { sort.Slice(b, func(i, j int) bool { return bytes.Compare(b[i], b[j]) < 0 diff --git a/src/m3ninx/index/segment/builder/options.go b/src/m3ninx/index/segment/builder/options.go index d96800c209..e2b69b0719 100644 --- a/src/m3ninx/index/segment/builder/options.go +++ b/src/m3ninx/index/segment/builder/options.go @@ -21,6 +21,8 @@ package builder import ( + "runtime" + "github.com/m3db/m3/src/m3ninx/postings" "github.com/m3db/m3/src/m3ninx/postings/roaring" "github.com/m3db/m3/src/m3ninx/util" @@ -30,6 +32,10 @@ const ( defaultInitialCapacity = 128 ) +var ( + defaultConcurrency = runtime.NumCPU() +) + // Options is a collection of options for segment building. type Options interface { // SetNewUUIDFn sets the function used to generate new UUIDs. @@ -49,12 +55,19 @@ type Options interface { // PostingsListPool returns the postings list pool. PostingsListPool() postings.Pool + + // SetConcurrency sets the indexing concurrency. + SetConcurrency(value int) Options + + // Concurrency returns the indexing concurrency. + Concurrency() int } type opts struct { newUUIDFn util.NewUUIDFn initialCapacity int postingsPool postings.Pool + concurrency int } // NewOptions returns new options. @@ -63,6 +76,7 @@ func NewOptions() Options { newUUIDFn: util.NewUUID, initialCapacity: defaultInitialCapacity, postingsPool: postings.NewPool(nil, roaring.NewPostingsList), + concurrency: defaultConcurrency, } } @@ -95,3 +109,13 @@ func (o *opts) SetPostingsListPool(v postings.Pool) Options { func (o *opts) PostingsListPool() postings.Pool { return o.postingsPool } + +func (o *opts) SetConcurrency(v int) Options { + opts := *o + opts.concurrency = v + return &opts +} + +func (o *opts) Concurrency() int { + return o.concurrency +} diff --git a/src/m3ninx/index/segment/builder/sharded_fields_map.go b/src/m3ninx/index/segment/builder/sharded_fields_map.go new file mode 100644 index 0000000000..ffa452c94c --- /dev/null +++ b/src/m3ninx/index/segment/builder/sharded_fields_map.go @@ -0,0 +1,65 @@ +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package builder + +type shardedFieldsMap struct { + data []*fieldsMap +} + +func newShardedFieldsMap( + numShards int, + shardInitialCapacity int, +) *shardedFieldsMap { + data := make([]*fieldsMap, 0, numShards) + for i := 0; i < numShards; i++ { + data = append(data, newFieldsMap(fieldsMapOptions{ + InitialSize: shardInitialCapacity, + })) + } + return &shardedFieldsMap{ + data: data, + } +} + +func (s *shardedFieldsMap) ShardedGet( + shard int, + k []byte, +) (*terms, bool) { + return s.data[shard].Get(k) +} + +func (s *shardedFieldsMap) ShardedSetUnsafe( + shard int, + k []byte, + v *terms, + opts fieldsMapSetUnsafeOptions, +) { + s.data[shard].SetUnsafe(k, v, opts) +} + +// ResetTermsSets keeps fields around but resets the terms set for each one. +func (s *shardedFieldsMap) ResetTermsSets() { + for _, fieldMap := range s.data { + for _, entry := range fieldMap.Iter() { + entry.Value().reset() + } + } +} diff --git a/src/m3ninx/index/segment/segment_mock.go b/src/m3ninx/index/segment/segment_mock.go index 3325e03523..4102918076 100644 --- a/src/m3ninx/index/segment/segment_mock.go +++ b/src/m3ninx/index/segment/segment_mock.go @@ -1,7 +1,7 @@ // Code generated by MockGen. DO NOT EDIT. // Source: github.com/m3db/m3/src/m3ninx/index/segment/types.go -// Copyright (c) 2019 Uber Technologies, Inc. +// Copyright (c) 2020 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal @@ -1093,6 +1093,143 @@ func (mr *MockDocumentsBuilderMockRecorder) InsertBatch(b interface{}) *gomock.C return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InsertBatch", reflect.TypeOf((*MockDocumentsBuilder)(nil).InsertBatch), b) } +// MockCloseableDocumentsBuilder is a mock of CloseableDocumentsBuilder interface +type MockCloseableDocumentsBuilder struct { + ctrl *gomock.Controller + recorder *MockCloseableDocumentsBuilderMockRecorder +} + +// MockCloseableDocumentsBuilderMockRecorder is the mock recorder for MockCloseableDocumentsBuilder +type MockCloseableDocumentsBuilderMockRecorder struct { + mock *MockCloseableDocumentsBuilder +} + +// NewMockCloseableDocumentsBuilder creates a new mock instance +func NewMockCloseableDocumentsBuilder(ctrl *gomock.Controller) *MockCloseableDocumentsBuilder { + mock := &MockCloseableDocumentsBuilder{ctrl: ctrl} + mock.recorder = &MockCloseableDocumentsBuilderMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use +func (m *MockCloseableDocumentsBuilder) EXPECT() *MockCloseableDocumentsBuilderMockRecorder { + return m.recorder +} + +// Fields mocks base method +func (m *MockCloseableDocumentsBuilder) Fields() (FieldsIterator, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Fields") + ret0, _ := ret[0].(FieldsIterator) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Fields indicates an expected call of Fields +func (mr *MockCloseableDocumentsBuilderMockRecorder) Fields() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Fields", reflect.TypeOf((*MockCloseableDocumentsBuilder)(nil).Fields)) +} + +// Terms mocks base method +func (m *MockCloseableDocumentsBuilder) Terms(field []byte) (TermsIterator, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Terms", field) + ret0, _ := ret[0].(TermsIterator) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Terms indicates an expected call of Terms +func (mr *MockCloseableDocumentsBuilderMockRecorder) Terms(field interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Terms", reflect.TypeOf((*MockCloseableDocumentsBuilder)(nil).Terms), field) +} + +// Reset mocks base method +func (m *MockCloseableDocumentsBuilder) Reset(offset postings.ID) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Reset", offset) +} + +// Reset indicates an expected call of Reset +func (mr *MockCloseableDocumentsBuilderMockRecorder) Reset(offset interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Reset", reflect.TypeOf((*MockCloseableDocumentsBuilder)(nil).Reset), offset) +} + +// Docs mocks base method +func (m *MockCloseableDocumentsBuilder) Docs() []doc.Document { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Docs") + ret0, _ := ret[0].([]doc.Document) + return ret0 +} + +// Docs indicates an expected call of Docs +func (mr *MockCloseableDocumentsBuilderMockRecorder) Docs() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Docs", reflect.TypeOf((*MockCloseableDocumentsBuilder)(nil).Docs)) +} + +// AllDocs mocks base method +func (m *MockCloseableDocumentsBuilder) AllDocs() (index.IDDocIterator, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "AllDocs") + ret0, _ := ret[0].(index.IDDocIterator) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// AllDocs indicates an expected call of AllDocs +func (mr *MockCloseableDocumentsBuilderMockRecorder) AllDocs() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AllDocs", reflect.TypeOf((*MockCloseableDocumentsBuilder)(nil).AllDocs)) +} + +// Insert mocks base method +func (m *MockCloseableDocumentsBuilder) Insert(d doc.Document) ([]byte, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Insert", d) + ret0, _ := ret[0].([]byte) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Insert indicates an expected call of Insert +func (mr *MockCloseableDocumentsBuilderMockRecorder) Insert(d interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Insert", reflect.TypeOf((*MockCloseableDocumentsBuilder)(nil).Insert), d) +} + +// InsertBatch mocks base method +func (m *MockCloseableDocumentsBuilder) InsertBatch(b index.Batch) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "InsertBatch", b) + ret0, _ := ret[0].(error) + return ret0 +} + +// InsertBatch indicates an expected call of InsertBatch +func (mr *MockCloseableDocumentsBuilderMockRecorder) InsertBatch(b interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InsertBatch", reflect.TypeOf((*MockCloseableDocumentsBuilder)(nil).InsertBatch), b) +} + +// Close mocks base method +func (m *MockCloseableDocumentsBuilder) Close() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Close") + ret0, _ := ret[0].(error) + return ret0 +} + +// Close indicates an expected call of Close +func (mr *MockCloseableDocumentsBuilderMockRecorder) Close() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockCloseableDocumentsBuilder)(nil).Close)) +} + // MockSegmentsBuilder is a mock of SegmentsBuilder interface type MockSegmentsBuilder struct { ctrl *gomock.Controller diff --git a/src/m3ninx/index/segment/types.go b/src/m3ninx/index/segment/types.go index c80bc3a8c1..48bcf5b3c8 100644 --- a/src/m3ninx/index/segment/types.go +++ b/src/m3ninx/index/segment/types.go @@ -167,12 +167,19 @@ type Builder interface { AllDocs() (index.IDDocIterator, error) } -// DocumentsBuilder is a builder is written documents to. +// DocumentsBuilder is a builder that has documents written to it. type DocumentsBuilder interface { Builder index.Writer } +// CloseableDocumentsBuilder is a builder that has documents written to it and has freeable resources. +type CloseableDocumentsBuilder interface { + DocumentsBuilder + + Close() error +} + // SegmentsBuilder is a builder that is built from segments. type SegmentsBuilder interface { Builder