From 51abc78ecf8e67bbb53dff821e95f66d0e981ae6 Mon Sep 17 00:00:00 2001 From: jansupol <15908245+jansupol@users.noreply.github.com> Date: Tue, 17 Dec 2019 13:41:30 +0100 Subject: [PATCH] Fix issues with ChunkedInputStream when using Apache Connector (#4338) * Set Apache Connector behaviour for Apache Http Client prior 4.5.1 to behaviour in Jersey 2.28 Keep behaviour of Jersey 2.29 for Apache HttpClient 4.5.1+ Signed-off-by: Jan Supol --- .../connector/ApacheClientProperties.java | 9 ++ .../ApacheConnectionClosingStrategy.java | 97 +++++++++++++++++ .../apache/connector/ApacheConnector.java | 84 +++++++++++---- .../apache/connector/StreamingTest.java | 59 +++++++--- tests/integration/jersey-4321/pom.xml | 69 ++++++++++++ .../integration/jersey4321/StreamingTest.java | 101 ++++++++++++++++++ tests/integration/pom.xml | 1 + 7 files changed, 385 insertions(+), 35 deletions(-) create mode 100644 connectors/apache-connector/src/main/java/org/glassfish/jersey/apache/connector/ApacheConnectionClosingStrategy.java create mode 100644 tests/integration/jersey-4321/pom.xml create mode 100644 tests/integration/jersey-4321/src/test/java/org/glassfish/jersey/tests/integration/jersey4321/StreamingTest.java diff --git a/connectors/apache-connector/src/main/java/org/glassfish/jersey/apache/connector/ApacheClientProperties.java b/connectors/apache-connector/src/main/java/org/glassfish/jersey/apache/connector/ApacheClientProperties.java index b77919780e..107e7498fc 100644 --- a/connectors/apache-connector/src/main/java/org/glassfish/jersey/apache/connector/ApacheClientProperties.java +++ b/connectors/apache-connector/src/main/java/org/glassfish/jersey/apache/connector/ApacheClientProperties.java @@ -156,6 +156,15 @@ public final class ApacheClientProperties { */ public static final String KEEPALIVE_STRATEGY = "jersey.config.apache.client.keepAliveStrategy"; + + /** + * Strategy that closes the Apache Connection. Accepts an instance of {@link ApacheConnectionClosingStrategy}. + * + * @see ApacheConnectionClosingStrategy + * @since 2.30 + */ + public static final String CONNECTION_CLOSING_STRATEGY = "jersey.config.apache.client.connectionClosingStrategy"; + /** * Get the value of the specified property. * diff --git a/connectors/apache-connector/src/main/java/org/glassfish/jersey/apache/connector/ApacheConnectionClosingStrategy.java b/connectors/apache-connector/src/main/java/org/glassfish/jersey/apache/connector/ApacheConnectionClosingStrategy.java new file mode 100644 index 0000000000..8c248eae57 --- /dev/null +++ b/connectors/apache-connector/src/main/java/org/glassfish/jersey/apache/connector/ApacheConnectionClosingStrategy.java @@ -0,0 +1,97 @@ +/* + * Copyright (c) 2019 Oracle and/or its affiliates. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0, which is available at + * http://www.eclipse.org/legal/epl-2.0. + * + * This Source Code may also be made available under the following Secondary + * Licenses when the conditions for such availability set forth in the + * Eclipse Public License v. 2.0 are satisfied: GNU General Public License, + * version 2 with the GNU Classpath Exception, which is available at + * https://www.gnu.org/software/classpath/license.html. + * + * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 + */ + +package org.glassfish.jersey.apache.connector; + +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpUriRequest; +import org.glassfish.jersey.client.ClientRequest; + +import java.io.IOException; +import java.io.InputStream; + +/** + * /** + * Strategy that defines the way the Apache client releases resources. The client enables closing the content stream + * and the response. From the Apache documentation: + *
+ *     The difference between closing the content stream and closing the response is that
+ *     the former will attempt to keep the underlying connection alive by consuming the
+ *     entity content while the latter immediately shuts down and discards the connection.
+ * 
+ * With Apache Client before 4.5.1, it was ok to close the response and the content stream. This is the default for + * Apache Client 4.5 and older. + *

+ * For Apache Client 4.5.1+, first the content stream and the response is should be closed. + *

+ * In the case of Chunk content stream, the stream is not closed on the server side, and the client can hung on reading + * the closing chunk. Using the {@link org.glassfish.jersey.client.ClientProperties#READ_TIMEOUT} property can prevent + * this hanging forever and the reading of the closing chunk is terminated when the time is out. The other option, when + * the timeout is not set, is to abort the Apache client request. This is the default for Apache Client 4.5.1+ when the + * read timeout is not set. + *

+ * Another option is not to close the content stream, which is possible by the Apache client documentation. In this case, + * however, the server side may not be notified and would not not close its chunk stream. + */ +public interface ApacheConnectionClosingStrategy { + /** + * Method to close the connection. + * @param clientRequest The {@link ClientRequest} to get {@link ClientRequest#getConfiguration() configuration}, + * and {@link ClientRequest#resolveProperty(String, Class) resolve properties}. + * @param request Apache {@code HttpUriRequest} that can be {@code abort}ed. + * @param response Apache {@code CloseableHttpResponse} that can be {@code close}d. + * @param stream The entity stream that can be {@link InputStream#close() closed}. + * @throws IOException In case of some of the closing methods throws {@link IOException} + */ + void close(ClientRequest clientRequest, HttpUriRequest request, CloseableHttpResponse response, InputStream stream) + throws IOException; + + /** + * Strategy that aborts Apache HttpRequests for the case of Chunked Stream, closes the stream, and response next. + */ + class GracefulClosingStrategy implements ApacheConnectionClosingStrategy { + static final GracefulClosingStrategy INSTANCE = new GracefulClosingStrategy(); + + @Override + public void close(ClientRequest clientRequest, HttpUriRequest request, CloseableHttpResponse response, InputStream stream) + throws IOException { + if (response.getEntity() != null && response.getEntity().isChunked()) { + request.abort(); + } + try { + stream.close(); + } catch (IOException ex) { + // Ignore + } finally { + response.close(); + } + } + } + + /** + * Strategy that closes the response and content stream next. This is a behaviour of Jersey 2.28. + */ + class ImmediateClosingStrategy implements ApacheConnectionClosingStrategy { + static final ImmediateClosingStrategy INSTANCE = new ImmediateClosingStrategy(); + + @Override + public void close(ClientRequest clientRequest, HttpUriRequest request, CloseableHttpResponse response, InputStream stream) + throws IOException { + response.close(); + stream.close(); + } + } +} diff --git a/connectors/apache-connector/src/main/java/org/glassfish/jersey/apache/connector/ApacheConnector.java b/connectors/apache-connector/src/main/java/org/glassfish/jersey/apache/connector/ApacheConnector.java index 784f69cddc..6fe699fa71 100644 --- a/connectors/apache-connector/src/main/java/org/glassfish/jersey/apache/connector/ApacheConnector.java +++ b/connectors/apache-connector/src/main/java/org/glassfish/jersey/apache/connector/ApacheConnector.java @@ -26,14 +26,15 @@ import java.io.OutputStream; import java.net.URI; import java.util.ArrayList; +import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Level; import java.util.logging.Logger; +import java.util.stream.Collectors; import javax.ws.rs.ProcessingException; import javax.ws.rs.client.Client; @@ -173,7 +174,6 @@ class ApacheConnector implements Connector { private static final Logger LOGGER = Logger.getLogger(ApacheConnector.class.getName()); - private static final VersionInfo vi; private static final String release; @@ -325,14 +325,16 @@ class ApacheConnector implements Connector { } clientBuilder.setDefaultRequestConfig(requestConfig); - Optional contract = config.getInstances().stream() - .filter(a -> ApacheHttpClientBuilderConfigurator.class.isInstance(a)).findFirst(); + LinkedList contracts = config.getInstances().stream() + .filter(ApacheHttpClientBuilderConfigurator.class::isInstance) + .collect(Collectors.toCollection(LinkedList::new)); - final HttpClientBuilder configuredBuilder = contract.isPresent() - ? ((ApacheHttpClientBuilderConfigurator) contract.get()).configure(clientBuilder) - : null; + HttpClientBuilder configuredBuilder = clientBuilder; + for (Object configurator : contracts) { + configuredBuilder = ((ApacheHttpClientBuilderConfigurator) configurator).configure(configuredBuilder); + } - this.client = configuredBuilder != null ? configuredBuilder.build() : clientBuilder.build(); + this.client = configuredBuilder.build(); } private HttpClientConnectionManager getConnectionManager(final Client client, @@ -515,7 +517,8 @@ public ClientResponse apply(final ClientRequest clientRequest) throws Processing } try { - responseContext.setEntityStream(getInputStream(response)); + final ConnectionClosingMechanism closingMechanism = new ConnectionClosingMechanism(clientRequest, request); + responseContext.setEntityStream(getInputStream(response, closingMechanism)); } catch (final IOException e) { LOGGER.log(Level.SEVERE, null, e); } @@ -655,8 +658,8 @@ private static Map writeOutBoundHeaders(final ClientRequest clie return stringHeaders; } - private static InputStream getInputStream(final CloseableHttpResponse response) throws IOException { - + private static InputStream getInputStream(final CloseableHttpResponse response, + final ConnectionClosingMechanism closingMechanism) throws IOException { final InputStream inputStream; if (response.getEntity() == null) { @@ -670,18 +673,57 @@ private static InputStream getInputStream(final CloseableHttpResponse response) } } - return new FilterInputStream(inputStream) { - @Override - public void close() throws IOException { - try { - super.close(); - } catch (IOException ex) { - // Ignore - } finally { - response.close(); + return closingMechanism.getEntityStream(inputStream, response); + } + + /** + * The way the Apache CloseableHttpResponse is to be closed. + * See https://github.com/eclipse-ee4j/jersey/issues/4321 + * {@link ApacheClientProperties#CONNECTION_CLOSING_STRATEGY} + */ + private final class ConnectionClosingMechanism { + private ApacheConnectionClosingStrategy connectionClosingStrategy = null; + private final ClientRequest clientRequest; + private final HttpUriRequest apacheRequest; + + private ConnectionClosingMechanism(ClientRequest clientRequest, HttpUriRequest apacheRequest) { + this.clientRequest = clientRequest; + this.apacheRequest = apacheRequest; + Object closingStrategyProperty = clientRequest + .resolveProperty(ApacheClientProperties.CONNECTION_CLOSING_STRATEGY, Object.class); + if (closingStrategyProperty != null) { + if (ApacheConnectionClosingStrategy.class.isInstance(closingStrategyProperty)) { + connectionClosingStrategy = (ApacheConnectionClosingStrategy) closingStrategyProperty; + } else { + LOGGER.log( + Level.WARNING, + LocalizationMessages.IGNORING_VALUE_OF_PROPERTY( + ApacheClientProperties.CONNECTION_CLOSING_STRATEGY, + closingStrategyProperty, + ApacheConnectionClosingStrategy.class.getName()) + ); } } - }; + + if (connectionClosingStrategy == null) { + if (vi.getRelease().compareTo("4.5") > 0) { + connectionClosingStrategy = ApacheConnectionClosingStrategy.GracefulClosingStrategy.INSTANCE; + } else { + connectionClosingStrategy = ApacheConnectionClosingStrategy.ImmediateClosingStrategy.INSTANCE; + } + } + } + + private InputStream getEntityStream(final InputStream inputStream, + final CloseableHttpResponse response) { + InputStream filterStream = new FilterInputStream(inputStream) { + @Override + public void close() throws IOException { + connectionClosingStrategy.close(clientRequest, apacheRequest, response, in); + } + }; + return filterStream; + } } private static class ConnectionFactory extends ManagedHttpClientConnectionFactory { diff --git a/connectors/apache-connector/src/test/java/org/glassfish/jersey/apache/connector/StreamingTest.java b/connectors/apache-connector/src/test/java/org/glassfish/jersey/apache/connector/StreamingTest.java index 218bf4b9a5..c5ef31e4b0 100644 --- a/connectors/apache-connector/src/test/java/org/glassfish/jersey/apache/connector/StreamingTest.java +++ b/connectors/apache-connector/src/test/java/org/glassfish/jersey/apache/connector/StreamingTest.java @@ -18,10 +18,12 @@ import java.io.IOException; import java.io.InputStream; +import java.util.concurrent.atomic.AtomicInteger; import javax.ws.rs.GET; import javax.ws.rs.Path; import javax.ws.rs.Produces; +import javax.ws.rs.client.Invocation; import javax.ws.rs.client.WebTarget; import javax.ws.rs.core.Application; import javax.ws.rs.core.MediaType; @@ -49,21 +51,13 @@ public class StreamingTest extends JerseyTest { * Test that a data stream can be terminated from the client side. */ @Test - public void clientCloseTest() throws IOException { - // start streaming - InputStream inputStream = target().path("/streamingEndpoint").request() - .property(ClientProperties.READ_TIMEOUT, 1_000).get(InputStream.class); + public void clientCloseNoTimeoutTest() throws IOException { + clientCloseTest(-1); + } - WebTarget sendTarget = target().path("/streamingEndpoint/send"); - // trigger sending 'A' to the stream; OK is sent if everything on the server was OK - assertEquals("OK", sendTarget.request().get().readEntity(String.class)); - // check 'A' has been sent - assertEquals('A', inputStream.read()); - // closing the stream should tear down the connection - inputStream.close(); - // trigger sending another 'A' to the stream; it should fail - // (indicating that the streaming has been terminated on the server) - assertEquals("NOK", sendTarget.request().get().readEntity(String.class)); + @Test + public void clientCloseWithTimeOutTest() throws IOException { + clientCloseTest(1_000); } /** @@ -103,6 +97,43 @@ protected Application configure() { return new ResourceConfig(StreamingEndpoint.class); } + /** + * Test that a data stream can be terminated from the client side. + */ + private void clientCloseTest(int readTimeout) throws IOException { + // start streaming + AtomicInteger counter = new AtomicInteger(0); + Invocation.Builder builder = target().path("/streamingEndpoint").request(); + if (readTimeout > -1) { + counter.set(1); + builder.property(ClientProperties.READ_TIMEOUT, readTimeout); + builder.property(ApacheClientProperties.CONNECTION_CLOSING_STRATEGY, + (ApacheConnectionClosingStrategy) (config, request, response, stream) -> { + try { + stream.close(); + } catch (Exception e) { + // timeout, no chunk ending + } finally { + counter.set(0); + response.close(); + } + }); + } + InputStream inputStream = builder.get(InputStream.class); + + WebTarget sendTarget = target().path("/streamingEndpoint/send"); + // trigger sending 'A' to the stream; OK is sent if everything on the server was OK + assertEquals("OK", sendTarget.request().get().readEntity(String.class)); + // check 'A' has been sent + assertEquals('A', inputStream.read()); + // closing the stream should tear down the connection + inputStream.close(); + // trigger sending another 'A' to the stream; it should fail + // (indicating that the streaming has been terminated on the server) + assertEquals("NOK", sendTarget.request().get().readEntity(String.class)); + assertEquals(0, counter.get()); + } + @Singleton @Path("streamingEndpoint") public static class StreamingEndpoint { diff --git a/tests/integration/jersey-4321/pom.xml b/tests/integration/jersey-4321/pom.xml new file mode 100644 index 0000000000..ee4f8aa2db --- /dev/null +++ b/tests/integration/jersey-4321/pom.xml @@ -0,0 +1,69 @@ + + + + + project + org.glassfish.jersey.tests.integration + 2.30-SNAPSHOT + + 4.0.0 + + jersey-4321 + + + + org.glassfish.jersey.containers + jersey-container-grizzly2-http + ${project.version} + test + + + org.glassfish.jersey.test-framework.providers + jersey-test-framework-provider-grizzly2 + ${project.version} + test + + + com.google.guava + guava + test + + + org.glassfish.jersey.connectors + jersey-apache-connector + + + org.apache.httpcomponents + httpclient + + + test + + + org.apache.httpcomponents + httpclient + + 4.5 + test + + + + \ No newline at end of file diff --git a/tests/integration/jersey-4321/src/test/java/org/glassfish/jersey/tests/integration/jersey4321/StreamingTest.java b/tests/integration/jersey-4321/src/test/java/org/glassfish/jersey/tests/integration/jersey4321/StreamingTest.java new file mode 100644 index 0000000000..93c441c4fd --- /dev/null +++ b/tests/integration/jersey-4321/src/test/java/org/glassfish/jersey/tests/integration/jersey4321/StreamingTest.java @@ -0,0 +1,101 @@ +/* + * Copyright (c) 2019 Oracle and/or its affiliates. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0, which is available at + * http://www.eclipse.org/legal/epl-2.0. + * + * This Source Code may also be made available under the following Secondary + * Licenses when the conditions for such availability set forth in the + * Eclipse Public License v. 2.0 are satisfied: GNU General Public License, + * version 2 with the GNU Classpath Exception, which is available at + * https://www.gnu.org/software/classpath/license.html. + * + * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 + */ + +package org.glassfish.jersey.tests.integration.jersey4321; + +import org.glassfish.jersey.apache.connector.ApacheConnectorProvider; +import org.glassfish.jersey.client.ClientConfig; +import org.glassfish.jersey.server.ChunkedOutput; +import org.glassfish.jersey.server.ResourceConfig; +import org.glassfish.jersey.test.JerseyTest; +import org.junit.Test; + +import javax.inject.Singleton; +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.client.WebTarget; +import javax.ws.rs.core.Application; +import javax.ws.rs.core.MediaType; +import java.io.IOException; +import java.io.InputStream; + +import static org.junit.Assert.assertEquals; + +/** + * Brought over from Apache connector module, Jersey 2.28 style + * @author Petr Janouch + */ +public class StreamingTest extends JerseyTest { + + /** + * Test that a data stream can be terminated from the client side. + * Using pre Jersey 2.29 strategy + */ + @Test + public void clientCloseCloseResponseFirstTest() throws IOException { + // start streaming + + InputStream inputStream = target().path("/streamingEndpoint").request().get(InputStream.class); + + WebTarget sendTarget = target().path("/streamingEndpoint/send"); + // trigger sending 'A' to the stream; OK is sent if everything on the server was OK + assertEquals("OK", sendTarget.request().get().readEntity(String.class)); + // check 'A' has been sent + assertEquals('A', inputStream.read()); + // closing the stream should tear down the connection + inputStream.close(); + // trigger sending another 'A' to the stream; it should fail + // (indicating that the streaming has been terminated on the server) + assertEquals("NOK", sendTarget.request().get().readEntity(String.class)); + } + + + @Override + protected void configureClient(ClientConfig config) { + config.connectorProvider(new ApacheConnectorProvider()); + } + + @Override + protected Application configure() { + return new ResourceConfig(StreamingEndpoint.class); + } + + @Singleton + @Path("streamingEndpoint") + public static class StreamingEndpoint { + + private final ChunkedOutput output = new ChunkedOutput<>(String.class); + + @GET + @Path("send") + public String sendEvent() { + try { + output.write("A"); + } catch (IOException e) { + return "NOK"; + } + + return "OK"; + } + + @GET + @Produces(MediaType.TEXT_PLAIN) + public ChunkedOutput get() { + return output; + } + } +} diff --git a/tests/integration/pom.xml b/tests/integration/pom.xml index 0b9a841f40..79a1f67635 100644 --- a/tests/integration/pom.xml +++ b/tests/integration/pom.xml @@ -83,6 +83,7 @@ jersey-3670 jersey-3992 jersey-4099 + jersey-4321 portability-jersey-1 portability-jersey-2 property-check