[exporterhelper] fix deadlock when initializing persistent queue #7400

Merged
11 changes: 11 additions & 0 deletions .chloggen/fix_persistentstorage_deadlock.yaml
@@ -0,0 +1,11 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: exporterhelper

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Fix a deadlock in persistent queue initialization

# One or more tracking issues or pull requests related to the change
issues: [7400]
15 changes: 7 additions & 8 deletions exporter/exporterhelper/internal/persistent_storage.go
@@ -90,7 +90,6 @@ var (
 
 // newPersistentContiguousStorage creates a new file-storage extension backed queue;
 // queueName parameter must be a unique value that identifies the queue.
-// The queue needs to be initialized separately using initPersistentContiguousStorage.
 func newPersistentContiguousStorage(ctx context.Context, queueName string, capacity uint64, logger *zap.Logger, client storage.Client, unmarshaler RequestUnmarshaler) *persistentContiguousStorage {
 	pcs := &persistentContiguousStorage{
 		logger: logger,
@@ -107,18 +106,18 @@ func newPersistentContiguousStorage(ctx context.Context, queueName string, capacity uint64, logger *zap.Logger, client storage.Client, unmarshaler RequestUnmarshaler) *persistentContiguousStorage {
 	initPersistentContiguousStorage(ctx, pcs)
 	notDispatchedReqs := pcs.retrieveNotDispatchedReqs(context.Background())
 
-	// We start the loop first so in case there are more elements in the persistent storage than the capacity,
-	// it does not get blocked on initialization
-
-	go pcs.loop()
-
 	// Make sure the leftover requests are handled
 	pcs.enqueueNotDispatchedReqs(notDispatchedReqs)
-	// Make sure the communication channel is loaded up
-	for i := uint64(0); i < pcs.size(); i++ {
+
+	// Ensure the communication channel has the same size as the queue
+	// We might already have items here from requeueing non-dispatched requests
+	for len(pcs.putChan) < int(pcs.size()) {
 		pcs.putChan <- struct{}{}
 	}
 
+	// start the loop which moves items from storage to the outbound channel
+	go pcs.loop()
+
 	return pcs
 }

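For readers following the fix: the change is about initialization order. The new code first tops up the putChan signal channel until it holds one token per item already in storage, checking the channel's current length so that tokens written while requeueing non-dispatched requests are not double-counted, and only then starts the dispatch loop. Below is a minimal, standalone sketch of that pattern; the names signal, storedItems, and capacity are illustrative and not from the collector codebase.

package main

import "fmt"

func main() {
	const capacity = 5
	// Items recovered from persistent storage on restart.
	storedItems := []string{"a", "b", "c", "d", "e"}

	// Buffered channel of "item available" tokens, sized like the queue.
	signal := make(chan struct{}, capacity)

	// Suppose requeueing a non-dispatched item already wrote one token.
	signal <- struct{}{}

	// Top up so the channel holds exactly one token per stored item.
	// A counted loop (one write per item) would try to write a sixth
	// token into a buffer of five here and block forever, since the
	// consumer has not started yet; checking len avoids that.
	for len(signal) < len(storedItems) {
		signal <- struct{}{}
	}

	// Start the consumer only after the channel is loaded, so
	// initialization can no longer block waiting on it.
	done := make(chan struct{})
	go func() {
		for range storedItems {
			<-signal
			// dispatch of one item would happen here
		}
		close(done)
	}()
	<-done
	fmt.Println("all stored items dispatched")
}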
41 changes: 41 additions & 0 deletions exporter/exporterhelper/internal/persistent_storage_test.go
@@ -272,6 +272,47 @@ func TestPersistentStorage_CurrentlyProcessedItems(t *testing.T) {
}
}

// this test checks that all the invariants are kept when the queue is recreated while
// close to full and with some items already dispatched
func TestPersistentStorage_StartWithNonDispatched(t *testing.T) {
var capacity uint64 = 5 // arbitrary small number
path := t.TempDir()
logger := zap.NewNop()

traces := newTraces(5, 10)
req := newFakeTracesRequest(traces)

ext := createStorageExtension(path)
client := createTestClient(ext)
ps := createTestPersistentStorageWithLoggingAndCapacity(client, logger, capacity)

// Put in items up to capacity
for i := 0; i < int(capacity); i++ {
err := ps.put(req)
require.NoError(t, err)
}

// get one item out, but don't mark it as processed
<-ps.get()
// put one more item in
err := ps.put(req)
require.NoError(t, err)

require.Eventually(t, func() bool {
return ps.size() == capacity-1
}, 5*time.Second, 10*time.Millisecond)
ps.stop()

// Reload
newPs := createTestPersistentStorageWithLoggingAndCapacity(client, logger, capacity)

require.Eventually(t, func() bool {
newPs.mu.Lock()
defer newPs.mu.Unlock()
return newPs.size() == capacity-1 && len(newPs.currentlyDispatchedItems) == 1
}, 5*time.Second, 10*time.Millisecond)
}

func TestPersistentStorage_RepeatPutCloseReadClose(t *testing.T) {
path := t.TempDir()
