Skip to content

Commit

Permalink
ledger: rearrange blockqueue start/stop (#4964)
Browse files Browse the repository at this point in the history
  • Loading branch information
brianolson authored Jan 19, 2023
1 parent 5dd909c commit 992b318
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 51 deletions.
21 changes: 0 additions & 21 deletions ledger/archival_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ import (

"github.com/stretchr/testify/require"

"github.com/algorand/go-deadlock"

"github.com/algorand/go-algorand/agreement"
"github.com/algorand/go-algorand/config"
"github.com/algorand/go-algorand/crypto"
Expand Down Expand Up @@ -192,13 +190,6 @@ func TestArchivalRestart(t *testing.T) {

// Start in archival mode, add 2K blocks, restart, ensure all blocks are there

// disable deadlock checking code
deadlockDisable := deadlock.Opts.Disable
deadlock.Opts.Disable = true
defer func() {
deadlock.Opts.Disable = deadlockDisable
}()

dbName := fmt.Sprintf("%s.%d", t.Name(), crypto.RandUint64())
dbPrefix := filepath.Join(t.TempDir(), dbName)

Expand Down Expand Up @@ -339,13 +330,6 @@ func TestArchivalCreatables(t *testing.T) {
// restart, ensure all assets are there in index unless they were
// deleted

// disable deadlock checking code
deadlockDisable := deadlock.Opts.Disable
deadlock.Opts.Disable = true
defer func() {
deadlock.Opts.Disable = deadlockDisable
}()

dbName := fmt.Sprintf("%s.%d", t.Name(), crypto.RandUint64())
dbPrefix := filepath.Join(t.TempDir(), dbName)

Expand Down Expand Up @@ -691,11 +675,6 @@ func TestArchivalFromNonArchival(t *testing.T) {
partitiontest.PartitionTest(t)

// Start in non-archival mode, add 2K blocks, restart in archival mode ensure only genesis block is there
deadlockDisable := deadlock.Opts.Disable
deadlock.Opts.Disable = true
defer func() {
deadlock.Opts.Disable = deadlockDisable
}()

dbName := fmt.Sprintf("%s.%d", t.Name(), crypto.RandUint64())
dbPrefix := filepath.Join(t.TempDir(), dbName)
Expand Down
50 changes: 35 additions & 15 deletions ledger/blockqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,29 @@ type blockQueue struct {
closed chan struct{}
}

func bqInit(l *Ledger) (*blockQueue, error) {
func newBlockQueue(l *Ledger) (*blockQueue, error) {
bq := &blockQueue{}
bq.cond = sync.NewCond(&bq.mu)
bq.l = l
return bq, nil
}

func (bq *blockQueue) start() error {
bq.mu.Lock()
defer bq.mu.Unlock()

if bq.running {
// this should be harmless, but it should also be impossible
bq.l.log.Warn("blockQueue.start() already started")
return nil
}
if bq.closed != nil {
// a previus close() is still waiting on a previous syncer() to finish
oldsyncer := bq.closed
bq.mu.Unlock()
<-oldsyncer
bq.mu.Lock()
}
bq.running = true
bq.closed = make(chan struct{})
ledgerBlockqInitCount.Inc(nil)
Expand All @@ -67,40 +86,41 @@ func bqInit(l *Ledger) (*blockQueue, error) {
})
ledgerBlockqInitMicros.AddMicrosecondsSince(start, nil)
if err != nil {
return nil, err
return err
}

go bq.syncer()
return bq, nil
return nil
}

func (bq *blockQueue) close() {
func (bq *blockQueue) stop() {
bq.mu.Lock()
defer func() {
bq.mu.Unlock()
// we want to block here until the sync go routine is done.
// it's not (just) for the sake of a complete cleanup, but rather
// to ensure that the sync goroutine isn't busy in a notifyCommit
// call which might be blocked inside one of the trackers.
<-bq.closed
}()

closechan := bq.closed
if bq.running {
bq.running = false
bq.cond.Broadcast()
}

bq.mu.Unlock()

// we want to block here until the sync go routine is done.
// it's not (just) for the sake of a complete cleanup, but rather
// to ensure that the sync goroutine isn't busy in a notifyCommit
// call which might be blocked inside one of the trackers.
if closechan != nil {
<-closechan
}
}

func (bq *blockQueue) syncer() {
defer close(bq.closed)
bq.mu.Lock()
for {
for bq.running && len(bq.q) == 0 {
bq.cond.Wait()
}

if !bq.running {
close(bq.closed)
bq.closed = nil
bq.mu.Unlock()
return
}
Expand Down
4 changes: 2 additions & 2 deletions ledger/double_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,8 @@ func (dl *DoubleLedger) fundedApp(sender basics.Address, amount uint64, source s
}

func (dl *DoubleLedger) reloadLedgers() {
require.NoError(dl.t, dl.generator.ReloadLedger())
require.NoError(dl.t, dl.validator.ReloadLedger())
require.NoError(dl.t, dl.generator.reloadLedger())
require.NoError(dl.t, dl.validator.reloadLedger())
}

func checkBlock(t *testing.T, checkLedger *Ledger, vb *ledgercore.ValidatedBlock) {
Expand Down
21 changes: 9 additions & 12 deletions ledger/ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,11 @@ func OpenLedger(
l.genesisAccounts = make(map[basics.Address]basics.AccountData)
}

l.blockQ, err = newBlockQueue(l)
if err != nil {
return nil, err
}

err = l.reloadLedger()
if err != nil {
return nil, err
Expand All @@ -168,19 +173,12 @@ func OpenLedger(
return l, nil
}

// ReloadLedger is exported for the benefit of tests in the internal
// package. Revisit this when we rename / restructure that thing
func (l *Ledger) ReloadLedger() error {
return l.reloadLedger()
}

func (l *Ledger) reloadLedger() error {
// similar to the Close function, we want to start by closing the blockQ first. The
// blockQ is having a sync goroutine which indirectly calls other trackers. We want to eliminate that go-routine first,
// and follow up by taking the trackers lock.
if l.blockQ != nil {
l.blockQ.close()
l.blockQ = nil
l.blockQ.stop()
}

// take the trackers lock. This would ensure that no other goroutine is using the trackers.
Expand All @@ -192,9 +190,9 @@ func (l *Ledger) reloadLedger() error {

// init block queue
var err error
l.blockQ, err = bqInit(l)
err = l.blockQ.start()
if err != nil {
err = fmt.Errorf("reloadLedger.bqInit %v", err)
err = fmt.Errorf("reloadLedger.blockQ.start %v", err)
return err
}

Expand Down Expand Up @@ -381,8 +379,7 @@ func (l *Ledger) Close() {
// we shut the the blockqueue first, since it's sync goroutine dispatches calls
// back to the trackers.
if l.blockQ != nil {
l.blockQ.close()
l.blockQ = nil
l.blockQ.stop()
}

// take the trackers lock. This would ensure that no other goroutine is using the trackers.
Expand Down
2 changes: 1 addition & 1 deletion ledger/ledger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1557,7 +1557,7 @@ func TestWaitLedgerReload(t *testing.T) {
waitRound := l.Latest() + 1
waitChannel := l.Wait(waitRound)

err = l.ReloadLedger()
err = l.reloadLedger()
a.NoError(err)
triggerTrackerFlush(t, l, genesisInitState)

Expand Down

0 comments on commit 992b318

Please sign in to comment.