Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MB-57888: Index Update #2106

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func newBuilder(path string, mapping mapping.IndexMapping, config map[string]int
return nil, err
}
config["internal"] = map[string][]byte{
string(mappingInternalKey): mappingBytes,
string(scorch.MappingInternalKey): mappingBytes,
}

// do not use real config, as these are options for the builder,
Expand Down
18 changes: 18 additions & 0 deletions index.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,24 @@ func OpenUsing(path string, runtimeConfig map[string]interface{}) (Index, error)
return openIndexUsing(path, runtimeConfig)
}

// Update opens the index at the specified path, which must already exist,
// and replaces the mapping it was created with by the mapping provided
// (newParams) for all subsequent Index/Search operations.
// It returns an error, without making any changes to the index, if an
// unupdatable mapping is provided.
func Update(path string, newParams string) (Index, error) {
	// Same as UpdateUsing, just without any runtime configuration.
	return UpdateUsing(path, nil, newParams)
}

// UpdateUsing opens the index at the specified path, which must already
// exist, and replaces the mapping it was created with by the mapping
// provided (newParams) for all subsequent Index/Search operations.
// The provided runtimeConfig can override settings persisted when the
// kvstore was created.
// It returns an error, without making any changes to the index, if an
// unupdatable mapping is provided.
func UpdateUsing(path string, runtimeConfig map[string]interface{}, newParams string) (Index, error) {
	idx, err := updateIndexUsing(path, runtimeConfig, newParams)
	return idx, err
}

// Builder is a limited interface, used to build indexes in an offline mode.
// Items cannot be updated or deleted, and the caller MUST ensure a document is
// indexed only once.
Expand Down
11 changes: 9 additions & 2 deletions index/scorch/optimize_knn.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,15 @@ func (o *OptimizeVR) Finish() error {
wg.Done()
}()
for field, vrs := range o.vrs {
vecIndex, err := segment.InterpretVectorIndex(field,
o.requiresFiltering, origSeg.deleted)
var vecIndex segment_api.VectorIndex
var err error
if info, ok := o.snapshot.updatedFields[field]; ok && info.All || info.Index {
vecIndex, err = segment.InterpretVectorIndex("",
o.requiresFiltering, origSeg.deleted)
} else {
vecIndex, err = segment.InterpretVectorIndex(field,
o.requiresFiltering, origSeg.deleted)
}
if err != nil {
errorsM.Lock()
errors = append(errors, err)
Expand Down
37 changes: 32 additions & 5 deletions index/scorch/persister.go
Original file line number Diff line number Diff line change
Expand Up @@ -619,6 +619,18 @@
return nil, nil, err
}
}

// store updated field info
if segmentSnapshot.updatedFields != nil {
b, err := json.Marshal(segmentSnapshot.updatedFields)
if err != nil {
return nil, nil, err
}
err = snapshotSegmentBucket.Put(boltUpdatedFieldsKey, b)
if err != nil {
return nil, nil, err
}
}
}

return filenames, newSegmentPaths, nil
Expand Down Expand Up @@ -722,6 +734,7 @@
var boltMetaDataSegmentVersionKey = []byte("version")
var boltMetaDataTimeStamp = []byte("timeStamp")
var boltStatsKey = []byte("stats")
var boltUpdatedFieldsKey = []byte("fields")
var TotBytesWrittenKey = []byte("TotBytesWritten")

func (s *Scorch) loadFromBolt() error {
Expand Down Expand Up @@ -846,7 +859,7 @@
segmentBucket := snapshot.Bucket(k)
if segmentBucket == nil {
_ = rv.DecRef()
return nil, fmt.Errorf("segment key, but bucket missing % x", k)
return nil, fmt.Errorf("segment key, but bucket missing %x", k)
}
segmentSnapshot, err := s.loadSegment(segmentBucket)
if err != nil {
Expand All @@ -860,6 +873,9 @@
}
rv.segment = append(rv.segment, segmentSnapshot)
rv.offsets = append(rv.offsets, running)
if segmentSnapshot.updatedFields != nil {
rv.updatedFields = segmentSnapshot.updatedFields
}
running += segmentSnapshot.segment.Count()
}
}
Expand All @@ -872,13 +888,13 @@
return nil, fmt.Errorf("segment path missing")
}
segmentPath := s.path + string(os.PathSeparator) + string(pathBytes)
segment, err := s.segPlugin.Open(segmentPath)
seg, err := s.segPlugin.Open(segmentPath)
if err != nil {
return nil, fmt.Errorf("error opening bolt segment: %v", err)
}

rv := &SegmentSnapshot{
segment: segment,
segment: seg,
cachedDocs: &cachedDocs{cache: nil},
cachedMeta: &cachedMeta{meta: nil},
}
Expand All @@ -888,7 +904,7 @@
r := bytes.NewReader(deletedBytes)
_, err := deletedBitmap.ReadFrom(r)
if err != nil {
_ = segment.Close()
_ = seg.Close()
return nil, fmt.Errorf("error reading deleted bytes: %v", err)
}
if !deletedBitmap.IsEmpty() {
Expand All @@ -902,11 +918,22 @@
err := json.Unmarshal(statBytes, &statsMap)
stats := &fieldStats{statMap: statsMap}
if err != nil {
_ = segment.Close()
_ = seg.Close()
return nil, fmt.Errorf("error reading stat bytes: %v", err)
}
rv.stats = stats
}
updatedFieldBytes := segmentBucket.Get(boltUpdatedFieldsKey)
if updatedFieldBytes != nil {
var updatedFields map[string]index.FieldInfo

Check failure on line 928 in index/scorch/persister.go

View workflow job for this annotation

GitHub Actions / test (1.20.x, ubuntu-latest)

undefined: index.FieldInfo

Check failure on line 928 in index/scorch/persister.go

View workflow job for this annotation

GitHub Actions / test (1.21.x, ubuntu-latest)

undefined: index.FieldInfo

Check failure on line 928 in index/scorch/persister.go

View workflow job for this annotation

GitHub Actions / test (1.20.x, macos-latest)

undefined: index.FieldInfo

Check failure on line 928 in index/scorch/persister.go

View workflow job for this annotation

GitHub Actions / test (1.22.x, ubuntu-latest)

undefined: index.FieldInfo

Check failure on line 928 in index/scorch/persister.go

View workflow job for this annotation

GitHub Actions / test (1.21.x, macos-latest)

undefined: index.FieldInfo

Check failure on line 928 in index/scorch/persister.go

View workflow job for this annotation

GitHub Actions / test (1.22.x, macos-latest)

undefined: index.FieldInfo

err := json.Unmarshal(updatedFieldBytes, &updatedFields)
if err != nil {
_ = seg.Close()
return nil, fmt.Errorf("error reading updated field bytes: %v", err)
}
rv.updatedFields = updatedFields
}

return rv, nil
}
Expand Down
82 changes: 82 additions & 0 deletions index/scorch/scorch.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import (
"encoding/json"
"fmt"
"log"
"os"
"path/filepath"
"sync"
Expand All @@ -36,6 +37,8 @@

var ErrClosed = fmt.Errorf("scorch closed")

var MappingInternalKey = []byte("_mapping")

type Scorch struct {
nextSegmentID uint64
stats Stats
Expand Down Expand Up @@ -882,3 +885,82 @@
func (s *Scorch) FireIndexEvent() {
s.fireEvent(EventKindIndexStart, 0)
}

// Updates bolt db with the given field info. Existing field info already in bolt
// will be merged before persisting. The index mapping is also overwritted both
// in bolt as well as the index snapshot
func (s *Scorch) UpdateFields(fieldInfo map[string]*index.FieldInfo, mappingBytes []byte) error {

Check failure on line 892 in index/scorch/scorch.go

View workflow job for this annotation

GitHub Actions / test (1.20.x, ubuntu-latest)

undefined: index.FieldInfo

Check failure on line 892 in index/scorch/scorch.go

View workflow job for this annotation

GitHub Actions / test (1.21.x, ubuntu-latest)

undefined: index.FieldInfo

Check failure on line 892 in index/scorch/scorch.go

View workflow job for this annotation

GitHub Actions / test (1.20.x, macos-latest)

undefined: index.FieldInfo

Check failure on line 892 in index/scorch/scorch.go

View workflow job for this annotation

GitHub Actions / test (1.22.x, ubuntu-latest)

undefined: index.FieldInfo

Check failure on line 892 in index/scorch/scorch.go

View workflow job for this annotation

GitHub Actions / test (1.21.x, macos-latest)

undefined: index.FieldInfo

Check failure on line 892 in index/scorch/scorch.go

View workflow job for this annotation

GitHub Actions / test (1.22.x, macos-latest)

undefined: index.FieldInfo
// Switch from pointer to value to marshal into a json for storage
updatedFields := make(map[string]index.FieldInfo)

Check failure on line 894 in index/scorch/scorch.go

View workflow job for this annotation

GitHub Actions / test (1.20.x, ubuntu-latest)

undefined: index.FieldInfo

Check failure on line 894 in index/scorch/scorch.go

View workflow job for this annotation

GitHub Actions / test (1.21.x, ubuntu-latest)

undefined: index.FieldInfo

Check failure on line 894 in index/scorch/scorch.go

View workflow job for this annotation

GitHub Actions / test (1.20.x, macos-latest)

undefined: index.FieldInfo

Check failure on line 894 in index/scorch/scorch.go

View workflow job for this annotation

GitHub Actions / test (1.22.x, ubuntu-latest)

undefined: index.FieldInfo

Check failure on line 894 in index/scorch/scorch.go

View workflow job for this annotation

GitHub Actions / test (1.21.x, macos-latest)

undefined: index.FieldInfo

Check failure on line 894 in index/scorch/scorch.go

View workflow job for this annotation

GitHub Actions / test (1.22.x, macos-latest)

undefined: index.FieldInfo
for field, info := range fieldInfo {
updatedFields[field] = *info
}
err := s.updateBolt(updatedFields, mappingBytes)
if err != nil {
return err
}
s.root.m.Lock()
s.root.updatedFields = updatedFields
s.root.m.Unlock()
return nil
}

// Merge and update deleted field info and rewrite index mapping
func (s *Scorch) updateBolt(fieldInfo map[string]index.FieldInfo, mappingBytes []byte) error {

Check failure on line 909 in index/scorch/scorch.go

View workflow job for this annotation

GitHub Actions / test (1.20.x, ubuntu-latest)

undefined: index.FieldInfo

Check failure on line 909 in index/scorch/scorch.go

View workflow job for this annotation

GitHub Actions / test (1.21.x, ubuntu-latest)

undefined: index.FieldInfo

Check failure on line 909 in index/scorch/scorch.go

View workflow job for this annotation

GitHub Actions / test (1.20.x, macos-latest)

undefined: index.FieldInfo

Check failure on line 909 in index/scorch/scorch.go

View workflow job for this annotation

GitHub Actions / test (1.22.x, ubuntu-latest)

undefined: index.FieldInfo

Check failure on line 909 in index/scorch/scorch.go

View workflow job for this annotation

GitHub Actions / test (1.21.x, macos-latest)

undefined: index.FieldInfo

Check failure on line 909 in index/scorch/scorch.go

View workflow job for this annotation

GitHub Actions / test (1.22.x, macos-latest)

undefined: index.FieldInfo
return s.rootBolt.Update(func(tx *bolt.Tx) error {
snapshots := tx.Bucket(boltSnapshotsBucket)
if snapshots == nil {
return nil
}

c := snapshots.Cursor()
for k, _ := c.Last(); k != nil; k, _ = c.Prev() {
_, _, err := decodeUvarintAscending(k)
if err != nil {
log.Printf("unable to parse segment epoch %x, continuing", k)
continue
}
snapshot := snapshots.Bucket(k)
cc := snapshot.Cursor()
for kk, _ := cc.First(); kk != nil; kk, _ = cc.Next() {
if kk[0] == boltInternalKey[0] {
internalBucket := snapshot.Bucket(kk)
if internalBucket == nil {
return fmt.Errorf("segment key, but bucket missing %x", kk)
}
err = internalBucket.Put(MappingInternalKey, mappingBytes)
if err != nil {
return err
}
} else if kk[0] != boltMetaDataKey[0] {
segmentBucket := snapshot.Bucket(kk)
if segmentBucket == nil {
return fmt.Errorf("segment key, but bucket missing %x", kk)
}
var updatedFields map[string]index.FieldInfo

Check failure on line 940 in index/scorch/scorch.go

View workflow job for this annotation

GitHub Actions / test (1.20.x, ubuntu-latest)

undefined: index.FieldInfo

Check failure on line 940 in index/scorch/scorch.go

View workflow job for this annotation

GitHub Actions / test (1.21.x, ubuntu-latest)

undefined: index.FieldInfo

Check failure on line 940 in index/scorch/scorch.go

View workflow job for this annotation

GitHub Actions / test (1.20.x, macos-latest)

undefined: index.FieldInfo

Check failure on line 940 in index/scorch/scorch.go

View workflow job for this annotation

GitHub Actions / test (1.22.x, ubuntu-latest)

undefined: index.FieldInfo

Check failure on line 940 in index/scorch/scorch.go

View workflow job for this annotation

GitHub Actions / test (1.21.x, macos-latest)

undefined: index.FieldInfo

Check failure on line 940 in index/scorch/scorch.go

View workflow job for this annotation

GitHub Actions / test (1.22.x, macos-latest)

undefined: index.FieldInfo
updatedFieldBytes := segmentBucket.Get(boltUpdatedFieldsKey)
if updatedFieldBytes != nil {
err := json.Unmarshal(updatedFieldBytes, &updatedFields)
if err != nil {
return fmt.Errorf("error reading updated field bytes: %v", err)
}
for field, info := range fieldInfo {
updatedFields[field] = info
}
} else {
updatedFields = fieldInfo
}
b, err := json.Marshal(updatedFields)
if err != nil {
return err
}
err = segmentBucket.Put(boltUpdatedFieldsKey, b)
if err != nil {
return err
}
}
}
}
return nil
})
}
38 changes: 34 additions & 4 deletions index/scorch/snapshot_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@

m2 sync.Mutex // Protects the fields that follow.
fieldTFRs map[string][]*IndexSnapshotTermFieldReader // keyed by field, recycled TFR's

updatedFields map[string]index.FieldInfo

Check failure on line 85 in index/scorch/snapshot_index.go

View workflow job for this annotation

GitHub Actions / test (1.20.x, ubuntu-latest)

undefined: index.FieldInfo

Check failure on line 85 in index/scorch/snapshot_index.go

View workflow job for this annotation

GitHub Actions / test (1.21.x, ubuntu-latest)

undefined: index.FieldInfo

Check failure on line 85 in index/scorch/snapshot_index.go

View workflow job for this annotation

GitHub Actions / test (1.20.x, macos-latest)

undefined: index.FieldInfo

Check failure on line 85 in index/scorch/snapshot_index.go

View workflow job for this annotation

GitHub Actions / test (1.22.x, ubuntu-latest)

undefined: index.FieldInfo

Check failure on line 85 in index/scorch/snapshot_index.go

View workflow job for this annotation

GitHub Actions / test (1.21.x, macos-latest)

undefined: index.FieldInfo

Check failure on line 85 in index/scorch/snapshot_index.go

View workflow job for this annotation

GitHub Actions / test (1.22.x, macos-latest)

undefined: index.FieldInfo
}

func (i *IndexSnapshot) Segments() []*SegmentSnapshot {
Expand Down Expand Up @@ -469,6 +471,12 @@
// Keeping that TODO for now until we have a cleaner way.
rvd.StoredFieldsSize += uint64(len(val))

// Skip fields that are supposed to have deleted store values
if info, ok := is.updatedFields[name]; ok &&
(info.All || info.Store) {
return true
}

// copy value, array positions to preserve them beyond the scope of this callback
value := append([]byte(nil), val...)
arrayPos := append([]uint64(nil), pos...)
Expand Down Expand Up @@ -608,10 +616,21 @@
segBytesRead := s.segment.BytesRead()
rv.incrementBytesRead(segBytesRead)
}
dict, err := s.segment.Dictionary(field)

var dict segment.TermDictionary
var err error

// Skip fields that are supposed to have no indexing
if info, ok := is.updatedFields[field]; ok &&
(info.Index || info.All) {
dict, err = s.segment.Dictionary("")
} else {
dict, err = s.segment.Dictionary(field)
}
if err != nil {
return nil, err
}

if dictStats, ok := dict.(segment.DiskStatsReporter); ok {
bytesRead := dictStats.BytesRead()
rv.incrementBytesRead(bytesRead)
Expand Down Expand Up @@ -756,14 +775,25 @@
}
}

// Filter out fields that are supposed to have no doc values
var filteredFields []string
for _, field := range vFields {
if info, ok := is.updatedFields[field]; ok &&
(info.DocValues || info.All) {
continue
} else {
filteredFields = append(filteredFields, field)
}
}

var errCh chan error

// cFields represents the fields that we'll need from the
// cachedDocs, and might be optionally be provided by the caller,
// if the caller happens to know we're on the same segmentIndex
// from a previous invocation
if cFields == nil {
cFields = subtractStrings(fields, vFields)
cFields = subtractStrings(fields, filteredFields)

if !ss.cachedDocs.hasFields(cFields) {
errCh = make(chan error, 1)
Expand All @@ -778,8 +808,8 @@
}
}

if ssvOk && ssv != nil && len(vFields) > 0 {
dvs, err = ssv.VisitDocValues(localDocNum, fields, visitor, dvs)
if ssvOk && ssv != nil && len(filteredFields) > 0 {
dvs, err = ssv.VisitDocValues(localDocNum, filteredFields, visitor, dvs)
if err != nil {
return nil, nil, err
}
Expand Down
13 changes: 7 additions & 6 deletions index/scorch/snapshot_segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,13 @@
// segment was mmaped recently, in which case
// we consider the loading cost of the metadata
// as part of IO stats.
mmaped uint32
id uint64
segment segment.Segment
deleted *roaring.Bitmap
creator string
stats *fieldStats
mmaped uint32
id uint64
segment segment.Segment
deleted *roaring.Bitmap
creator string
stats *fieldStats
updatedFields map[string]index.FieldInfo

Check failure on line 44 in index/scorch/snapshot_segment.go

View workflow job for this annotation

GitHub Actions / test (1.20.x, ubuntu-latest)

undefined: index.FieldInfo

Check failure on line 44 in index/scorch/snapshot_segment.go

View workflow job for this annotation

GitHub Actions / test (1.21.x, ubuntu-latest)

undefined: index.FieldInfo

Check failure on line 44 in index/scorch/snapshot_segment.go

View workflow job for this annotation

GitHub Actions / test (1.20.x, macos-latest)

undefined: index.FieldInfo

Check failure on line 44 in index/scorch/snapshot_segment.go

View workflow job for this annotation

GitHub Actions / test (1.22.x, ubuntu-latest)

undefined: index.FieldInfo

Check failure on line 44 in index/scorch/snapshot_segment.go

View workflow job for this annotation

GitHub Actions / test (1.21.x, macos-latest)

undefined: index.FieldInfo

Check failure on line 44 in index/scorch/snapshot_segment.go

View workflow job for this annotation

GitHub Actions / test (1.22.x, macos-latest)

undefined: index.FieldInfo

cachedMeta *cachedMeta

Expand Down
Loading
Loading