Skip to content

Commit

Permalink
e3: recon deadlock fix (#7186)
Browse files Browse the repository at this point in the history
  • Loading branch information
AskAlexSharov authored Mar 27, 2023
1 parent 628f52d commit be860e3
Showing 1 changed file with 92 additions and 84 deletions.
176 changes: 92 additions & 84 deletions eth/stagedsync/exec3.go
Original file line number Diff line number Diff line change
Expand Up @@ -1024,86 +1024,94 @@ func reconstituteStep(last bool,
return h
}

var err error // avoid declare global mutable variable
for bn := startBlockNum; bn <= endBlockNum; bn++ {
t = time.Now()
b, err = blockWithSenders(chainDb, nil, blockReader, bn)
if err != nil {
return err
}
if b == nil {
return fmt.Errorf("could not find block %d\n", bn)
}
txs := b.Transactions()
header := b.HeaderNoCopy()
skipAnalysis := core.SkipAnalysis(chainConfig, bn)
signer := *types.MakeSigner(chainConfig, bn)

f := core.GetHashFn(header, getHeaderFunc)
getHashFnMute := &sync.Mutex{}
getHashFn := func(n uint64) common.Hash {
getHashFnMute.Lock()
defer getHashFnMute.Unlock()
return f(n)
}
blockContext := core.NewEVMBlockContext(header, getHashFn, engine, nil /* author */)
rules := chainConfig.Rules(bn, b.Time())
if err := func() (err error) {
defer func() {
close(workCh)
reconDone <- struct{}{} // Complete logging and committing go-routine
_ = g.Wait()
}()

for txIndex := -1; txIndex <= len(txs); txIndex++ {
if bitmap.Contains(inputTxNum) {
binary.BigEndian.PutUint64(txKey[:], inputTxNum)
txTask := &exec22.TxTask{
BlockNum: bn,
Header: header,
Coinbase: b.Coinbase(),
Uncles: b.Uncles(),
Rules: rules,
TxNum: inputTxNum,
Txs: txs,
TxIndex: txIndex,
BlockHash: b.Hash(),
SkipAnalysis: skipAnalysis,
Final: txIndex == len(txs),
GetHashFn: getHashFn,
EvmBlockContext: blockContext,
Withdrawals: b.Withdrawals(),
}
if txIndex >= 0 && txIndex < len(txs) {
txTask.Tx = txs[txIndex]
txTask.TxAsMessage, err = txTask.Tx.AsMessage(signer, header.BaseFee, txTask.Rules)
if err != nil {
return err
for bn := startBlockNum; bn <= endBlockNum; bn++ {
t = time.Now()
b, err = blockWithSenders(chainDb, nil, blockReader, bn)
if err != nil {
return err
}
if b == nil {
return fmt.Errorf("could not find block %d\n", bn)
}
txs := b.Transactions()
header := b.HeaderNoCopy()
skipAnalysis := core.SkipAnalysis(chainConfig, bn)
signer := *types.MakeSigner(chainConfig, bn)

f := core.GetHashFn(header, getHeaderFunc)
getHashFnMute := &sync.Mutex{}
getHashFn := func(n uint64) common.Hash {
getHashFnMute.Lock()
defer getHashFnMute.Unlock()
return f(n)
}
blockContext := core.NewEVMBlockContext(header, getHashFn, engine, nil /* author */)
rules := chainConfig.Rules(bn, b.Time())

for txIndex := -1; txIndex <= len(txs); txIndex++ {
if bitmap.Contains(inputTxNum) {
binary.BigEndian.PutUint64(txKey[:], inputTxNum)
txTask := &exec22.TxTask{
BlockNum: bn,
Header: header,
Coinbase: b.Coinbase(),
Uncles: b.Uncles(),
Rules: rules,
TxNum: inputTxNum,
Txs: txs,
TxIndex: txIndex,
BlockHash: b.Hash(),
SkipAnalysis: skipAnalysis,
Final: txIndex == len(txs),
GetHashFn: getHashFn,
EvmBlockContext: blockContext,
Withdrawals: b.Withdrawals(),
}
if txIndex >= 0 && txIndex < len(txs) {
txTask.Tx = txs[txIndex]
txTask.TxAsMessage, err = txTask.Tx.AsMessage(signer, header.BaseFee, txTask.Rules)
if err != nil {
return err
}
if sender, ok := txs[txIndex].GetSender(); ok {
txTask.Sender = &sender
}
} else {
txTask.Txs = txs
}
if sender, ok := txs[txIndex].GetSender(); ok {
txTask.Sender = &sender

select {
case workCh <- txTask:
case <-ctx.Done():
return ctx.Err()
}
} else {
txTask.Txs = txs
}
workCh <- txTask
inputTxNum++
}
inputTxNum++
}

core.BlockExecutionTimer.UpdateDuration(t)
syncMetrics[stages.Execution].Set(bn)
select {
case <-ctx.Done():
return ctx.Err()
default:
core.BlockExecutionTimer.UpdateDuration(t)
syncMetrics[stages.Execution].Set(bn)
}
}
close(workCh)
reconDone <- struct{}{} // Complete logging and committing go-routine
if err := g.Wait(); err != nil {
if err := g.Wait(); err != nil {
return err
}
return nil
}(); err != nil {
return err
}

for i := 0; i < workerCount; i++ {
roTxs[i].Rollback()
}
if err := db.Update(ctx, func(tx kv.RwTx) error {
if err = rs.Flush(tx); err != nil {
if err := rs.Flush(tx); err != nil {
return err
}
return nil
Expand All @@ -1119,10 +1127,10 @@ func reconstituteStep(last bool,
defer plainContractCollector.Close()
var transposedKey []byte

if err = db.View(ctx, func(roTx kv.Tx) error {
if err := db.View(ctx, func(roTx kv.Tx) error {
clear := kv.ReadAhead(ctx, db, &atomic.Bool{}, kv.PlainStateR, nil, math.MaxUint32)
defer clear()
if err = roTx.ForEach(kv.PlainStateR, nil, func(k, v []byte) error {
if err := roTx.ForEach(kv.PlainStateR, nil, func(k, v []byte) error {
transposedKey = append(transposedKey[:0], k[8:]...)
transposedKey = append(transposedKey, k[:8]...)
return plainStateCollector.Collect(transposedKey, v)
Expand All @@ -1131,7 +1139,7 @@ func reconstituteStep(last bool,
}
clear2 := kv.ReadAhead(ctx, db, &atomic.Bool{}, kv.PlainStateD, nil, math.MaxUint32)
defer clear2()
if err = roTx.ForEach(kv.PlainStateD, nil, func(k, v []byte) error {
if err := roTx.ForEach(kv.PlainStateD, nil, func(k, v []byte) error {
transposedKey = append(transposedKey[:0], v...)
transposedKey = append(transposedKey, k...)
return plainStateCollector.Collect(transposedKey, nil)
Expand All @@ -1140,7 +1148,7 @@ func reconstituteStep(last bool,
}
clear3 := kv.ReadAhead(ctx, db, &atomic.Bool{}, kv.CodeR, nil, math.MaxUint32)
defer clear3()
if err = roTx.ForEach(kv.CodeR, nil, func(k, v []byte) error {
if err := roTx.ForEach(kv.CodeR, nil, func(k, v []byte) error {
transposedKey = append(transposedKey[:0], k[8:]...)
transposedKey = append(transposedKey, k[:8]...)
return codeCollector.Collect(transposedKey, v)
Expand All @@ -1149,7 +1157,7 @@ func reconstituteStep(last bool,
}
clear4 := kv.ReadAhead(ctx, db, &atomic.Bool{}, kv.CodeD, nil, math.MaxUint32)
defer clear4()
if err = roTx.ForEach(kv.CodeD, nil, func(k, v []byte) error {
if err := roTx.ForEach(kv.CodeD, nil, func(k, v []byte) error {
transposedKey = append(transposedKey[:0], v...)
transposedKey = append(transposedKey, k...)
return codeCollector.Collect(transposedKey, nil)
Expand All @@ -1158,7 +1166,7 @@ func reconstituteStep(last bool,
}
clear5 := kv.ReadAhead(ctx, db, &atomic.Bool{}, kv.PlainContractR, nil, math.MaxUint32)
defer clear5()
if err = roTx.ForEach(kv.PlainContractR, nil, func(k, v []byte) error {
if err := roTx.ForEach(kv.PlainContractR, nil, func(k, v []byte) error {
transposedKey = append(transposedKey[:0], k[8:]...)
transposedKey = append(transposedKey, k[:8]...)
return plainContractCollector.Collect(transposedKey, v)
Expand All @@ -1167,7 +1175,7 @@ func reconstituteStep(last bool,
}
clear6 := kv.ReadAhead(ctx, db, &atomic.Bool{}, kv.PlainContractD, nil, math.MaxUint32)
defer clear6()
if err = roTx.ForEach(kv.PlainContractD, nil, func(k, v []byte) error {
if err := roTx.ForEach(kv.PlainContractD, nil, func(k, v []byte) error {
transposedKey = append(transposedKey[:0], v...)
transposedKey = append(transposedKey, k...)
return plainContractCollector.Collect(transposedKey, nil)
Expand All @@ -1178,33 +1186,33 @@ func reconstituteStep(last bool,
}); err != nil {
return err
}
if err = db.Update(ctx, func(tx kv.RwTx) error {
if err = tx.ClearBucket(kv.PlainStateR); err != nil {
if err := db.Update(ctx, func(tx kv.RwTx) error {
if err := tx.ClearBucket(kv.PlainStateR); err != nil {
return err
}
if err = tx.ClearBucket(kv.PlainStateD); err != nil {
if err := tx.ClearBucket(kv.PlainStateD); err != nil {
return err
}
if err = tx.ClearBucket(kv.CodeR); err != nil {
if err := tx.ClearBucket(kv.CodeR); err != nil {
return err
}
if err = tx.ClearBucket(kv.CodeD); err != nil {
if err := tx.ClearBucket(kv.CodeD); err != nil {
return err
}
if err = tx.ClearBucket(kv.PlainContractR); err != nil {
if err := tx.ClearBucket(kv.PlainContractR); err != nil {
return err
}
if err = tx.ClearBucket(kv.PlainContractD); err != nil {
if err := tx.ClearBucket(kv.PlainContractD); err != nil {
return err
}
return nil
}); err != nil {
return err
}
if err = chainDb.Update(ctx, func(tx kv.RwTx) error {
if err := chainDb.Update(ctx, func(tx kv.RwTx) error {
var lastKey []byte
var lastVal []byte
if err = plainStateCollector.Load(tx, kv.PlainState, func(k, v []byte, table etl.CurrentTableReader, next etl.LoadNextFunc) error {
if err := plainStateCollector.Load(tx, kv.PlainState, func(k, v []byte, table etl.CurrentTableReader, next etl.LoadNextFunc) error {
if !bytes.Equal(k[:len(k)-8], lastKey) {
if lastKey != nil {
if e := next(lastKey, lastKey, lastVal); e != nil {
Expand Down Expand Up @@ -1236,7 +1244,7 @@ func reconstituteStep(last bool,
}
lastKey = nil
lastVal = nil
if err = codeCollector.Load(tx, kv.Code, func(k, v []byte, table etl.CurrentTableReader, next etl.LoadNextFunc) error {
if err := codeCollector.Load(tx, kv.Code, func(k, v []byte, table etl.CurrentTableReader, next etl.LoadNextFunc) error {
if !bytes.Equal(k[:len(k)-8], lastKey) {
if lastKey != nil {
if e := next(lastKey, lastKey, lastVal); e != nil {
Expand Down Expand Up @@ -1268,7 +1276,7 @@ func reconstituteStep(last bool,
}
lastKey = nil
lastVal = nil
if err = plainContractCollector.Load(tx, kv.PlainContractCode, func(k, v []byte, table etl.CurrentTableReader, next etl.LoadNextFunc) error {
if err := plainContractCollector.Load(tx, kv.PlainContractCode, func(k, v []byte, table etl.CurrentTableReader, next etl.LoadNextFunc) error {
if !bytes.Equal(k[:len(k)-8], lastKey) {
if lastKey != nil {
if e := next(lastKey, lastKey, lastVal); e != nil {
Expand Down

0 comments on commit be860e3

Please sign in to comment.