Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

exp/lighthorizon, xdr: Rename CheckpointIndex to better reflect its capabilty. #4510

Merged
merged 3 commits into from
Aug 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion exp/lighthorizon/index/backend/gzip.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func readGzippedFrom(r io.Reader) (types.NamedIndices, int64, error) {
return nil, n, err
}

ind, err := types.NewCheckpointIndex(buf.Bytes())
ind, err := types.NewBitmapIndex(buf.Bytes())
if err != nil {
return nil, n, err
}
Expand Down
4 changes: 2 additions & 2 deletions exp/lighthorizon/index/backend/gzip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ import (
)

func TestGzipRoundtrip(t *testing.T) {
index := &types.CheckpointIndex{}
anotherIndex := &types.CheckpointIndex{}
index := &types.BitmapIndex{}
anotherIndex := &types.BitmapIndex{}
for i := 0; i < 100+rand.Intn(1000); i++ {
index.SetActive(uint32(rand.Intn(10_000)))
anotherIndex.SetActive(uint32(rand.Intn(10_000)))
Expand Down
2 changes: 1 addition & 1 deletion exp/lighthorizon/index/backend/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func (s *S3Backend) Read(account string) (types.NamedIndices, error) {
if n == 0 {
return nil, os.ErrNotExist
}
var indexes map[string]*types.CheckpointIndex
var indexes map[string]*types.BitmapIndex
indexes, _, err = readGzippedFrom(bytes.NewReader(b.Bytes()))
if err != nil {
log.Errorf("Unable to parse %s: %v", account, err)
Expand Down
2 changes: 1 addition & 1 deletion exp/lighthorizon/index/cmd/batch/reduce/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ func (cfg *ReduceConfig) shouldProcessTx(txPrefix byte, routineIndex uint32) boo

// For every index that exists in `dest`, finds the corresponding index in
// `source` and merges it into `dest`'s version.
func mergeIndices(dest, source map[string]*types.CheckpointIndex) error {
func mergeIndices(dest, source map[string]*types.BitmapIndex) error {
for name, index := range dest {
// The source doesn't contain this particular index.
//
Expand Down
10 changes: 5 additions & 5 deletions exp/lighthorizon/index/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,12 +227,12 @@ func (s *store) AddParticipantsToIndexesNoBackend(checkpoint uint32, index strin
var err error
for _, participant := range participants {
if _, ok := s.indexes[participant]; !ok {
s.indexes[participant] = map[string]*types.CheckpointIndex{}
s.indexes[participant] = map[string]*types.BitmapIndex{}
}

ind, ok := s.indexes[participant][index]
if !ok {
ind = &types.CheckpointIndex{}
ind = &types.BitmapIndex{}
s.indexes[participant][index] = ind
}

Expand Down Expand Up @@ -269,7 +269,7 @@ func (s *store) AddParticipantsToIndexes(checkpoint uint32, index string, partic
return nil
}

func (s *store) getCreateIndex(account, id string) (*types.CheckpointIndex, error) {
func (s *store) getCreateIndex(account, id string) (*types.BitmapIndex, error) {
s.mutex.Lock()
defer s.mutex.Unlock()
defer s.approximateWorkingSet()
Expand All @@ -295,7 +295,7 @@ func (s *store) getCreateIndex(account, id string) (*types.CheckpointIndex, erro
ind, ok = accountIndexes[id]
if !ok {
// Not found anywhere, make a new one.
ind = &types.CheckpointIndex{}
ind = &types.BitmapIndex{}
accountIndexes[id] = ind
}

Expand Down Expand Up @@ -323,7 +323,7 @@ func (s *store) NextActive(account, indexId string, afterCheckpoint uint32) (uin
if err != nil {
return 0, err
}
return ind.NextActive(afterCheckpoint)
return ind.NextActiveBit(afterCheckpoint)
}

func (s *store) getCreateTrieIndex(prefix string) (*types.TrieIndex, error) {
Expand Down
164 changes: 82 additions & 82 deletions exp/lighthorizon/index/types/bitmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,104 +8,104 @@ import (
"github.com/stellar/go/xdr"
)

const CheckpointIndexVersion = 1
const BitmapIndexVersion = 1

type CheckpointIndex struct {
mutex sync.RWMutex
bitmap []byte
firstCheckpoint uint32
lastCheckpoint uint32
type BitmapIndex struct {
mutex sync.RWMutex
bitmap []byte
firstBit uint32
lastBit uint32
}

type NamedIndices map[string]*CheckpointIndex
type NamedIndices map[string]*BitmapIndex

func NewCheckpointIndex(b []byte) (*CheckpointIndex, error) {
xdrCheckpoint := xdr.CheckpointIndex{}
err := xdrCheckpoint.UnmarshalBinary(b)
func NewBitmapIndex(b []byte) (*BitmapIndex, error) {
xdrBitmap := xdr.BitmapIndex{}
err := xdrBitmap.UnmarshalBinary(b)
if err != nil {
return nil, err
}

return NewCheckpointIndexFromXDR(xdrCheckpoint), nil
return NewBitmapIndexFromXDR(xdrBitmap), nil
}

func NewCheckpointIndexFromXDR(index xdr.CheckpointIndex) *CheckpointIndex {
return &CheckpointIndex{
bitmap: index.Bitmap[:],
firstCheckpoint: uint32(index.FirstCheckpoint),
lastCheckpoint: uint32(index.LastCheckpoint),
func NewBitmapIndexFromXDR(index xdr.BitmapIndex) *BitmapIndex {
return &BitmapIndex{
bitmap: index.Bitmap[:],
firstBit: uint32(index.FirstBit),
lastBit: uint32(index.LastBit),
}
}

func (i *CheckpointIndex) Size() int {
func (i *BitmapIndex) Size() int {
return len(i.bitmap)
}

func (i *CheckpointIndex) SetActive(checkpoint uint32) error {
func (i *BitmapIndex) SetActive(index uint32) error {
i.mutex.Lock()
defer i.mutex.Unlock()
return i.setActive(checkpoint)
return i.setActive(index)
}

func bitShiftLeft(checkpoint uint32) byte {
if checkpoint%8 == 0 {
func bitShiftLeft(index uint32) byte {
if index%8 == 0 {
return 1
} else {
return byte(1) << (8 - checkpoint%8)
return byte(1) << (8 - index%8)
}
}

func (i *CheckpointIndex) rangeFirstCheckpoint() uint32 {
return (i.firstCheckpoint-1)/8*8 + 1
func (i *BitmapIndex) rangeFirstBit() uint32 {
return (i.firstBit-1)/8*8 + 1
}

func (i *CheckpointIndex) rangeLastCheckpoint() uint32 {
return i.rangeFirstCheckpoint() + uint32(len(i.bitmap))*8 - 1
func (i *BitmapIndex) rangeLastBit() uint32 {
return i.rangeFirstBit() + uint32(len(i.bitmap))*8 - 1
}

func (i *CheckpointIndex) setActive(checkpoint uint32) error {
if i.firstCheckpoint == 0 {
i.firstCheckpoint = checkpoint
i.lastCheckpoint = checkpoint
b := bitShiftLeft(checkpoint)
func (i *BitmapIndex) setActive(index uint32) error {
if i.firstBit == 0 {
i.firstBit = index
i.lastBit = index
b := bitShiftLeft(index)
i.bitmap = []byte{b}
} else {
if checkpoint >= i.rangeFirstCheckpoint() && checkpoint <= i.rangeLastCheckpoint() {
if index >= i.rangeFirstBit() && index <= i.rangeLastBit() {
// Update the bit in existing range
b := bitShiftLeft(checkpoint)
loc := (checkpoint - i.rangeFirstCheckpoint()) / 8
b := bitShiftLeft(index)
loc := (index - i.rangeFirstBit()) / 8
i.bitmap[loc] = i.bitmap[loc] | b

if checkpoint < i.firstCheckpoint {
i.firstCheckpoint = checkpoint
if index < i.firstBit {
i.firstBit = index
}
if checkpoint > i.lastCheckpoint {
i.lastCheckpoint = checkpoint
if index > i.lastBit {
i.lastBit = index
}
} else {
// Expand the bitmap
if checkpoint < i.rangeFirstCheckpoint() {
if index < i.rangeFirstBit() {
// ...to the left
c := (i.rangeFirstCheckpoint() - checkpoint) / 8
if (i.rangeFirstCheckpoint()-checkpoint)%8 != 0 {
c := (i.rangeFirstBit() - index) / 8
if (i.rangeFirstBit()-index)%8 != 0 {
c++
}
newBytes := make([]byte, c)
i.bitmap = append(newBytes, i.bitmap...)

b := bitShiftLeft(checkpoint)
b := bitShiftLeft(index)
i.bitmap[0] = i.bitmap[0] | b

i.firstCheckpoint = checkpoint
} else if checkpoint > i.rangeLastCheckpoint() {
i.firstBit = index
} else if index > i.rangeLastBit() {
// ... to the right
newBytes := make([]byte, (checkpoint-i.rangeLastCheckpoint())/8+1)
newBytes := make([]byte, (index-i.rangeLastBit())/8+1)
i.bitmap = append(i.bitmap, newBytes...)
b := bitShiftLeft(checkpoint)
loc := (checkpoint - i.rangeFirstCheckpoint()) / 8
b := bitShiftLeft(index)
loc := (index - i.rangeFirstBit()) / 8
i.bitmap[loc] = i.bitmap[loc] | b

i.lastCheckpoint = checkpoint
i.lastBit = index
}
}
}
Expand All @@ -114,30 +114,30 @@ func (i *CheckpointIndex) setActive(checkpoint uint32) error {
}

//lint:ignore U1000 Ignore unused function temporarily
func (i *CheckpointIndex) isActive(checkpoint uint32) bool {
if checkpoint >= i.firstCheckpoint && checkpoint <= i.lastCheckpoint {
b := bitShiftLeft(checkpoint)
loc := (checkpoint - i.rangeFirstCheckpoint()) / 8
func (i *BitmapIndex) isActive(index uint32) bool {
if index >= i.firstBit && index <= i.lastBit {
b := bitShiftLeft(index)
loc := (index - i.rangeFirstBit()) / 8
return i.bitmap[loc]&b != 0
} else {
return false
}
}

func (i *CheckpointIndex) iterate(f func(checkpoint uint32)) error {
func (i *BitmapIndex) iterate(f func(index uint32)) error {
i.mutex.RLock()
defer i.mutex.RUnlock()

if i.firstCheckpoint == 0 {
if i.firstBit == 0 {
return nil
}

f(i.firstCheckpoint)
curr := i.firstCheckpoint
f(i.firstBit)
curr := i.firstBit

for {
var err error
curr, err = i.nextActive(curr + 1)
curr, err = i.nextActiveBit(curr + 1)
if err != nil {
if err == io.EOF {
break
Expand All @@ -151,55 +151,55 @@ func (i *CheckpointIndex) iterate(f func(checkpoint uint32)) error {
return nil
}

func (i *CheckpointIndex) Merge(other *CheckpointIndex) error {
func (i *BitmapIndex) Merge(other *BitmapIndex) error {
i.mutex.Lock()
defer i.mutex.Unlock()

var err error
other.iterate(func(checkpoint uint32) {
other.iterate(func(index uint32) {
if err != nil {
return
}
err = i.setActive(checkpoint)
err = i.setActive(index)
})

return err
}

// NextActive returns the next checkpoint (inclusive) where this index is
// active. "Inclusive" means that if the index is active at `checkpoint`, this
// returns `checkpoint`.
func (i *CheckpointIndex) NextActive(checkpoint uint32) (uint32, error) {
// NextActiveBit returns the next bit position (inclusive) where this index is
// active. "Inclusive" means that if it's already active at `position`, this
// returns `position`.
func (i *BitmapIndex) NextActiveBit(position uint32) (uint32, error) {
i.mutex.RLock()
defer i.mutex.RUnlock()
return i.nextActive(checkpoint)
return i.nextActiveBit(position)
}

func (i *CheckpointIndex) nextActive(checkpoint uint32) (uint32, error) {
if i.firstCheckpoint == 0 || checkpoint > i.lastCheckpoint {
func (i *BitmapIndex) nextActiveBit(position uint32) (uint32, error) {
if i.firstBit == 0 || position > i.lastBit {
// We're past the end.
// TODO: Should this be an error? or how should we signal NONE here?
return 0, io.EOF
}

if checkpoint < i.firstCheckpoint {
checkpoint = i.firstCheckpoint
if position < i.firstBit {
position = i.firstBit
}

// Must be within the range, find the first non-zero after our start
loc := (checkpoint - i.rangeFirstCheckpoint()) / 8
loc := (position - i.rangeFirstBit()) / 8

// Is it in the same byte?
if shift, ok := maxBitAfter(i.bitmap[loc], (checkpoint-1)%8); ok {
return i.rangeFirstCheckpoint() + (loc * 8) + shift, nil
if shift, ok := maxBitAfter(i.bitmap[loc], (position-1)%8); ok {
return i.rangeFirstBit() + (loc * 8) + shift, nil
}

// Scan bytes after
loc++
for ; loc < uint32(len(i.bitmap)); loc++ {
// Find the offset of the set bit
if shift, ok := maxBitAfter(i.bitmap[loc], 0); ok {
return i.rangeFirstCheckpoint() + (loc * 8) + shift, nil
return i.rangeFirstBit() + (loc * 8) + shift, nil
}
}

Expand All @@ -223,30 +223,30 @@ func maxBitAfter(b byte, after uint32) (uint32, bool) {
return 0, false
}

func (i *CheckpointIndex) ToXDR() xdr.CheckpointIndex {
func (i *BitmapIndex) ToXDR() xdr.BitmapIndex {
i.mutex.RLock()
defer i.mutex.RUnlock()

return xdr.CheckpointIndex{
FirstCheckpoint: xdr.Uint32(i.firstCheckpoint),
LastCheckpoint: xdr.Uint32(i.lastCheckpoint),
Bitmap: i.bitmap,
return xdr.BitmapIndex{
FirstBit: xdr.Uint32(i.firstBit),
LastBit: xdr.Uint32(i.lastBit),
Bitmap: i.bitmap,
}
}

func (i *CheckpointIndex) Buffer() *bytes.Buffer {
func (i *BitmapIndex) Buffer() *bytes.Buffer {
i.mutex.RLock()
defer i.mutex.RUnlock()

xdrCheckpoint := i.ToXDR()
b, err := xdrCheckpoint.MarshalBinary()
xdrBitmap := i.ToXDR()
b, err := xdrBitmap.MarshalBinary()
if err != nil {
panic(err)
}
return bytes.NewBuffer(b)
}

// Flush flushes the index data to byte slice in index format.
func (i *CheckpointIndex) Flush() []byte {
func (i *BitmapIndex) Flush() []byte {
return i.Buffer().Bytes()
}
Loading