Skip to content

Commit

Permalink
Merge pull request quarkusio#18222 from geoand/quarkusio#18201
Browse files Browse the repository at this point in the history
Introduce support for Kotlin Flow as return type in RESTEasy Reactive
  • Loading branch information
geoand authored Jul 5, 2021
2 parents 4c1c4f5 + 480d9d5 commit 987c9b4
Show file tree
Hide file tree
Showing 9 changed files with 183 additions and 0 deletions.
5 changes: 5 additions & 0 deletions bom/application/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4810,6 +4810,11 @@
<artifactId>kotlinx-coroutines-jdk8</artifactId>
<version>${kotlin.coroutine.version}</version>
</dependency>
<dependency>
<groupId>org.jetbrains.kotlinx</groupId>
<artifactId>kotlinx-coroutines-reactive</artifactId>
<version>${kotlin.coroutine.version}</version>
</dependency>

<dependency>
<groupId>com.amazonaws</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@
import org.jboss.resteasy.reactive.common.processor.HashUtil;
import org.jboss.resteasy.reactive.server.core.parameters.NullParamExtractor;
import org.jboss.resteasy.reactive.server.core.parameters.ParameterExtractor;
import org.jboss.resteasy.reactive.server.handlers.PublisherResponseHandler;
import org.jboss.resteasy.reactive.server.model.FixedHandlersChainCustomizer;
import org.jboss.resteasy.reactive.server.model.HandlerChainCustomizer;
import org.jboss.resteasy.reactive.server.processor.scanning.MethodScanner;
import org.jboss.resteasy.reactive.server.runtime.kotlin.CoroutineEndpointInvoker;
import org.jboss.resteasy.reactive.server.runtime.kotlin.CoroutineMethodProcessor;
import org.jboss.resteasy.reactive.server.runtime.kotlin.FlowToPublisherHandler;
import org.jboss.resteasy.reactive.server.spi.EndpointInvoker;

import io.quarkus.arc.deployment.AdditionalBeanBuildItem;
Expand All @@ -36,6 +39,7 @@
public class KotlinCoroutineIntegrationProcessor {

static final DotName CONTINUATION = DotName.createSimple("kotlin.coroutines.Continuation");
static final DotName FLOW = DotName.createSimple("kotlinx.coroutines.flow.Flow");
public static final String NAME = KotlinCoroutineIntegrationProcessor.class.getName();
private static final DotName BLOCKING_ANNOTATION = DotName.createSimple("io.smallrye.common.annotation.Blocking");

Expand Down Expand Up @@ -148,4 +152,21 @@ private Supplier<EndpointInvoker> createCoroutineInvoker(ClassInfo currentClassI
}
return factory.invoker(baseName);
}

@BuildStep
public MethodScannerBuildItem flowSupport() {
return new MethodScannerBuildItem(new MethodScanner() {
@Override
public List<HandlerChainCustomizer> scan(MethodInfo method, ClassInfo actualEndpointClass,
Map<String, Object> methodContext) {
DotName returnTypeName = method.returnType().name();
if (returnTypeName.equals(FLOW)) {
return Collections.singletonList(new FixedHandlersChainCustomizer(
List.of(new FlowToPublisherHandler(), new PublisherResponseHandler()),
HandlerChainCustomizer.Phase.AFTER_METHOD_INVOKE));
}
return Collections.emptyList();
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@
<groupId>org.jetbrains.kotlinx</groupId>
<artifactId>kotlinx-coroutines-jdk8</artifactId>
</dependency>
<dependency>
<groupId>org.jetbrains.kotlinx</groupId>
<artifactId>kotlinx-coroutines-reactive</artifactId>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package org.jboss.resteasy.reactive.server.runtime.kotlin

import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.reactive.asPublisher
import org.jboss.resteasy.reactive.server.core.ResteasyReactiveRequestContext
import org.jboss.resteasy.reactive.server.spi.ServerRestHandler

class FlowToPublisherHandler : ServerRestHandler {

override fun handle(requestContext: ResteasyReactiveRequestContext?) {
val result = requestContext!!.result
if (result is Flow<*>) {
requestContext.result = (result as Flow<Any>) // cast needed for extension function
.asPublisher()
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package org.jboss.resteasy.reactive.server.model;

import java.util.Collections;
import java.util.List;
import org.jboss.resteasy.reactive.common.model.ResourceClass;
import org.jboss.resteasy.reactive.server.spi.ServerRestHandler;

public class FixedHandlersChainCustomizer implements HandlerChainCustomizer {

private List<ServerRestHandler> handlers;
private Phase phase;

public FixedHandlersChainCustomizer(List<ServerRestHandler> handlers, Phase phase) {
this.handlers = handlers;
this.phase = phase;
}

public FixedHandlersChainCustomizer() {
}

@Override
public List<ServerRestHandler> handlers(Phase phase, ResourceClass resourceClass,
ServerResourceMethod serverResourceMethod) {
if (this.phase == phase) {
return handlers;
}
return Collections.emptyList();
}

public List<ServerRestHandler> getHandlers() {
return handlers;
}

public void setHandlers(List<ServerRestHandler> handlers) {
this.handlers = handlers;
}

public Phase getPhase() {
return phase;
}

public void setPhase(Phase phase) {
this.phase = phase;
}
}
5 changes: 5 additions & 0 deletions integration-tests/resteasy-reactive-kotlin/standard/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.quarkus</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package io.quarkus.it.resteasy.reactive.kotlin

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.flow
import org.jboss.resteasy.reactive.RestSseElementType
import javax.ws.rs.GET
import javax.ws.rs.Path
import javax.ws.rs.Produces
import javax.ws.rs.core.MediaType

@Path("flow")
class FlowResource {

@GET
@Path("str")
@Produces(MediaType.SERVER_SENT_EVENTS)
fun sseStrings() = flow {
emit("Hello")
emit("From")
emit("Kotlin")
emit("Flow")
}

@GET
@Path("json")
@Produces(MediaType.SERVER_SENT_EVENTS)
@RestSseElementType(MediaType.APPLICATION_JSON)
fun sseJson() = flow {
emit(Country("Barbados", "Bridgetown"))
delay(1000)
emit(Country("Mauritius", "Port Louis"))
delay(1000)
emit(Country("Fiji", "Suva"))
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
quarkus.native.enable-https-url-handler=true
quarkus.kafka.devservices.enabled=false
countries/mp-rest/url=${test.url}

mp.messaging.outgoing.countries-emitter.connector=smallrye-kafka
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package io.quarkus.it.resteasy.reactive.kotlin

import io.quarkus.test.common.http.TestHTTPResource
import io.quarkus.test.junit.QuarkusTest
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Test
import java.util.*
import java.util.concurrent.CompletableFuture
import java.util.concurrent.TimeUnit
import javax.ws.rs.client.ClientBuilder
import javax.ws.rs.client.WebTarget
import javax.ws.rs.sse.SseEventSource

@QuarkusTest
class FlowResourceTest {

@TestHTTPResource("/flow")
var flowPath: String? = null

@Test
fun testSeeStrings() {
testSse("str", 5) {
assertThat(it).containsExactly("Hello", "From", "Kotlin", "Flow")
}
}

@Test
fun testSeeJson() {
testSse("json", 10) {
assertThat(it).containsExactly(
"{\"name\":\"Barbados\",\"capital\":\"Bridgetown\"}",
"{\"name\":\"Mauritius\",\"capital\":\"Port Louis\"}",
"{\"name\":\"Fiji\",\"capital\":\"Suva\"}")
}
}

private fun testSse(path: String, timeout: Long, assertion: (List<String>) -> Unit) {
val client = ClientBuilder.newBuilder().build()
val target: WebTarget = client.target("$flowPath/$path")
SseEventSource.target(target).reconnectingEvery(Int.MAX_VALUE.toLong(), TimeUnit.SECONDS)
.build().use { eventSource ->
val res = CompletableFuture<List<String>>()
val collect = Collections.synchronizedList(ArrayList<String>())
eventSource.register({ inboundSseEvent -> collect.add(inboundSseEvent.readData()) }, { throwable -> res.completeExceptionally(throwable) }) { res.complete(collect) }
eventSource.open()
val list = res.get(timeout, TimeUnit.SECONDS)
assertion(list)
}
}
}

0 comments on commit 987c9b4

Please sign in to comment.