Skip to content

Commit

Permalink
Undisable ee9 BlockingTest and fix HttpChannel.produceContent (#12529)
Browse files Browse the repository at this point in the history
Undisable ee9 BlockingTest and fix HttpChannel.produceContent

Signed-off-by: Ludovic Orban <[email protected]>
Co-authored-by: Ludovic Orban <[email protected]>
  • Loading branch information
janbartel and lorban authored Dec 18, 2024
1 parent 37d0b7c commit 6b632f6
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1332,6 +1332,7 @@ public void succeeded()
httpChannel = _request.lockedGetHttpChannelState();
httpChannel.lockedStreamSendCompleted(true);
}

if (callback != null)
httpChannel._writeInvoker.run(callback::succeeded);
}
Expand Down Expand Up @@ -1361,6 +1362,7 @@ public void failed(Throwable x)
httpChannel = _request.lockedGetHttpChannelState();
httpChannel.lockedStreamSendCompleted(false);
}

if (callback != null)
httpChannel._writeInvoker.run(() -> HttpChannelState.failed(callback, x));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1609,9 +1609,7 @@ public void failed(Throwable x)
if (LOG.isDebugEnabled())
LOG.debug("aborting", x);
abort(x);
_httpChannel.recycle();
_parser.reset();
_generator.reset();
_httpChannel.setHttpStream(null);
if (!_handling.compareAndSet(true, false))
resume();
}
Expand Down
5 changes: 5 additions & 0 deletions jetty-ee8/jetty-ee8-nested/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-http-tools</artifactId>
Expand Down
5 changes: 5 additions & 0 deletions jetty-ee9/jetty-ee9-nested/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-http-tools</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,12 @@ public ConnectionMetaData getConnectionMetaData()
*/
public boolean needContent()
{
ContextHandler.CoreContextRequest coreContextRequest = getCoreRequest();
// When coreContextRequest is null, produceContent() always immediately returns an error content.
if (coreContextRequest == null)
return true;
// TODO: optimize by attempting a read?
getCoreRequest().demand(_needContentTask);
coreContextRequest.demand(_needContentTask);
return false;
}

Expand All @@ -171,7 +175,10 @@ public boolean needContent()
*/
public HttpInput.Content produceContent()
{
Content.Chunk chunk = getCoreRequest().read();
ContextHandler.CoreContextRequest coreContextRequest = getCoreRequest();
if (coreContextRequest == null)
return new HttpInput.ErrorContent(new IOException("Channel has been recycled"));
Content.Chunk chunk = coreContextRequest.read();
if (chunk == null)
return null;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@
import org.eclipse.jetty.server.handler.gzip.GzipHandler;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.instanceOf;
Expand All @@ -51,7 +51,6 @@
import static org.hamcrest.core.Is.is;
import static org.junit.jupiter.api.Assertions.assertTrue;

@Disabled // TODO
public class BlockingTest
{
private Server server;
Expand Down Expand Up @@ -86,7 +85,7 @@ public void testBlockingReadThenNormalComplete() throws Exception
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
new Thread(() ->
Thread thread = new Thread(() ->
{
try
{
Expand All @@ -103,14 +102,19 @@ public void handle(String target, Request baseRequest, HttpServletRequest reques
readException.set(t);
stopped.countDown();
}
}).start();
});
thread.start();

try
{
// wait for thread to start and read first byte
started.await(10, TimeUnit.SECONDS);
assertTrue(started.await(10, TimeUnit.SECONDS));
// give it time to block on second byte
Thread.sleep(1000);
await().atMost(5, TimeUnit.SECONDS).until(() ->
{
Thread.State state = thread.getState();
return state == Thread.State.WAITING || state == Thread.State.TIMED_WAITING;
});
}
catch (Throwable e)
{
Expand Down Expand Up @@ -253,22 +257,22 @@ public void testNormalCompleteThenBlockingRead() throws Exception
CountDownLatch completed = new CountDownLatch(1);
CountDownLatch stopped = new CountDownLatch(1);
AtomicReference<Throwable> readException = new AtomicReference<>();
AtomicReference<Thread> threadRef = new AtomicReference<>();
AbstractHandler handler = new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
new Thread(() ->
Thread thread = new Thread(() ->
{
try
{
int b = baseRequest.getHttpInput().read();
if (b == '1')
{
started.countDown();
completed.await(10, TimeUnit.SECONDS);
Thread.sleep(500);
assertTrue(completed.await(10, TimeUnit.SECONDS));
if (baseRequest.getHttpInput().read() > Integer.MIN_VALUE)
throw new IllegalStateException();
}
Expand All @@ -278,14 +282,20 @@ public void handle(String target, Request baseRequest, HttpServletRequest reques
readException.set(t);
stopped.countDown();
}
}).start();
});
threadRef.set(thread);
thread.start();

try
{
// wait for thread to start and read first byte
started.await(10, TimeUnit.SECONDS);
assertTrue(started.await(10, TimeUnit.SECONDS));
// give it time to block on second byte
Thread.sleep(1000);
await().atMost(5, TimeUnit.SECONDS).until(() ->
{
Thread.State state = thread.getState();
return state == Thread.State.WAITING || state == Thread.State.TIMED_WAITING;
});
}
catch (Throwable e)
{
Expand Down Expand Up @@ -321,7 +331,7 @@ public void handle(String target, Request baseRequest, HttpServletRequest reques
assertThat(response.getContent(), containsString("OK"));

completed.countDown();
Thread.sleep(1000);
await().atMost(5, TimeUnit.SECONDS).until(() -> threadRef.get().getState() == Thread.State.TERMINATED);

// Async thread should have stopped
assertTrue(stopped.await(10, TimeUnit.SECONDS));
Expand All @@ -335,6 +345,7 @@ public void testStartAsyncThenBlockingReadThenTimeout() throws Exception
CountDownLatch started = new CountDownLatch(1);
CountDownLatch completed = new CountDownLatch(1);
CountDownLatch stopped = new CountDownLatch(1);
AtomicReference<Thread> threadRef = new AtomicReference<>();
AtomicReference<Throwable> readException = new AtomicReference<>();
AbstractHandler handler = new AbstractHandler()
{
Expand All @@ -347,16 +358,15 @@ public void handle(String target, Request baseRequest, HttpServletRequest reques
AsyncContext async = request.startAsync();
async.setTimeout(100);

new Thread(() ->
Thread thread = new Thread(() ->
{
try
{
int b = baseRequest.getHttpInput().read();
if (b == '1')
{
started.countDown();
completed.await(10, TimeUnit.SECONDS);
Thread.sleep(500);
assertTrue(completed.await(10, TimeUnit.SECONDS));
if (baseRequest.getHttpInput().read() > Integer.MIN_VALUE)
throw new IllegalStateException();
}
Expand All @@ -366,14 +376,20 @@ public void handle(String target, Request baseRequest, HttpServletRequest reques
readException.set(t);
stopped.countDown();
}
}).start();
});
threadRef.set(thread);
thread.start();

try
{
// wait for thread to start and read first byte
started.await(10, TimeUnit.SECONDS);
assertTrue(started.await(10, TimeUnit.SECONDS));
// give it time to block on second byte
Thread.sleep(1000);
await().atMost(5, TimeUnit.SECONDS).until(() ->
{
Thread.State state = thread.getState();
return state == Thread.State.WAITING || state == Thread.State.TIMED_WAITING;
});
}
catch (Throwable e)
{
Expand Down Expand Up @@ -406,7 +422,7 @@ public void handle(String target, Request baseRequest, HttpServletRequest reques
assertThat(response.getContent(), containsString("AsyncContext timeout"));

completed.countDown();
Thread.sleep(1000);
await().atMost(5, TimeUnit.SECONDS).until(() -> threadRef.get().getState() == Thread.State.TERMINATED);

// Async thread should have stopped
assertTrue(stopped.await(10, TimeUnit.SECONDS));
Expand All @@ -428,7 +444,7 @@ public void handle(String target, Request baseRequest, HttpServletRequest reques
baseRequest.setHandled(true);
if (baseRequest.getDispatcherType() != DispatcherType.ERROR)
{
new Thread(() ->
Thread thread = new Thread(() ->
{
try
{
Expand All @@ -445,14 +461,19 @@ public void handle(String target, Request baseRequest, HttpServletRequest reques
readException.set(t);
stopped.countDown();
}
}).start();
});
thread.start();

try
{
// wait for thread to start and read first byte
started.await(10, TimeUnit.SECONDS);
assertTrue(started.await(10, TimeUnit.SECONDS));
// give it time to block on second byte
Thread.sleep(1000);
await().atMost(5, TimeUnit.SECONDS).until(() ->
{
Thread.State state = thread.getState();
return state == Thread.State.WAITING || state == Thread.State.TIMED_WAITING;
});
}
catch (Throwable e)
{
Expand Down Expand Up @@ -505,7 +526,7 @@ public void handle(String target, Request baseRequest, HttpServletRequest reques
baseRequest.setHandled(true);
response.setStatus(200);
response.setContentType("text/plain");
new Thread(() ->
Thread thread = new Thread(() ->
{
try
{
Expand All @@ -523,14 +544,19 @@ public void handle(String target, Request baseRequest, HttpServletRequest reques
readException.set(t);
stopped.countDown();
}
}).start();
});
thread.start();

try
{
// wait for thread to start and read first byte
started.await(10, TimeUnit.SECONDS);
assertTrue(started.await(10, TimeUnit.SECONDS));
// give it time to block on write
Thread.sleep(1000);
await().atMost(5, TimeUnit.SECONDS).until(() ->
{
Thread.State state = thread.getState();
return state == Thread.State.WAITING || state == Thread.State.TIMED_WAITING;
});
}
catch (Throwable e)
{
Expand Down

0 comments on commit 6b632f6

Please sign in to comment.