From 66e07ab15fad6095d2540e505d6ef9dc0ceee067 Mon Sep 17 00:00:00 2001 From: Urvi Date: Fri, 9 Jun 2023 08:09:11 -0700 Subject: [PATCH 1/3] 4902 Add mutex for concurrent access in GetLatestLedgerSequence --- ingest/ledgerbackend/captive_core_backend.go | 18 ++++++ .../captive_core_backend_test.go | 64 +++++++++++++++++++ 2 files changed, 82 insertions(+) diff --git a/ingest/ledgerbackend/captive_core_backend.go b/ingest/ledgerbackend/captive_core_backend.go index 62b848106a..4acbb9f1bf 100644 --- a/ingest/ledgerbackend/captive_core_backend.go +++ b/ingest/ledgerbackend/captive_core_backend.go @@ -97,6 +97,12 @@ type CaptiveStellarCore struct { // cachedMeta keeps that ledger data of the last fetched ledger. Updated in GetLedger(). cachedMeta *xdr.LedgerCloseMeta + // ledgerSequenceLock mutex is used to protect the member variables used in the + // read-only GetLatestLedgerSequence method from concurrent write operations. + // This is required when GetLatestLedgerSequence is called from other goroutine + // such as writing Prometheus metric captive_stellar_core_latest_ledger. + ledgerSequenceLock sync.RWMutex + prepared *Range // non-nil if any range is prepared closed bool // False until the core is closed nextLedger uint32 // next ledger expected, error w/ restart if not seen @@ -307,9 +313,12 @@ func (c *CaptiveStellarCore) openOfflineReplaySubprocess(from, to uint32) error // The next ledger should be the first ledger of the checkpoint containing // the requested ledger ran := BoundedRange(from, to) + c.ledgerSequenceLock.Lock() c.prepared = &ran c.nextLedger = c.roundDownToFirstReplayAfterCheckpointStart(from) c.lastLedger = &to + c.ledgerSequenceLock.Unlock() + c.previousLedgerHash = nil return nil @@ -330,10 +339,13 @@ func (c *CaptiveStellarCore) openOnlineReplaySubprocess(ctx context.Context, fro // In the online mode we update nextLedger after streaming the first ledger. // This is to support versions before and after/including v17.1.0 that // introduced minimal persistent DB. + c.ledgerSequenceLock.Lock() c.nextLedger = 0 ran := UnboundedRange(from) c.prepared = &ran c.lastLedger = nil + c.ledgerSequenceLock.Unlock() + c.previousLedgerHash = nil return nil @@ -647,7 +659,10 @@ func (c *CaptiveStellarCore) handleMetaPipeResult(sequence uint32, result metaRe ) } + c.ledgerSequenceLock.Lock() c.nextLedger = result.LedgerSequence() + 1 + c.ledgerSequenceLock.Unlock() + currentLedgerHash := result.LedgerCloseMeta.LedgerHash().HexString() c.previousLedgerHash = ¤tLedgerHash @@ -708,6 +723,9 @@ func (c *CaptiveStellarCore) GetLatestLedgerSequence(ctx context.Context) (uint3 c.stellarCoreLock.RLock() defer c.stellarCoreLock.RUnlock() + c.ledgerSequenceLock.RLock() + defer c.ledgerSequenceLock.RUnlock() + if c.closed { return 0, errors.New("stellar-core is no longer usable") } diff --git a/ingest/ledgerbackend/captive_core_backend_test.go b/ingest/ledgerbackend/captive_core_backend_test.go index 709551f810..4574a2e1a0 100644 --- a/ingest/ledgerbackend/captive_core_backend_test.go +++ b/ingest/ledgerbackend/captive_core_backend_test.go @@ -10,6 +10,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + "sync" "github.com/stellar/go/historyarchive" "github.com/stellar/go/network" @@ -645,6 +646,69 @@ func TestGetLatestLedgerSequence(t *testing.T) { mockRunner.AssertExpectations(t) } +func TestGetLatestLedgerSequenceRaceCondition(t *testing.T) { + var fromSeq uint32 = 64 + var toSeq uint32 = 400 + metaChan := make(chan metaResult, toSeq) + + for i := fromSeq; i <= toSeq; i++ { + meta := buildLedgerCloseMeta(testLedgerHeader{sequence: uint32(i)}) + metaChan <- metaResult{ + LedgerCloseMeta: &meta, + } + } + ctx, cancel := context.WithCancel(context.Background()) + mockRunner := &stellarCoreRunnerMock{} + mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan)) + mockRunner.On("context").Return(ctx) + mockRunner.On("runFrom", mock.Anything, mock.Anything).Return(nil) + + mockArchive := &historyarchive.MockArchive{} + mockArchive. + On("GetRootHAS"). + Return(historyarchive.HistoryArchiveState{ + CurrentLedger: uint32(toSeq * 2), + }, nil) + + mockArchive. + On("GetLedgerHeader", mock.Anything). + Return(xdr.LedgerHeaderHistoryEntry{}, nil) + + captiveBackend := CaptiveStellarCore{ + archive: mockArchive, + stellarCoreRunnerFactory: func() stellarCoreRunnerInterface { + return mockRunner + }, + checkpointManager: historyarchive.NewCheckpointManager(10), + } + + ledgerRange := UnboundedRange(fromSeq) + captiveBackend.PrepareRange(ctx, ledgerRange) + + var wg sync.WaitGroup + wg.Add(1) + + go func(ctx context.Context) { + defer wg.Done() + for { + select { + case <-ctx.Done(): + return + default: + captiveBackend.GetLatestLedgerSequence(ctx) + } + } + }(ctx) + + for i := fromSeq; i < toSeq; i++ { + captiveBackend.GetLedger(ctx, uint32(i)) + } + + cancel() + + wg.Wait() +} + func TestCaptiveGetLedger(t *testing.T) { tt := assert.New(t) metaChan := make(chan metaResult, 300) From 3ed7f10e9cf7b555395fdc86989f041fd307a7e0 Mon Sep 17 00:00:00 2001 From: Urvi Date: Fri, 9 Jun 2023 12:39:42 -0700 Subject: [PATCH 2/3] Fix linter errors --- ingest/ledgerbackend/captive_core_backend_test.go | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/ingest/ledgerbackend/captive_core_backend_test.go b/ingest/ledgerbackend/captive_core_backend_test.go index 4574a2e1a0..df8ad1ce59 100644 --- a/ingest/ledgerbackend/captive_core_backend_test.go +++ b/ingest/ledgerbackend/captive_core_backend_test.go @@ -5,12 +5,12 @@ import ( "encoding/hex" "fmt" "os" + "sync" "testing" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" - "sync" "github.com/stellar/go/historyarchive" "github.com/stellar/go/network" @@ -652,7 +652,7 @@ func TestGetLatestLedgerSequenceRaceCondition(t *testing.T) { metaChan := make(chan metaResult, toSeq) for i := fromSeq; i <= toSeq; i++ { - meta := buildLedgerCloseMeta(testLedgerHeader{sequence: uint32(i)}) + meta := buildLedgerCloseMeta(testLedgerHeader{sequence: i}) metaChan <- metaResult{ LedgerCloseMeta: &meta, } @@ -667,7 +667,7 @@ func TestGetLatestLedgerSequenceRaceCondition(t *testing.T) { mockArchive. On("GetRootHAS"). Return(historyarchive.HistoryArchiveState{ - CurrentLedger: uint32(toSeq * 2), + CurrentLedger: toSeq * 2, }, nil) mockArchive. @@ -683,7 +683,8 @@ func TestGetLatestLedgerSequenceRaceCondition(t *testing.T) { } ledgerRange := UnboundedRange(fromSeq) - captiveBackend.PrepareRange(ctx, ledgerRange) + err := captiveBackend.PrepareRange(ctx, ledgerRange) + assert.NoError(t, err) var wg sync.WaitGroup wg.Add(1) @@ -695,13 +696,14 @@ func TestGetLatestLedgerSequenceRaceCondition(t *testing.T) { case <-ctx.Done(): return default: - captiveBackend.GetLatestLedgerSequence(ctx) + _, _ = captiveBackend.GetLatestLedgerSequence(ctx) } } }(ctx) for i := fromSeq; i < toSeq; i++ { - captiveBackend.GetLedger(ctx, uint32(i)) + _, err = captiveBackend.GetLedger(ctx, i) + assert.NoError(t, err) } cancel() From 7a2bb2eb56684f2bda860c3bc4b1202e92ce04ae Mon Sep 17 00:00:00 2001 From: Urvi Date: Fri, 9 Jun 2023 14:44:13 -0700 Subject: [PATCH 3/3] Addressing review comments --- ingest/ledgerbackend/captive_core_backend.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/ingest/ledgerbackend/captive_core_backend.go b/ingest/ledgerbackend/captive_core_backend.go index 4acbb9f1bf..a0f7c99821 100644 --- a/ingest/ledgerbackend/captive_core_backend.go +++ b/ingest/ledgerbackend/captive_core_backend.go @@ -314,11 +314,11 @@ func (c *CaptiveStellarCore) openOfflineReplaySubprocess(from, to uint32) error // the requested ledger ran := BoundedRange(from, to) c.ledgerSequenceLock.Lock() + defer c.ledgerSequenceLock.Unlock() + c.prepared = &ran c.nextLedger = c.roundDownToFirstReplayAfterCheckpointStart(from) c.lastLedger = &to - c.ledgerSequenceLock.Unlock() - c.previousLedgerHash = nil return nil @@ -340,12 +340,12 @@ func (c *CaptiveStellarCore) openOnlineReplaySubprocess(ctx context.Context, fro // This is to support versions before and after/including v17.1.0 that // introduced minimal persistent DB. c.ledgerSequenceLock.Lock() + defer c.ledgerSequenceLock.Unlock() + c.nextLedger = 0 ran := UnboundedRange(from) c.prepared = &ran c.lastLedger = nil - c.ledgerSequenceLock.Unlock() - c.previousLedgerHash = nil return nil