
Commit

1207 unit test deadlock in smt create (#1213)
* opt threads in smt_bulk_create

* values -> pointers (reduce mem consumption)
kstoykov authored Sep 24, 2024
1 parent 1ee1ecd commit f413fd1
Showing 2 changed files with 130 additions and 104 deletions.
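
For context on the second commit-message bullet ("values -> pointers"): the change makes leafValueMap hold *utils.NodeValue8 pointers instead of NodeValue8 values, so the build loop, the delete workers, and deleteTreeNoSave share one allocation per leaf rather than copying the struct on every Store and Load. Below is a minimal sketch of that pattern, using a hypothetical stand-in type rather than the real utils.NodeValue8.

```go
package main

import (
	"fmt"
	"sync"
)

// NodeValue8 stands in for utils.NodeValue8 (an 8-word leaf value).
type NodeValue8 [8]uint64

func main() {
	var leafValueMap sync.Map

	v := NodeValue8{1, 2, 3, 4, 5, 6, 7, 8}
	// Store a pointer: one allocation shared by every later reader,
	// instead of a fresh copy boxed into the map on each Store.
	leafValueMap.Store("key-1", &v)

	got, ok := leafValueMap.Load("key-1")
	if !ok {
		panic("value not found")
	}
	// The read side must assert the pointer type: *NodeValue8, not NodeValue8.
	ptr := got.(*NodeValue8)
	fmt.Println(ptr[0], ptr[7])
}
```

This asymmetry is also why the diff changes the type assertion in deleteTreeNoSave from v.(utils.NodeValue8) to v.(*utils.NodeValue8).
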
214 changes: 121 additions & 93 deletions smt/pkg/smt/smt_create.go
@@ -35,10 +35,7 @@ func (s *SMT) GenerateFromKVBulk(ctx context.Context, logPrefix string, nodeKeys
defer s.clearUpMutex.Unlock()

log.Info(fmt.Sprintf("[%s] Building temp binary tree started", logPrefix))

totalKeysCount := len(nodeKeys)

log.Info(fmt.Sprintf("[%s] Total values to insert: %d", logPrefix, totalKeysCount))
log.Info(fmt.Sprintf("[%s] Total values to insert: %d", logPrefix, len(nodeKeys)))

log.Info(fmt.Sprintf("[%s] Sorting keys...", logPrefix))
sortStartTime := time.Now()
@@ -49,6 +46,86 @@ func (s *SMT) GenerateFromKVBulk(ctx context.Context, logPrefix string, nodeKeys
sortTotalTime := time.Since(sortStartTime)
log.Info(fmt.Sprintf("[%s] Keys sorted in %v", logPrefix, sortTotalTime))

wg := sync.WaitGroup{}
wg.Add(2)

deletesWorker := utils.NewWorker(ctx, "smt_save_finished", 1<<16)

// start a worker to delete finished parts of the tree and return values to save to the db
go func() {
defer wg.Done()
deletesWorker.DoWork()
}()

var buildSmtLoopErr error
var rootNode *SmtNode
tempTreeBuildStart := time.Now()
leafValueMap := sync.Map{}
accountValuesReadChan := make(chan *utils.NodeValue8, 1024)
go func() {
defer wg.Done()
defer deletesWorker.Stop()
rootNode, buildSmtLoopErr = runBuildSmtLoop(s, logPrefix, nodeKeys, &leafValueMap, deletesWorker, accountValuesReadChan)
}()

// startBuildSmtLoopDbCompanionLoop is a blocking operation. It returns only when the last result is saved
if err := startBuildSmtLoopDbCompanionLoop(s, nodeKeys, deletesWorker.GetJobResultsChannel(), accountValuesReadChan); err != nil {
return [4]uint64{}, err
}

// by the time the code gets here, runBuildSmtLoop has finished and everything has been stored in the db
// => the .Wait here ensures that the current thread sees the memory written by the deletesWorker's thread and runBuildSmtLoop's thread
wg.Wait()

if buildSmtLoopErr != nil {
return [4]uint64{}, buildSmtLoopErr
}

tempTreeBuildTime := time.Since(tempTreeBuildStart)
log.Info(fmt.Sprintf("[%s] Finished the temp tree build in %v, hashing and saving the result...", logPrefix, tempTreeBuildTime))

//special case where no values were inserted
if rootNode.isLeaf() {
return [4]uint64{}, nil
}

//if the root node has only one branch, that branch should become the root node
var pathToDeleteFrom []int
if len(nodeKeys) == 1 {
if rootNode.node1 == nil {
rootNode = rootNode.node0
pathToDeleteFrom = append(pathToDeleteFrom, 0)
} else if rootNode.node0 == nil && utils.IsArrayUint64Empty(rootNode.leftHash[:]) {
rootNode = rootNode.node1
pathToDeleteFrom = append(pathToDeleteFrom, 1)
}
}

//if the branch is a leaf, the rkey is the whole key
if rootNode.isLeaf() {
newRkey := []int{pathToDeleteFrom[0]}
pathToDeleteFrom = []int{}
newRkey = append(newRkey, rootNode.rKey...)
rootNode.rKey = newRkey
}

finalRoot, err := rootNode.deleteTree(pathToDeleteFrom, s, &leafValueMap)
if err != nil {
return [4]uint64{}, err
}

if err := s.setLastRoot(finalRoot); err != nil {
return [4]uint64{}, err
}

return finalRoot, nil
}

func runBuildSmtLoop(s *SMT, logPrefix string, nodeKeys []utils.NodeKey, leafValueMap *sync.Map, deletesWorker *utils.Worker, accountValuesReadChan <-chan *utils.NodeValue8) (*SmtNode, error) {
totalKeysCount := len(nodeKeys)
insertedKeysCount := uint64(0)
maxReachedLevel := 0

rootNode := SmtNode{
leftHash: [4]uint64{},
node0: nil,
@@ -60,32 +137,15 @@ func (s *SMT) GenerateFromKVBulk(ctx context.Context, logPrefix string, nodeKeys
defer stopProgressPrinter()
progressChan <- uint64(totalKeysCount)

insertedKeysCount := uint64(0)

maxReachedLevel := 0

deletesWorker := utils.NewWorker(ctx, "smt_save_finished", 1000)

// start a worker to delete finished parts of the tree and return values to save to the db
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
deletesWorker.DoWork()
wg.Done()
}()

tempTreeBuildStart := time.Now()
leafValueMap := sync.Map{}

var err error
for _, k := range nodeKeys {
// split the key
keys := k.GetPath()
v, err := s.Db.GetAccountValue(k)
if err != nil {
return [4]uint64{}, err
vPointer := <-accountValuesReadChan
if vPointer == nil {
return nil, fmt.Errorf("the actual error is returned by main DB thread")
}
leafValueMap.Store(k, v)
v := *vPointer
leafValueMap.Store(k, &v)

// find last node
siblings, level := rootNode.findLastNode(keys)
@@ -136,7 +196,7 @@ func (s *SMT) GenerateFromKVBulk(ctx context.Context, logPrefix string, nodeKeys
//sanity check - new leaf should be on the right side
//otherwise something went wrong
if leaf0.rKey[level2] != 0 || keys[level2+level] != 1 {
return [4]uint64{}, fmt.Errorf(
return nil, fmt.Errorf(
"leaf insert error. new leaf should be on the right of the old, oldLeaf: %v, newLeaf: %v",
append(keys[:level+1], leaf0.rKey[level2:]...),
keys,
@@ -159,7 +219,7 @@ func (s *SMT) GenerateFromKVBulk(ctx context.Context, logPrefix string, nodeKeys
jobResult := utils.NewCalcAndPrepareJobResult(s.Db)
//hash, save and delete left leaf
deleteFunc := func() utils.JobResult {
leftHash, err := nodeToDelFrom.node0.deleteTreeNoSave(pathToDeleteFrom, &leafValueMap, jobResult.KvMap, jobResult.LeafsKvMap)
leftHash, err := nodeToDelFrom.node0.deleteTreeNoSave(pathToDeleteFrom, leafValueMap, jobResult.KvMap, jobResult.LeafsKvMap)
if err != nil {
jobResult.Err = err
return jobResult
@@ -198,7 +258,7 @@ func (s *SMT) GenerateFromKVBulk(ctx context.Context, logPrefix string, nodeKeys
// this is case for 1 leaf inserted to the left of the root node
if len(siblings) == 0 && keys[0] == 0 {
if upperNode.node0 != nil {
return [4]uint64{}, fmt.Errorf("tried to override left node")
return nil, fmt.Errorf("tried to override left node")
}
upperNode.node0 = newNode
} else {
@@ -207,7 +267,7 @@ func (s *SMT) GenerateFromKVBulk(ctx context.Context, logPrefix string, nodeKeys
//the new leaf should be on the right side
//otherwise something went wrong
if upperNode.node1 != nil || keys[level] != 1 {
return [4]uint64{}, fmt.Errorf(
return nil, fmt.Errorf(
"leaf insert error. new should be on the right of the found node, foundNode: %v, newLeafKey: %v",
upperNode.node1,
keys,
@@ -228,7 +288,7 @@ func (s *SMT) GenerateFromKVBulk(ctx context.Context, logPrefix string, nodeKeys
// get all leaf keys so we can then get all needed values and pass them
// this is needed because we can't read from the db in another routine
deleteFunc := func() utils.JobResult {
leftHash, err := nodeToDelFrom.node0.deleteTreeNoSave(pathToDeleteFrom, &leafValueMap, jobResult.KvMap, jobResult.LeafsKvMap)
leftHash, err := nodeToDelFrom.node0.deleteTreeNoSave(pathToDeleteFrom, leafValueMap, jobResult.KvMap, jobResult.LeafsKvMap)

if err != nil {
jobResult.Err = err
Expand All @@ -248,79 +308,47 @@ func (s *SMT) GenerateFromKVBulk(ctx context.Context, logPrefix string, nodeKeys
}
}

if err := runSaveLoop(deletesWorker.GetJobResultsChannel()); err != nil {
return [4]uint64{}, err
}

insertedKeysCount++
progressChan <- uint64(totalKeysCount) + insertedKeysCount
}
deletesWorker.Stop()

wg.Wait()

// wait and save all jobs
if err := runSaveLoop(deletesWorker.GetJobResultsChannel()); err != nil {
return [4]uint64{}, err
}

s.updateDepth(maxReachedLevel)

tempTreeBuildTime := time.Since(tempTreeBuildStart)

log.Info(fmt.Sprintf("[%s] Finished the temp tree build in %v, hashing and saving the result...", logPrefix, tempTreeBuildTime))

//special case where no values were inserted
if rootNode.isLeaf() {
return [4]uint64{}, nil
}

//if the root node has only one branch, that branch should become the root node
var pathToDeleteFrom []int
if len(nodeKeys) == 1 {
if rootNode.node1 == nil {
rootNode = *rootNode.node0
pathToDeleteFrom = append(pathToDeleteFrom, 0)
} else if rootNode.node0 == nil && utils.IsArrayUint64Empty(rootNode.leftHash[:]) {
rootNode = *rootNode.node1
pathToDeleteFrom = append(pathToDeleteFrom, 1)
}
}

//if the branch is a leaf, the rkey is the whole key
if rootNode.isLeaf() {
newRkey := []int{pathToDeleteFrom[0]}
pathToDeleteFrom = []int{}
newRkey = append(newRkey, rootNode.rKey...)
rootNode.rKey = newRkey
}

finalRoot, err := rootNode.deleteTree(pathToDeleteFrom, s, &leafValueMap)
if err != nil {
return [4]uint64{}, err
}

if err := s.setLastRoot(finalRoot); err != nil {
return [4]uint64{}, err
}

return finalRoot, nil
return &rootNode, nil
}

func runSaveLoop(jobResultsChannel chan utils.JobResult) error {
for {
select {
case result := <-jobResultsChannel:
if result.GetError() != nil {
return result.GetError()
}
func startBuildSmtLoopDbCompanionLoop(s *SMT, nodeKeys []utils.NodeKey, jobResultsChannel chan utils.JobResult, accountValuesReadChan chan *utils.NodeValue8) error {
lastReadAccountValueIndex := 0
totalKeys := len(nodeKeys)

if err := result.Save(); err != nil {
for {
accountValuesReadChanSize := len(accountValuesReadChan)
readSize := 1024 - accountValuesReadChanSize
readLimit := lastReadAccountValueIndex + readSize
if readLimit > totalKeys {
readLimit = totalKeys
}
for ; lastReadAccountValueIndex < readLimit; lastReadAccountValueIndex++ {
v, err := s.Db.GetAccountValue(nodeKeys[lastReadAccountValueIndex])
if err != nil {
accountValuesReadChan <- nil
return err
}
default:
accountValuesReadChan <- &v
}

result, ok := <-jobResultsChannel
if !ok {
return nil
}

if result.GetError() != nil {
return result.GetError()
}

if err := result.Save(); err != nil {
return err
}
}
}

@@ -381,11 +409,11 @@ func (n *SmtNode) deleteTreeNoSave(keyPath []int, leafValueMap *sync.Map, kvMapO
if !ok {
return [4]uint64{}, fmt.Errorf("value not found for key %v", k)
}
accoutnValue := v.(utils.NodeValue8)
accoutnValue := v.(*utils.NodeValue8)

newKey := utils.RemoveKeyBits(k, len(keyPath))
//hash and save leaf
newValH, newValHV, newLeafHash, newLeafHashV := createNewLeafNoSave(&newKey, &accoutnValue)
newValH, newValHV, newLeafHash, newLeafHashV := createNewLeafNoSave(&newKey, accoutnValue)
kvMapOfValuesToSave[*newValH] = *newValHV
kvMapOfValuesToSave[*newLeafHash] = *newLeafHashV
kvMapOfLeafValuesToSave[*newLeafHash] = k
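
The smt_create.go changes above restructure GenerateFromKVBulk so that all database access stays on the calling goroutine: runBuildSmtLoop now runs in its own goroutine and consumes pre-read account values from a buffered channel, while startBuildSmtLoopDbCompanionLoop tops that channel up and drains the worker's job results on the main thread, returning only once the last result is saved. The sketch below is a simplified, self-contained version of that producer/consumer layout; the types, channel sizes, and names are stand-ins, not the real SMT, Worker, or JobResult code.

```go
package main

import (
	"fmt"
	"sync"
)

type accountValue struct{ data uint64 } // stand-in for utils.NodeValue8
type jobResult struct{ err error }      // stand-in for utils.JobResult

func main() {
	keys := []int{1, 2, 3, 4, 5}

	values := make(chan *accountValue, 2) // pre-read "DB" values for the build loop
	results := make(chan jobResult, 2)    // finished work handed back for saving

	var wg sync.WaitGroup
	wg.Add(1)

	// Build loop: runs off the main goroutine and never touches the DB directly.
	go func() {
		defer wg.Done()
		defer close(results) // lets the companion loop below terminate
		for range keys {
			v := <-values
			if v == nil {
				return // the reader hit an error; it reports it on its own side
			}
			results <- jobResult{} // pretend a finished subtree is ready to save
		}
	}()

	// Companion loop on the calling goroutine: do the "DB" reads here and drain
	// results as they arrive, so neither side can block the other indefinitely.
	readIdx := 0
	topUp := func() {
		for readIdx < len(keys) && len(values) < cap(values) {
			values <- &accountValue{data: uint64(keys[readIdx])} // the real code reads s.Db here
			readIdx++
		}
	}
	topUp()
	for r := range results {
		if r.err != nil {
			fmt.Println("save failed:", r.err)
			return
		}
		topUp()
	}

	wg.Wait() // publishes the builder goroutine's writes to this goroutine
	fmt.Println("built", len(keys), "leaves")
}
```

Sending nil on the values channel mirrors the diff's convention of telling the build loop "the reader failed; the real error is returned by the main DB thread".
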
20 changes: 9 additions & 11 deletions smt/pkg/utils/job_queue.go
@@ -3,7 +3,6 @@ package utils
import (
"context"
"sync/atomic"
"time"
)

type DB interface {
@@ -78,25 +77,24 @@ func (w *Worker) GetJobResultsChannel() chan JobResult {
}

func (w *Worker) Stop() {
w.stopped.Store(true)
close(w.jobs)
}

// DoWork processes jobs from the queue (jobs channel).
func (w *Worker) DoWork() {
LOOP:
defer close(w.jobResults)

for {
select {
case <-w.ctx.Done():
break LOOP
// if job received.
case job := <-w.jobs:
return
case job, ok := <-w.jobs:
if !ok {
return
}

jobRes := job()
w.jobResults <- jobRes
default:
if w.stopped.Load() {
break LOOP
}
time.Sleep(1 * time.Millisecond)
}
}
}
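
The job_queue.go change replaces flag-and-sleep shutdown with channel closure: Stop closes the jobs channel, DoWork drains it with the ok form of receive and defers closing jobResults, so whoever is receiving results sees the channel close instead of waiting forever. A compact sketch of that shutdown pattern with simplified stand-in types (and without the context cancellation the real Worker keeps):

```go
package main

import "fmt"

type job func() error

type worker struct {
	jobs    chan job
	results chan error
}

func newWorker(buf int) *worker {
	return &worker{jobs: make(chan job, buf), results: make(chan error, buf)}
}

// stop signals "no more jobs" by closing the channel, rather than setting a polled flag.
func (w *worker) stop() { close(w.jobs) }

func (w *worker) doWork() {
	defer close(w.results) // lets the consumer's receive loop terminate
	for j := range w.jobs { // exits once jobs is closed and drained
		w.results <- j()
	}
}

func main() {
	w := newWorker(4)
	go w.doWork()

	for i := 0; i < 3; i++ {
		i := i
		w.jobs <- func() error { fmt.Println("running job", i); return nil }
	}
	w.stop()

	// Terminates when doWork closes results; no polling or timeout needed.
	for err := range w.results {
		if err != nil {
			fmt.Println("job failed:", err)
		}
	}
}
```

The close-based handoff removes the polled stopped flag and the 1 ms sleep visible on the removed side of the hunk.
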
