Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix and add tests camel flink extension on JVM mode #6422

Merged
merged 6 commits into from
Sep 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion extensions-jvm/flink/deployment/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,4 @@
</plugins>
</build>

</project>
</project>
12 changes: 11 additions & 1 deletion extensions-jvm/flink/runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,16 @@
<plugin>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-extension-maven-plugin</artifactId>
<configuration>
<!-- Flink uses inverted class loading, which causes issues for the Quarkus class loading
mechanism -->
<parentFirstArtifacts>
<parentFirstArtifact>org.apache.flink:flink-core</parentFirstArtifact>
<parentFirstArtifact>org.apache.flink:flink-rpc-core</parentFirstArtifact>
<parentFirstArtifact>org.apache.flink:flink-runtime</parentFirstArtifact>
</parentFirstArtifacts>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
Expand All @@ -66,4 +76,4 @@
</plugin>
</plugins>
</build>
</project>
</project>
10 changes: 9 additions & 1 deletion integration-tests-jvm/flink/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@
<artifactId>rest-assured</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<profiles>
Expand All @@ -63,6 +68,9 @@
</activation>
<dependencies>
<!-- The following dependencies guarantee that this module is built after them. You
can update them by running `mvn process-resources -Pformat -N` from the source tree
root directory -->
<dependency>
<groupId>org.apache.camel.quarkus</groupId>
<artifactId>camel-quarkus-flink-deployment</artifactId>
Expand All @@ -80,4 +88,4 @@
</profile>
</profiles>

</project>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,29 @@
*/
package org.apache.camel.quarkus.component.flink.it;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.PathParam;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response;
import org.apache.camel.CamelContext;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.component.flink.DataSetCallback;
import org.apache.camel.component.flink.FlinkConstants;
import org.apache.camel.component.flink.Flinks;
import org.apache.camel.component.flink.VoidDataStreamCallback;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.jboss.logging.Logger;

@Path("/flink")
Expand All @@ -33,18 +48,72 @@ public class FlinkResource {
private static final Logger LOG = Logger.getLogger(FlinkResource.class);

private static final String COMPONENT_FLINK = "flink";

@Inject
CamelContext context;

@Inject
ProducerTemplate template;

// Endpoint URIs referencing the dataSet/dataStream instances bound into the Camel registry
// by the resource methods below.
String flinkDataSetUri = "flink:dataSet?dataSet=#myDataSet";
String flinkDataStreamUri = "flink:datastream?datastream=#myDataStream";

@Path("/dataset/{filePath}")
@POST
@Consumes(MediaType.TEXT_PLAIN)
@Produces(MediaType.TEXT_PLAIN)
public Response loadComponentFlink() throws Exception {
/* This is an autogenerated test */
if (context.getComponent(COMPONENT_FLINK) != null) {
return Response.ok().build();
public Response dataSetFromTextFile(@PathParam("filePath") String filePath) {

if (Files.exists(Paths.get(filePath))) {
ExecutionEnvironment env = Flinks.createExecutionEnvironment();
DataSet<String> myDataSet = env.readTextFile(filePath);
context.getRegistry().bind("myDataSet", myDataSet);
context.getRegistry().bind("countTotal", addDataSetCallback());
Long totalCount = template.requestBody(
flinkDataSetUri + "&dataSetCallback=#countTotal", null, Long.class);
return Response.ok(totalCount).build();
}
LOG.warnf("Could not load [%s] from the Camel context", COMPONENT_FLINK);
return Response.status(500, COMPONENT_FLINK + " could not be loaded from the Camel context").build();

return Response.status(Response.Status.NOT_FOUND).build();
}

@Path("/datastream/{filePath}")
@POST
@Consumes(MediaType.TEXT_PLAIN)
@Produces(MediaType.TEXT_PLAIN)
public Response loadStream(@PathParam("filePath") String filePath, String data) throws IOException {
    java.nio.file.Path path = Paths.get(filePath);
    // Paths.get() never returns null, so a 'path != null' guard would be dead code and the
    // 404 branch unreachable; check for the file's existence instead.
    if (Files.exists(path)) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> datastream = env.fromElements(data);
        context.getRegistry().bind("myDataStream", datastream);
        // The callback writes the stream back to filePath and blocks until the Flink job finishes.
        template.sendBodyAndHeader(flinkDataStreamUri, null,
                FlinkConstants.FLINK_DATASTREAM_CALLBACK_HEADER,
                new VoidDataStreamCallback() {
                    @Override
                    public void doOnDataStream(DataStream dataStream, Object... objects) throws Exception {
                        dataStream.writeAsText(filePath,
                                org.apache.flink.core.fs.FileSystem.WriteMode.OVERWRITE);
                        dataStream.getExecutionEnvironment().execute();
                    }
                });
        return Response.ok(Files.size(path)).build();
    }
    return Response.status(Response.Status.NOT_FOUND).build();

}

/**
 * Builds a callback that counts the elements of the dataSet it is invoked with.
 * Returns {@code null} when counting fails, but logs the failure instead of
 * swallowing it silently so problems are visible in the test output.
 */
DataSetCallback addDataSetCallback() {
    return new DataSetCallback() {
        @Override
        public Object onDataSet(DataSet ds, Object... payloads) {
            try {
                return ds.count();
            } catch (Exception e) {
                // Best-effort: keep the original null result, but surface the cause in the log.
                LOG.warn("Failed to count dataSet elements", e);
                return null;
            }
        }
    };
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,87 @@
*/
package org.apache.camel.quarkus.component.flink.it;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

import io.quarkus.test.junit.QuarkusTest;
import io.restassured.RestAssured;
import io.restassured.http.ContentType;
import org.apache.commons.io.FileUtils;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;

import static org.hamcrest.Matchers.greaterThanOrEqualTo;

@QuarkusTest
class FlinkTest {

@Test
public void loadComponentFlink() {
/* A simple autogenerated test */
RestAssured.get("/flink/load/component/flink")
.then()
.statusCode(200);
public void dataSetCallback() throws IOException {
Path path = Files.createTempFile("fileDataSet", ".txt");
svkcemk marked this conversation as resolved.
Show resolved Hide resolved
try {
String text = "foo\n"
+ "bar\n"
+ "baz\n"
+ "qux\n"
+ "quux";
Files.writeString(path, text);
RestAssured.given()
.contentType(ContentType.TEXT)
.post("/flink/dataset/{filePath}", path.toAbsolutePath().toString())
.then()
.statusCode(200)
.and()
.body(greaterThanOrEqualTo("5"));

} finally {
try {
Files.deleteIfExists(path);
} catch (Exception e) {
// Do nothing
}
}
}

@Test
public void dataStreamCallback() throws IOException {
    Path path = Files.createTempFile("fileDataStream", ".txt");
    try {
        String text = "Hello!!Camel flink!";
        RestAssured.given()
                .contentType(ContentType.TEXT)
                .body(text)
                .post("/flink/datastream/{filePath}", path.toAbsolutePath().toString())
                .then()
                .statusCode(200);

        // Flink's writeAsText produces a directory of part files when the sink parallelism
        // is > 1, but a single regular file when it is 1 — poll for both layouts so the
        // test does not time out on single-core CI executors.
        Awaitility.await()
                .pollInterval(Duration.ofMillis(250))
                .atMost(10, TimeUnit.SECONDS)
                .until(() -> {
                    if (Files.isDirectory(path)) {
                        try (Stream<Path> walk = Files.walk(path)) {
                            return walk.filter(Files::isRegularFile)
                                    .anyMatch(filePath -> containsExpectedText(filePath, text));
                        }
                    }
                    return containsExpectedText(path, text);
                });
    } finally {
        FileUtils.deleteQuietly(path.toFile());
    }
}

// True when the file is non-empty and its trimmed content equals the expected text.
private static boolean containsExpectedText(Path file, String expected) {
    try {
        return Files.size(file) > 0 && Files.readString(file).trim().equals(expected);
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}
}
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@
<dropwizard-metrics.version>${metrics-version}</dropwizard-metrics.version>
<eddsa.version>${eddsa-version}</eddsa.version>
<eclipse-transformer.version>0.5.0</eclipse-transformer.version>
<flink.version>${flink-version}</flink.version>
<freemarker.version>2.3.33</freemarker.version><!-- @sync io.quarkiverse.freemarker:quarkus-freemarker-parent:${quarkiverse-freemarker.version} prop:freemarker.version -->
<geny.version>0.6.2</geny.version>
<github-api.version>1.313</github-api.version><!-- Used in a Groovy script bellow -->
Expand Down
Loading