Skip to content

Commit

Permalink
fix deadlock when initializing persistent queue
Browse files Browse the repository at this point in the history
  • Loading branch information
Mikołaj Świątek committed Mar 21, 2023
1 parent 2399471 commit 209d3c9
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 8 deletions.
11 changes: 11 additions & 0 deletions .chloggen/fix_persistentstorage_deadlock.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: exporterhelper

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Fix a deadlock in persistent queue initialization

# One or more tracking issues or pull requests related to the change
issues: [7400]
15 changes: 7 additions & 8 deletions exporter/exporterhelper/internal/persistent_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@ var (

// newPersistentContiguousStorage creates a new file-storage extension backed queue;
// queueName parameter must be a unique value that identifies the queue.
// The queue needs to be initialized separately using initPersistentContiguousStorage.
func newPersistentContiguousStorage(ctx context.Context, queueName string, capacity uint64, logger *zap.Logger, client storage.Client, unmarshaler RequestUnmarshaler) *persistentContiguousStorage {
pcs := &persistentContiguousStorage{
logger: logger,
Expand All @@ -107,18 +106,18 @@ func newPersistentContiguousStorage(ctx context.Context, queueName string, capac
initPersistentContiguousStorage(ctx, pcs)
notDispatchedReqs := pcs.retrieveNotDispatchedReqs(context.Background())

// We start the loop first so in case there are more elements in the persistent storage than the capacity,
// it does not get blocked on initialization

go pcs.loop()

// Make sure the leftover requests are handled
pcs.enqueueNotDispatchedReqs(notDispatchedReqs)
// Make sure the communication channel is loaded up
for i := uint64(0); i < pcs.size(); i++ {

// Ensure the communication channel has the same size as the queue
// We might already have items here from requeueing non-dispatched requests
for len(pcs.putChan) < int(pcs.size()) {
pcs.putChan <- struct{}{}
}

// start the loop which moves items from storage to the outbound channel
go pcs.loop()

return pcs
}

Expand Down
41 changes: 41 additions & 0 deletions exporter/exporterhelper/internal/persistent_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,47 @@ func TestPersistentStorage_CurrentlyProcessedItems(t *testing.T) {
}
}

// this test attempts to check if all the invariants are kept if the queue is recreated while
// close to full and with some items dispatched
func TestPersistentStorage_StartWithNonDispatched(t *testing.T) {
var capacity uint64 = 5 // arbitrary small number
path := t.TempDir()
logger := zap.NewNop()

traces := newTraces(5, 10)
req := newFakeTracesRequest(traces)

ext := createStorageExtension(path)
client := createTestClient(ext)
ps := createTestPersistentStorageWithLoggingAndCapacity(client, logger, capacity)

// Put in items up to capacity
for i := 0; i < int(capacity); i++ {
err := ps.put(req)
require.NoError(t, err)
}

// get one item out, but don't mark it as processed
<-ps.get()
// put one more item in
err := ps.put(req)
require.NoError(t, err)

require.Eventually(t, func() bool {
return ps.size() == capacity-1
}, 5*time.Second, 10*time.Millisecond)
ps.stop()

// Reload
newPs := createTestPersistentStorageWithLoggingAndCapacity(client, logger, capacity)

require.Eventually(t, func() bool {
newPs.mu.Lock()
defer newPs.mu.Unlock()
return newPs.size() == capacity-1 && len(newPs.currentlyDispatchedItems) == 1
}, 5*time.Second, 10*time.Millisecond)
}

func TestPersistentStorage_RepeatPutCloseReadClose(t *testing.T) {
path := t.TempDir()

Expand Down

0 comments on commit 209d3c9

Please sign in to comment.