Skip to content

Commit

Permalink
[Improve][Transform] Improve DynamicCompile transform (#7264)
Browse files Browse the repository at this point in the history
  • Loading branch information
jackyyyyyssss authored Jul 30, 2024
1 parent b9acb57 commit 9df557c
Show file tree
Hide file tree
Showing 26 changed files with 432 additions and 36 deletions.
30 changes: 27 additions & 3 deletions docs/en/transform-v2/dynamic-compile.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@ If the conversion is too complex, it may affect performance

| name | type | required | default value |
|------------------|--------|----------|---------------|
| source_code | string | yes | |
| compile_language | string | yes | |
| source_code | string | no | |
| compile_language | Enum | yes | |
| compile_pattern | Enum | no | SOURCE_CODE |
| absolute_path | string | no | |

### source_code [string]

Expand All @@ -24,11 +26,20 @@ If there are third-party dependency packages, please place them in ${SEATUNNEL_H

Transform plugin common parameters, please refer to [Transform Plugin](common-options.md) for details

### compile_language [string]
### compile_language [Enum]

Some syntax in Java may not be supported, please refer https://github.com/janino-compiler/janino
GROOVY,JAVA

### compile_pattern [Enum]

SOURCE_CODE,ABSOLUTE_PATH
If it is a SOURCE-CODE enumeration; the SOURCE-CODE attribute is required, and the ABSOLUTE_PATH enumeration;ABSOLUTE_PATH attribute is required

### absolute_path [string]

The absolute path of Java or Groovy files on the server

## Example

The data read from source is a table like this:
Expand All @@ -46,6 +57,7 @@ transform {
source_table_name = "fake"
result_table_name = "fake1"
compile_language="GROOVY"
compile_pattern="SOURCE_CODE"
source_code="""
import org.apache.seatunnel.api.table.catalog.Column
import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor
Expand Down Expand Up @@ -82,6 +94,7 @@ transform {
source_table_name = "fake"
result_table_name = "fake1"
compile_language="JAVA"
compile_pattern="SOURCE_CODE"
source_code="""
import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor;
Expand Down Expand Up @@ -113,6 +126,17 @@ transform {
}
}
transform {
DynamicCompile {
source_table_name = "fake"
result_table_name = "fake1"
compile_language="GROOVY"
compile_pattern="ABSOLUTE_PATH"
absolute_path="""/tmp/GroovyFile"""
}
}
```

Then the data in result table `fake1` will like this
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,4 +55,6 @@ default Container.ExecResult restoreJob(String confFile, String jobId)
}

String getServerLogs();

void copyFileToContainer(String path, String targetPath);
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.seatunnel.e2e.common.container.AbstractTestContainer;
import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
import org.apache.seatunnel.e2e.common.container.TestContainer;
import org.apache.seatunnel.e2e.common.util.ContainerUtil;

import org.testcontainers.containers.Container;
import org.testcontainers.containers.GenericContainer;
Expand Down Expand Up @@ -168,4 +169,10 @@ public String executeJobManagerInnerCommand(String command)
throws IOException, InterruptedException {
return jobManager.execInContainer("bash", "-c", command).getStdout();
}

@Override
public void copyFileToContainer(String path, String targetPath) {
ContainerUtil.copyFileIntoContainers(
ContainerUtil.getResourcesFile(path).toPath(), targetPath, jobManager);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -234,4 +234,10 @@ public Container.ExecResult executeJob(String confFile, List<String> variables)
public String getServerLogs() {
return server1.getLogs();
}

@Override
public void copyFileToContainer(String path, String targetPath) {
ContainerUtil.copyFileIntoContainers(
ContainerUtil.getResourcesFile(path).toPath(), targetPath, server1);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -455,4 +455,10 @@ public Container.ExecResult restoreJob(String confFile, String jobId)
public String getServerLogs() {
return server.getLogs();
}

@Override
public void copyFileToContainer(String path, String targetPath) {
ContainerUtil.copyFileIntoContainers(
ContainerUtil.getResourcesFile(path).toPath(), targetPath, server);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.seatunnel.e2e.common.container.AbstractTestContainer;
import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
import org.apache.seatunnel.e2e.common.util.ContainerUtil;

import org.testcontainers.containers.Container;
import org.testcontainers.containers.GenericContainer;
Expand Down Expand Up @@ -118,4 +119,10 @@ public Container.ExecResult executeJob(String confFile, List<String> variables)
public String getServerLogs() {
return master.getLogs();
}

@Override
public void copyFileToContainer(String path, String targetPath) {
ContainerUtil.copyFileIntoContainers(
ContainerUtil.getResourcesFile(path).toPath(), targetPath, master);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,47 +27,63 @@

public class TestDynamicCompileIT extends TestSuiteBase {

private final String basePath = "/dynamic_compile/conf/";

@TestTemplate
public void testDynamicSingleCompileGroovy(TestContainer container)
throws IOException, InterruptedException {
Container.ExecResult execResult =
container.executeJob(
"/dynamic_compile/single_dynamic_groovy_compile_transform.conf");
container.executeJob(basePath + "single_dynamic_groovy_compile_transform.conf");
Assertions.assertEquals(0, execResult.getExitCode());
}

@TestTemplate
public void testDynamicSingleCompileJava(TestContainer container)
throws IOException, InterruptedException {
Container.ExecResult execResult =
container.executeJob("/dynamic_compile/single_dynamic_java_compile_transform.conf");
container.executeJob(basePath + "single_dynamic_java_compile_transform.conf");
Assertions.assertEquals(0, execResult.getExitCode());
}

@TestTemplate
public void testDynamicMultipleCompileGroovy(TestContainer container)
throws IOException, InterruptedException {
Container.ExecResult execResult =
container.executeJob(
"/dynamic_compile/multiple_dynamic_groovy_compile_transform.conf");
container.executeJob(basePath + "multiple_dynamic_groovy_compile_transform.conf");
Assertions.assertEquals(0, execResult.getExitCode());
}

@TestTemplate
public void testDynamicMultipleCompileJava(TestContainer container)
throws IOException, InterruptedException {
Container.ExecResult execResult =
container.executeJob(
"/dynamic_compile/multiple_dynamic_java_compile_transform.conf");
container.executeJob(basePath + "multiple_dynamic_java_compile_transform.conf");
Assertions.assertEquals(0, execResult.getExitCode());
}

@TestTemplate
public void testDynamicMixedCompileJavaAndGroovy(TestContainer container)
throws IOException, InterruptedException {
Container.ExecResult execResult =
container.executeJob(
"/dynamic_compile/mixed_dynamic_groovy_java_compile_transform.conf");
container.executeJob(basePath + "mixed_dynamic_groovy_java_compile_transform.conf");
Assertions.assertEquals(0, execResult.getExitCode());
}

@TestTemplate
public void testDynamicSinglePathGroovy(TestContainer container)
throws IOException, InterruptedException {
container.copyFileToContainer("/dynamic_compile/source_file/GroovyFile", "/tmp/GroovyFile");
Container.ExecResult execResult =
container.executeJob(basePath + "single_groovy_path_compile.conf");
Assertions.assertEquals(0, execResult.getExitCode());
}

@TestTemplate
public void testDynamicSinglePathJava(TestContainer container)
throws IOException, InterruptedException {
container.copyFileToContainer("/dynamic_compile/source_file/JavaFile", "/tmp/JavaFile");
Container.ExecResult execResult =
container.executeJob(basePath + "single_java_path_compile.conf");
Assertions.assertEquals(0, execResult.getExitCode());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ transform {
source_table_name = "fake"
result_table_name = "fake1"
compile_language="JAVA"
compile_pattern="SOURCE_CODE"
source_code="""
import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor;
Expand Down Expand Up @@ -80,6 +81,7 @@ transform {
source_table_name = "fake1"
result_table_name = "fake2"
compile_language="GROOVY"
compile_pattern="SOURCE_CODE"
source_code="""
import org.apache.seatunnel.api.table.catalog.Column
import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ transform {
source_table_name = "fake"
result_table_name = "fake1"
compile_language="GROOVY"
compile_pattern="SOURCE_CODE"
source_code="""
import org.apache.seatunnel.api.table.catalog.Column
import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor
Expand Down Expand Up @@ -73,6 +74,7 @@ transform {
source_table_name = "fake1"
result_table_name = "fake2"
compile_language="GROOVY"
compile_pattern="SOURCE_CODE"
source_code="""
import org.apache.seatunnel.api.table.catalog.Column
import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ transform {
source_table_name = "fake"
result_table_name = "fake1"
compile_language="JAVA"
compile_pattern="SOURCE_CODE"
source_code="""
import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor;
Expand Down Expand Up @@ -80,6 +81,7 @@ transform {
source_table_name = "fake1"
result_table_name = "fake2"
compile_language="JAVA"
compile_pattern="SOURCE_CODE"
source_code="""
import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ transform {
source_table_name = "fake"
result_table_name = "fake1"
compile_language="GROOVY"
compile_pattern="SOURCE_CODE"
source_code="""
import org.apache.seatunnel.api.table.catalog.Column
import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ DynamicCompile {
source_table_name = "fake"
result_table_name = "fake1"
compile_language="JAVA"
compile_pattern="SOURCE_CODE"
source_code="""
import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
######
###### This config file is a demonstration of streaming processing in seatunnel config
######

env {
job.mode = "BATCH"
}

source {
FakeSource {
result_table_name = "fake"
row.num = 100
schema = {
fields {
id = "int"
name = "string"
}
}
}
}

transform {
DynamicCompile {
source_table_name = "fake"
result_table_name = "fake1"
compile_language="GROOVY"
compile_pattern="ABSOLUTE_PATH"
absolute_path="""/tmp/GroovyFile"""

}
}

sink {
Assert {
source_table_name = "fake1"
rules =
{
row_rules = [
{
rule_type = MIN_ROW
rule_value = 100
}
],
field_rules = [
{
field_name = id
field_type = int
field_value = [
{
rule_type = NOT_NULL
}
]
},
{
field_name = aa
field_type = string
field_value = [
{
rule_type = NOT_NULL
equals_to = "AA"

}

]
}
]
}
}
}
Loading

0 comments on commit 9df557c

Please sign in to comment.