diff --git a/ledger/archival_test.go b/ledger/archival_test.go index 87e2f42eda..c7f00b7d6a 100644 --- a/ledger/archival_test.go +++ b/ledger/archival_test.go @@ -29,8 +29,6 @@ import ( "github.com/stretchr/testify/require" - "github.com/algorand/go-deadlock" - "github.com/algorand/go-algorand/agreement" "github.com/algorand/go-algorand/config" "github.com/algorand/go-algorand/crypto" @@ -192,13 +190,6 @@ func TestArchivalRestart(t *testing.T) { // Start in archival mode, add 2K blocks, restart, ensure all blocks are there - // disable deadlock checking code - deadlockDisable := deadlock.Opts.Disable - deadlock.Opts.Disable = true - defer func() { - deadlock.Opts.Disable = deadlockDisable - }() - dbName := fmt.Sprintf("%s.%d", t.Name(), crypto.RandUint64()) dbPrefix := filepath.Join(t.TempDir(), dbName) @@ -339,13 +330,6 @@ func TestArchivalCreatables(t *testing.T) { // restart, ensure all assets are there in index unless they were // deleted - // disable deadlock checking code - deadlockDisable := deadlock.Opts.Disable - deadlock.Opts.Disable = true - defer func() { - deadlock.Opts.Disable = deadlockDisable - }() - dbName := fmt.Sprintf("%s.%d", t.Name(), crypto.RandUint64()) dbPrefix := filepath.Join(t.TempDir(), dbName) @@ -691,11 +675,6 @@ func TestArchivalFromNonArchival(t *testing.T) { partitiontest.PartitionTest(t) // Start in non-archival mode, add 2K blocks, restart in archival mode ensure only genesis block is there - deadlockDisable := deadlock.Opts.Disable - deadlock.Opts.Disable = true - defer func() { - deadlock.Opts.Disable = deadlockDisable - }() dbName := fmt.Sprintf("%s.%d", t.Name(), crypto.RandUint64()) dbPrefix := filepath.Join(t.TempDir(), dbName) diff --git a/ledger/blockqueue.go b/ledger/blockqueue.go index 50d4dc44df..a72c1d8cba 100644 --- a/ledger/blockqueue.go +++ b/ledger/blockqueue.go @@ -52,10 +52,29 @@ type blockQueue struct { closed chan struct{} } -func bqInit(l *Ledger) (*blockQueue, error) { +func newBlockQueue(l *Ledger) 
(*blockQueue, error) { bq := &blockQueue{} bq.cond = sync.NewCond(&bq.mu) bq.l = l + return bq, nil +} + +func (bq *blockQueue) start() error { + bq.mu.Lock() + defer bq.mu.Unlock() + + if bq.running { + // this should be harmless, but it should also be impossible + bq.l.log.Warn("blockQueue.start() already started") + return nil + } + if bq.closed != nil { + // a previous stop() is still waiting on a previous syncer() to finish + oldsyncer := bq.closed + bq.mu.Unlock() + <-oldsyncer + bq.mu.Lock() + } bq.running = true bq.closed = make(chan struct{}) ledgerBlockqInitCount.Inc(nil) @@ -67,33 +86,32 @@ func bqInit(l *Ledger) (*blockQueue, error) { }) ledgerBlockqInitMicros.AddMicrosecondsSince(start, nil) if err != nil { - return nil, err + return err } go bq.syncer() - return bq, nil + return nil } -func (bq *blockQueue) close() { +func (bq *blockQueue) stop() { bq.mu.Lock() - defer func() { - bq.mu.Unlock() - // we want to block here until the sync go routine is done. - // it's not (just) for the sake of a complete cleanup, but rather - // to ensure that the sync goroutine isn't busy in a notifyCommit - // call which might be blocked inside one of the trackers. - <-bq.closed - }() - + closechan := bq.closed if bq.running { bq.running = false bq.cond.Broadcast() } - + bq.mu.Unlock() + + // we want to block here until the sync go routine is done. + // it's not (just) for the sake of a complete cleanup, but rather + // to ensure that the sync goroutine isn't busy in a notifyCommit + // call which might be blocked inside one of the trackers. 
+ if closechan != nil { + <-closechan + } } func (bq *blockQueue) syncer() { - defer close(bq.closed) bq.mu.Lock() for { for bq.running && len(bq.q) == 0 { @@ -101,6 +119,8 @@ func (bq *blockQueue) syncer() { } if !bq.running { + close(bq.closed) + bq.closed = nil bq.mu.Unlock() return } diff --git a/ledger/double_test.go b/ledger/double_test.go index bb28af8fa8..9b4ca20f11 100644 --- a/ledger/double_test.go +++ b/ledger/double_test.go @@ -152,8 +152,8 @@ func (dl *DoubleLedger) fundedApp(sender basics.Address, amount uint64, source s } func (dl *DoubleLedger) reloadLedgers() { - require.NoError(dl.t, dl.generator.ReloadLedger()) - require.NoError(dl.t, dl.validator.ReloadLedger()) + require.NoError(dl.t, dl.generator.reloadLedger()) + require.NoError(dl.t, dl.validator.reloadLedger()) } func checkBlock(t *testing.T, checkLedger *Ledger, vb *ledgercore.ValidatedBlock) { diff --git a/ledger/ledger.go b/ledger/ledger.go index 30004f47a2..cd023e5023 100644 --- a/ledger/ledger.go +++ b/ledger/ledger.go @@ -160,6 +160,11 @@ func OpenLedger( l.genesisAccounts = make(map[basics.Address]basics.AccountData) } + l.blockQ, err = newBlockQueue(l) + if err != nil { + return nil, err + } + err = l.reloadLedger() if err != nil { return nil, err @@ -168,19 +173,12 @@ func OpenLedger( return l, nil } -// ReloadLedger is exported for the benefit of tests in the internal -// package. Revisit this when we rename / restructure that thing -func (l *Ledger) ReloadLedger() error { - return l.reloadLedger() -} - func (l *Ledger) reloadLedger() error { // similar to the Close function, we want to start by closing the blockQ first. The // blockQ is having a sync goroutine which indirectly calls other trackers. We want to eliminate that go-routine first, // and follow up by taking the trackers lock. if l.blockQ != nil { - l.blockQ.close() - l.blockQ = nil + l.blockQ.stop() } // take the trackers lock. This would ensure that no other goroutine is using the trackers. 
@@ -192,9 +190,9 @@ func (l *Ledger) reloadLedger() error { // init block queue var err error - l.blockQ, err = bqInit(l) + err = l.blockQ.start() if err != nil { - err = fmt.Errorf("reloadLedger.bqInit %v", err) + err = fmt.Errorf("reloadLedger.blockQ.start %v", err) return err } @@ -381,8 +379,7 @@ func (l *Ledger) Close() { // we shut the the blockqueue first, since it's sync goroutine dispatches calls // back to the trackers. if l.blockQ != nil { - l.blockQ.close() - l.blockQ = nil + l.blockQ.stop() } // take the trackers lock. This would ensure that no other goroutine is using the trackers. diff --git a/ledger/ledger_test.go b/ledger/ledger_test.go index ffa90b3da4..b9a1c64037 100644 --- a/ledger/ledger_test.go +++ b/ledger/ledger_test.go @@ -1557,7 +1557,7 @@ func TestWaitLedgerReload(t *testing.T) { waitRound := l.Latest() + 1 waitChannel := l.Wait(waitRound) - err = l.ReloadLedger() + err = l.reloadLedger() a.NoError(err) triggerTrackerFlush(t, l, genesisInitState)