Skip to content

Commit

Permalink
Polishing
Browse files Browse the repository at this point in the history
  • Loading branch information
rstoyanchev authored and lxbzmy committed Mar 26, 2022
1 parent aa7579f commit b696f82
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 25 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2018 the original author or authors.
* Copyright 2002-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -38,12 +38,12 @@ public abstract class AbstractListenerServerHttpResponse extends AbstractServerH
private final AtomicBoolean writeCalled = new AtomicBoolean();


public AbstractListenerServerHttpResponse(DataBufferFactory dataBufferFactory) {
super(dataBufferFactory);
public AbstractListenerServerHttpResponse(DataBufferFactory bufferFactory) {
super(bufferFactory);
}

public AbstractListenerServerHttpResponse(DataBufferFactory dataBufferFactory, HttpHeaders headers) {
super(dataBufferFactory, headers);
public AbstractListenerServerHttpResponse(DataBufferFactory bufferFactory, HttpHeaders headers) {
super(bufferFactory, headers);
}


Expand All @@ -56,15 +56,15 @@ protected final Mono<Void> writeWithInternal(Publisher<? extends DataBuffer> bod
protected final Mono<Void> writeAndFlushWithInternal(
Publisher<? extends Publisher<? extends DataBuffer>> body) {

if (this.writeCalled.compareAndSet(false, true)) {
Processor<? super Publisher<? extends DataBuffer>, Void> processor = createBodyFlushProcessor();
return Mono.from(subscriber -> {
body.subscribe(processor);
processor.subscribe(subscriber);
});
if (!this.writeCalled.compareAndSet(false, true)) {
return Mono.error(new IllegalStateException(
"writeWith() or writeAndFlushWith() has already been called"));
}
return Mono.error(new IllegalStateException(
"writeWith() or writeAndFlushWith() has already been called"));
Processor<? super Publisher<? extends DataBuffer>, Void> processor = createBodyFlushProcessor();
return Mono.from(subscriber -> {
body.subscribe(processor);
processor.subscribe(subscriber);
});
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public abstract class AbstractListenerWriteFlushProcessor<T> implements Processo
@Nullable
private Subscription subscription;

private volatile boolean subscriberCompleted;
private volatile boolean sourceCompleted;

private final WriteResultPublisher resultPublisher;

Expand Down Expand Up @@ -293,7 +293,7 @@ public <T> void writeComplete(AbstractListenerWriteFlushProcessor<T> processor)
return;
}
if (processor.changeState(this, REQUESTED)) {
if (processor.subscriberCompleted) {
if (processor.sourceCompleted) {
handleSubscriberCompleted(processor);
}
else {
Expand All @@ -304,7 +304,7 @@ public <T> void writeComplete(AbstractListenerWriteFlushProcessor<T> processor)
}
@Override
public <T> void onComplete(AbstractListenerWriteFlushProcessor<T> processor) {
processor.subscriberCompleted = true;
processor.sourceCompleted = true;
// A competing write might have completed very quickly
if (processor.state.get().equals(State.REQUESTED)) {
handleSubscriberCompleted(processor);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2020 the original author or authors.
* Copyright 2002-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -64,7 +64,7 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T,
private volatile T currentData;

/* Indicates "onComplete" was received during the (last) write. */
private volatile boolean subscriberCompleted;
private volatile boolean sourceCompleted;

/**
* Indicates we're waiting for one last isReady-onWritePossible cycle
Expand Down Expand Up @@ -374,7 +374,7 @@ else if (processor.changeState(this, WRITING)) {
if (processor.write(data)) {
if (processor.changeState(WRITING, REQUESTED)) {
processor.currentData = null;
if (processor.subscriberCompleted) {
if (processor.sourceCompleted) {
processor.readyToCompleteAfterLastWrite = true;
processor.changeStateToReceived(REQUESTED);
}
Expand All @@ -397,7 +397,7 @@ else if (processor.changeState(this, WRITING)) {

@Override
public <T> void onComplete(AbstractListenerWriteProcessor<T> processor) {
processor.subscriberCompleted = true;
processor.sourceCompleted = true;
// A competing write might have completed very quickly
if (processor.state.get().equals(State.REQUESTED)) {
processor.changeStateToComplete(State.REQUESTED);
Expand All @@ -408,7 +408,7 @@ public <T> void onComplete(AbstractListenerWriteProcessor<T> processor) {
WRITING {
@Override
public <T> void onComplete(AbstractListenerWriteProcessor<T> processor) {
processor.subscriberCompleted = true;
processor.sourceCompleted = true;
// A competing write might have completed very quickly
if (processor.state.get().equals(State.REQUESTED)) {
processor.changeStateToComplete(State.REQUESTED);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2020 the original author or authors.
* Copyright 2002-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -239,15 +239,24 @@ private static void runIfAsyncNotComplete(AsyncContext asyncContext, AtomicBoole
}


/**
* AsyncListener to complete the {@link AsyncContext} in case of error or
* timeout notifications from the container
* <p>Additional {@link AsyncListener}s are registered in
* {@link ServletServerHttpRequest} to signal onError/onComplete to the
* request body Subscriber, and in {@link ServletServerHttpResponse} to
* cancel the write Publisher and signal onError/onComplete downstream to
* the writing result Subscriber.
*/
private static class HandlerResultAsyncListener implements AsyncListener {

private final AtomicBoolean isCompleted;

private final String logPrefix;

public HandlerResultAsyncListener(AtomicBoolean isCompleted, ServletServerHttpRequest httpRequest) {
public HandlerResultAsyncListener(AtomicBoolean isCompleted, ServletServerHttpRequest request) {
this.isCompleted = isCompleted;
this.logPrefix = httpRequest.getLogPrefix();
this.logPrefix = request.getLogPrefix();
}

@Override
Expand Down Expand Up @@ -277,7 +286,7 @@ public void onComplete(AsyncEvent event) {
}


private class HandlerResultSubscriber implements Subscriber<Void> {
private static class HandlerResultSubscriber implements Subscriber<Void> {

private final AsyncContext asyncContext;

Expand Down

0 comments on commit b696f82

Please sign in to comment.