diff --git a/.chloggen/fix_persistentstorage_deadlock.yaml b/.chloggen/fix_persistentstorage_deadlock.yaml new file mode 100755 index 00000000000..6521f9375b5 --- /dev/null +++ b/.chloggen/fix_persistentstorage_deadlock.yaml @@ -0,0 +1,11 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: exporterhelper + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Fix a deadlock in persistent queue initialization + +# One or more tracking issues or pull requests related to the change +issues: [7400] diff --git a/exporter/exporterhelper/internal/persistent_storage.go b/exporter/exporterhelper/internal/persistent_storage.go index 338136cabcd..9657c2e2f41 100644 --- a/exporter/exporterhelper/internal/persistent_storage.go +++ b/exporter/exporterhelper/internal/persistent_storage.go @@ -90,7 +90,6 @@ var ( // newPersistentContiguousStorage creates a new file-storage extension backed queue; // queueName parameter must be a unique value that identifies the queue. -// The queue needs to be initialized separately using initPersistentContiguousStorage. 
func newPersistentContiguousStorage(ctx context.Context, queueName string, capacity uint64, logger *zap.Logger, client storage.Client, unmarshaler RequestUnmarshaler) *persistentContiguousStorage { pcs := &persistentContiguousStorage{ logger: logger, @@ -107,18 +106,18 @@ func newPersistentContiguousStorage(ctx context.Context, queueName string, capac initPersistentContiguousStorage(ctx, pcs) notDispatchedReqs := pcs.retrieveNotDispatchedReqs(context.Background()) - // We start the loop first so in case there are more elements in the persistent storage than the capacity, - // it does not get blocked on initialization - - go pcs.loop() - // Make sure the leftover requests are handled pcs.enqueueNotDispatchedReqs(notDispatchedReqs) - // Make sure the communication channel is loaded up - for i := uint64(0); i < pcs.size(); i++ { + + // Top up putChan until it holds one signal per item counted by size(). + // We might already have signals here from requeueing non-dispatched requests. NOTE(review): loop() is not running yet, so this send presumably blocks forever if size() exceeds cap(putChan); confirm that cannot happen. + for len(pcs.putChan) < int(pcs.size()) { pcs.putChan <- struct{}{} } + // Start the loop, which moves items from storage to the outbound channel, only after putChan has been filled. + go pcs.loop() + return pcs } diff --git a/exporter/exporterhelper/internal/persistent_storage_test.go b/exporter/exporterhelper/internal/persistent_storage_test.go index d4ff5134404..b7ae05ea1ef 100644 --- a/exporter/exporterhelper/internal/persistent_storage_test.go +++ b/exporter/exporterhelper/internal/persistent_storage_test.go @@ -272,6 +272,47 @@ func TestPersistentStorage_CurrentlyProcessedItems(t *testing.T) { } } +// TestPersistentStorage_StartWithNonDispatched checks that the queue invariants are kept if the queue is recreated while +// close to full and with one item dispatched but not yet marked as processed. +func TestPersistentStorage_StartWithNonDispatched(t *testing.T) { + var capacity uint64 = 5 // arbitrary small number + path := t.TempDir() + logger := zap.NewNop() + + traces := newTraces(5, 10) + req := newFakeTracesRequest(traces) + + ext := createStorageExtension(path) + client := 
createTestClient(ext) + ps := createTestPersistentStorageWithLoggingAndCapacity(client, logger, capacity) + + // Fill the queue up to capacity. + for i := 0; i < int(capacity); i++ { + err := ps.put(req) + require.NoError(t, err) + } + + // Take one item out, but don't mark it as processed; the reload below expects it to still count as dispatched. + <-ps.get() + // Put one more item in; size() is then expected to settle at capacity-1. + err := ps.put(req) + require.NoError(t, err) + + require.Eventually(t, func() bool { + return ps.size() == capacity-1 + }, 5*time.Second, 10*time.Millisecond) + ps.stop() + + // Reload: recreate the queue on the same client; it must report the same size and exactly one dispatched item. + newPs := createTestPersistentStorageWithLoggingAndCapacity(client, logger, capacity) + + require.Eventually(t, func() bool { + newPs.mu.Lock() + defer newPs.mu.Unlock() + return newPs.size() == capacity-1 && len(newPs.currentlyDispatchedItems) == 1 + }, 5*time.Second, 10*time.Millisecond) +} + func TestPersistentStorage_RepeatPutCloseReadClose(t *testing.T) { path := t.TempDir()