Skip to content

Commit

Permalink
Completable/Single Processors more robust top of stack removal (#2116)
Browse files Browse the repository at this point in the history
Motivation:
Completable/Single Processors make a best effort to remove
nodes from the underlying data structure upon cancellation.
However if the node is the last added and there is concurrent
cancellation we may may fail to remove the node and give up
leading to increased memory consumption.

Modifications:
- `ClosableConcurrentStack` should iterate the stack to remove
  the root node if the first removal attempt fails.

Result:
More robust cleanup of memory on removal.
  • Loading branch information
Scottmitch committed Mar 1, 2022
1 parent 096d4d2 commit 805a73e
Showing 1 changed file with 35 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@ boolean relaxedRemove(T item) {
// Nodes maybe "resurrected" and if links are nulled the stack may lose references to Nodes after
// the removal point. Just let the GC reclaim the Node when it is ready because it is no longer
// referenced from top, or if "resurrected" the "relaxed" contract of this method allows it.
} else {
topUpdater.compareAndSet(this, curr, curr.next);
} else if (!topUpdater.compareAndSet(this, curr, curr.next)) {
removeNode(curr);
}
return true;
} else {
Expand All @@ -94,6 +94,39 @@ boolean relaxedRemove(T item) {
return false;
}

private void removeNode(final Node<T> nodeToRemove) {
for (;;) {
final Object rawCurrTop = top;
if (rawCurrTop == null || !Node.class.equals(rawCurrTop.getClass())) {
break;
}
boolean failedTopRemoval = false;
@SuppressWarnings("unchecked")
Node<T> curr = (Node<T>) rawCurrTop;
Node<T> prev = null;
do {
if (curr == nodeToRemove) {
if (prev == null) {
failedTopRemoval = !topUpdater.compareAndSet(this, curr, curr.next);
break;
} else {
prev.next = curr.next;
return;
}
}
prev = curr;
curr = curr.next;
} while (curr != null);

// If we attempted and failed to remove the top node, try again. Otherwise, if we removed the top node or if
// we didn't find the node in the stack then we are done and can break (another thread may have removed the
// same node concurrently).
if (!failedTopRemoval) {
break;
}
}
}

/**
* Clear the stack contents, and prevent future {@link #push(Object)} operations from adding to this stack.
* {@code closer} will be invoked for each element currently in the stack.
Expand Down

0 comments on commit 805a73e

Please sign in to comment.