Skip to content

Commit

Permalink
Use optimistic locking where possible in ResponseBodyEmitter
Browse files Browse the repository at this point in the history
  • Loading branch information
lucky8987 authored and simonbasle committed Nov 7, 2024
1 parent 1ced8c3 commit e67f892
Showing 1 changed file with 28 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

import org.springframework.http.MediaType;
Expand Down Expand Up @@ -76,7 +77,7 @@ public class ResponseBodyEmitter {
private final Set<DataWithMediaType> earlySendAttempts = new LinkedHashSet<>(8);

/** Store successful completion before the handler is initialized. */
private boolean complete;
private final AtomicBoolean complete = new AtomicBoolean();

/** Store an error before the handler is initialized. */
@Nullable
Expand Down Expand Up @@ -127,7 +128,7 @@ synchronized void initialize(Handler handler) throws IOException {
this.earlySendAttempts.clear();
}

if (this.complete) {
if (this.complete.get()) {
if (this.failure != null) {
this.handler.completeWithError(this.failure);
}
Expand All @@ -142,11 +143,12 @@ synchronized void initialize(Handler handler) throws IOException {
}
}

synchronized void initializeWithError(Throwable ex) {
this.complete = true;
this.failure = ex;
this.earlySendAttempts.clear();
this.errorCallback.accept(ex);
void initializeWithError(Throwable ex) {
if (this.complete.compareAndSet(false, true)) {
this.failure = ex;
this.earlySendAttempts.clear();
this.errorCallback.accept(ex);
}
}

/**
Expand Down Expand Up @@ -184,7 +186,7 @@ public void send(Object object) throws IOException {
* @throws java.lang.IllegalStateException wraps any other errors
*/
public synchronized void send(Object object, @Nullable MediaType mediaType) throws IOException {
Assert.state(!this.complete, () -> "ResponseBodyEmitter has already completed" +
Assert.state(!this.complete.get(), () -> "ResponseBodyEmitter has already completed" +
(this.failure != null ? " with error: " + this.failure : ""));
if (this.handler != null) {
try {
Expand Down Expand Up @@ -212,7 +214,7 @@ public synchronized void send(Object object, @Nullable MediaType mediaType) thro
* @since 6.0.12
*/
public synchronized void send(Set<DataWithMediaType> items) throws IOException {
Assert.state(!this.complete, () -> "ResponseBodyEmitter has already completed" +
Assert.state(!this.complete.get(), () -> "ResponseBodyEmitter has already completed" +
(this.failure != null ? " with error: " + this.failure : ""));
sendInternal(items);
}
Expand Down Expand Up @@ -245,9 +247,8 @@ private void sendInternal(Set<DataWithMediaType> items) throws IOException {
* to complete request processing. It should not be used after container
* related events such as an error while {@link #send(Object) sending}.
*/
public synchronized void complete() {
this.complete = true;
if (this.handler != null) {
public void complete() {
if (this.complete.compareAndSet(false, true) && this.handler != null) {
this.handler.complete();
}
}
Expand All @@ -263,11 +264,12 @@ public synchronized void complete() {
* container related events such as an error while
* {@link #send(Object) sending}.
*/
public synchronized void completeWithError(Throwable ex) {
this.complete = true;
this.failure = ex;
if (this.handler != null) {
this.handler.completeWithError(ex);
public void completeWithError(Throwable ex) {
if (this.complete.compareAndSet(false, true)) {
this.failure = ex;
if (this.handler != null) {
this.handler.completeWithError(ex);
}
}
}

Expand All @@ -276,7 +278,7 @@ public synchronized void completeWithError(Throwable ex) {
* called from a container thread when an async request times out.
* <p>As of 6.2, one can register multiple callbacks for this event.
*/
public synchronized void onTimeout(Runnable callback) {
public void onTimeout(Runnable callback) {
this.timeoutCallback.addDelegate(callback);
}

Expand All @@ -287,7 +289,7 @@ public synchronized void onTimeout(Runnable callback) {
* <p>As of 6.2, one can register multiple callbacks for this event.
* @since 5.0
*/
public synchronized void onError(Consumer<Throwable> callback) {
public void onError(Consumer<Throwable> callback) {
this.errorCallback.addDelegate(callback);
}

Expand All @@ -298,7 +300,7 @@ public synchronized void onError(Consumer<Throwable> callback) {
* detecting that a {@code ResponseBodyEmitter} instance is no longer usable.
* <p>As of 6.2, one can register multiple callbacks for this event.
*/
public synchronized void onCompletion(Runnable callback) {
public void onCompletion(Runnable callback) {
this.completionCallback.addDelegate(callback);
}

Expand Down Expand Up @@ -369,15 +371,15 @@ public MediaType getMediaType() {

private class DefaultCallback implements Runnable {

private List<Runnable> delegates = new ArrayList<>(1);
private final List<Runnable> delegates = new ArrayList<>(1);

public void addDelegate(Runnable delegate) {
public synchronized void addDelegate(Runnable delegate) {
this.delegates.add(delegate);
}

@Override
public void run() {
ResponseBodyEmitter.this.complete = true;
ResponseBodyEmitter.this.complete.compareAndSet(false, true);
for (Runnable delegate : this.delegates) {
delegate.run();
}
Expand All @@ -387,15 +389,15 @@ public void run() {

private class ErrorCallback implements Consumer<Throwable> {

private List<Consumer<Throwable>> delegates = new ArrayList<>(1);
private final List<Consumer<Throwable>> delegates = new ArrayList<>(1);

public void addDelegate(Consumer<Throwable> callback) {
public synchronized void addDelegate(Consumer<Throwable> callback) {
this.delegates.add(callback);
}

@Override
public void accept(Throwable t) {
ResponseBodyEmitter.this.complete = true;
ResponseBodyEmitter.this.complete.compareAndSet(false, true);
for(Consumer<Throwable> delegate : this.delegates) {
delegate.accept(t);
}
Expand Down

0 comments on commit e67f892

Please sign in to comment.