Skip to content

Commit

Permalink
exp/lighthorizon: Fix the single-process index builder data race. (#4470
Browse files Browse the repository at this point in the history
)

* Add synchronization for the work submission routine. Thank you @sreuland!

Co-authored-by: shawn <[email protected]>
  • Loading branch information
Shaptic and sreuland authored Jul 21, 2022
1 parent 033de79 commit 8c9eec3
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 5 deletions.
11 changes: 9 additions & 2 deletions exp/lighthorizon/index/backend/parallel_flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,12 @@ func parallelFlush(parallel uint32, allIndexes map[string]types.NamedIndices, f

batches := make(chan *batch, parallel)

wg.Add(1)
go func() {
// forces this async func to be waited on also, otherwise the outer
// method returns before this finishes.
defer wg.Done()

for account, indexes := range allIndexes {
batches <- &batch{
account: account,
Expand All @@ -41,15 +46,17 @@ func parallelFlush(parallel uint32, allIndexes map[string]types.NamedIndices, f
defer wg.Done()
for batch := range batches {
if err := f(batch); err != nil {
log.Error(err)
log.Errorf("Error occurred writing batch: %v, retrying...", err)
time.Sleep(50 * time.Millisecond)
batches <- batch
continue
}

nwritten := atomic.AddUint64(&written, 1)
if nwritten%1000 == 0 {
log.Infof("Writing indexes... %d/%d %.2f%%", nwritten, len(allIndexes), (float64(nwritten)/float64(len(allIndexes)))*100)
log.Infof("Writing indexes... %d/%d %.2f%%", nwritten,
len(allIndexes),
(float64(nwritten)/float64(len(allIndexes)))*100)
}

if nwritten == uint64(len(allIndexes)) {
Expand Down
2 changes: 1 addition & 1 deletion exp/lighthorizon/index/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func BuildIndices(
}

nprocessed := atomic.AddUint64(&processed, uint64(count))
if nprocessed%19 == 0 {
if nprocessed%97 == 0 {
printProgress("Reading ledgers", nprocessed, uint64(ledgerCount), startTime)
}

Expand Down
3 changes: 1 addition & 2 deletions exp/lighthorizon/index/connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"fmt"
"net/url"
"path/filepath"
"runtime"

"github.com/aws/aws-sdk-go/aws"

Expand All @@ -17,7 +16,7 @@ func Connect(backendUrl string) (Store, error) {

func ConnectWithConfig(config StoreConfig) (Store, error) {
if config.Workers <= 0 {
config.Workers = uint32(runtime.NumCPU()) - 1
config.Workers = 1
}

parsed, err := url.Parse(config.Url)
Expand Down

0 comments on commit 8c9eec3

Please sign in to comment.