[Feature][transform-v2] Support transform metrics #8173

Open · wants to merge 71 commits into base: dev

Commits (71)
d234c9c
[Feature][transform-v2] Support transform metrics
CosmosNi Nov 29, 2024
eac26d1
[Feature][transform-v2] Support transform metrics
CosmosNi Nov 29, 2024
d17353b
[Feature][transform-v2] Support transform metrics
CosmosNi Nov 29, 2024
7782507
[Feature][transform-v2] Support transform metrics
CosmosNi Nov 29, 2024
5a68e50
[Feature][transform-v2] Support transform metrics
CosmosNi Nov 29, 2024
11f4e3a
[Feature][transform-v2] Support transform metrics
CosmosNi Dec 2, 2024
6f69124
[Feature][transform-v2] Support transform metrics
CosmosNi Dec 2, 2024
d2b2ec7
[Feature][transform-v2] Support transform metrics
CosmosNi Dec 3, 2024
4f4de40
[Feature][transform-v2] Support transform metrics
CosmosNi Dec 3, 2024
ded8c18
Merge remote-tracking branch 'refs/remotes/upstream/dev' into feature…
CosmosNi Dec 3, 2024
048e522
[Feature][transform-v2] Support transform metrics
CosmosNi Dec 3, 2024
fc0a429
[Feature][transform-v2] Support transform metrics
CosmosNi Dec 3, 2024
9bdbc3a
[Feature][transform-v2] Support transform metrics
CosmosNi Dec 3, 2024
4c60cf3
Update Hive.md
CosmosNi Dec 3, 2024
ad5a696
[Feature][transform-v2] update `plugin_input` and `plugin_output`
CosmosNi Dec 4, 2024
05c8e16
Merge remote-tracking branch 'refs/remotes/upstream/dev' into feature…
CosmosNi Dec 5, 2024
0fe280d
[Feature][transform-v2] resolve kafka it execute timeout
CosmosNi Dec 5, 2024
f5d6387
Merge remote-tracking branch 'refs/remotes/upstream/dev' into feature…
CosmosNi Dec 6, 2024
2fd9a0f
[Feature][transform-v2] resolve kafka it execute timeout
CosmosNi Dec 6, 2024
8ce0afa
[Feature][transform-v2] TableRenameTransform support
CosmosNi Dec 6, 2024
23a9027
[Feature][transform-v2] TableRenameTransform support
CosmosNi Dec 6, 2024
c7654bf
[Feature][transform-v2] TableRenameTransform support
CosmosNi Dec 6, 2024
c2a0794
Merge remote-tracking branch 'refs/remotes/upstream/dev' into feature…
CosmosNi Dec 6, 2024
e0e6a03
[Feature][transform-v2] Support transform metrics
CosmosNi Dec 9, 2024
c687fce
[Feature][transform-v2] Support transform metrics
CosmosNi Dec 9, 2024
2c191f6
Merge remote-tracking branch 'refs/remotes/upstream/dev' into feature…
CosmosNi Dec 10, 2024
8f32bfc
[Feature][transform-v2] Support transform metrics
CosmosNi Dec 10, 2024
86fbe80
[Feature][transform-v2] Support transform metrics
CosmosNi Dec 10, 2024
d57e538
[Feature][transform-v2] Support transform metrics
CosmosNi Dec 11, 2024
64a1914
[Feature][transform-v2] Support transform metrics
CosmosNi Dec 11, 2024
2c1d0b9
[Feature][transform-v2] Support transform metrics
CosmosNi Dec 11, 2024
52219ab
[Feature][transform-v2] Support transform metrics
CosmosNi Dec 11, 2024
cd58795
[Feature][transform-v2] Support transform metrics
CosmosNi Dec 11, 2024
38f61be
[Feature][transform-v2] Support transform metrics
CosmosNi Dec 11, 2024
1f463df
Merge remote-tracking branch 'refs/remotes/upstream/dev' into feature…
CosmosNi Dec 12, 2024
31b09e4
Merge remote-tracking branch 'refs/remotes/upstream/dev' into feature…
CosmosNi Dec 13, 2024
6bad18e
[Feature][transform-v2] Support transform metrics
CosmosNi Dec 17, 2024
fcb4474
[Feature][transform-v2] Support transform metrics
CosmosNi Dec 18, 2024
3be6f7e
Merge branch 'dev' of github.com:apache/seatunnel into feature_transf…
CosmosNi Dec 23, 2024
bcb2527
Merge branch 'feature_transform_metrics' of github.com:CosmosNi/seatu…
CosmosNi Dec 23, 2024
2f9d60b
[Feature][transform-v2] Support transform metrics
CosmosNi Dec 24, 2024
ca865d2
Merge remote-tracking branch 'origin/feature_transform_metrics' into …
CosmosNi Dec 24, 2024
2982821
[Feature][transform-v2] Support transform metrics
CosmosNi Dec 27, 2024
6c1f354
[Feature][transform-v2] Support transform metrics
CosmosNi Jan 2, 2025
5c211fc
Merge remote-tracking branch 'upstream/dev' into feature_transform_me…
CosmosNi Jan 9, 2025
ed2ed5c
[Feature][transform-v2] Support transform metrics
CosmosNi Jan 10, 2025
cdb841c
Merge remote-tracking branch 'upstream/dev' into feature_transform_me…
CosmosNi Jan 11, 2025
1cf7286
[Feature][transform-v2] support transform metrics
CosmosNi Jan 11, 2025
3b064bb
[Feature][transform-v2] Support transform metrics
Jan 16, 2025
71a9828
[Feature][transform-v2] Support transform metrics
Jan 16, 2025
c80f847
[Feature][transform-v2] Support flink transform metrics
Jan 16, 2025
9e1be27
[Feature][transform-v2] Support flink transform metrics
Jan 16, 2025
f4edf09
[Feature][transform-v2] Support flink transform metrics
Jan 16, 2025
ce8df7b
[Feature][transform-v2] Support flink transform metrics
Jan 16, 2025
2c7c683
[Feature][transform-v2] Support flink transform metrics
Jan 16, 2025
6fd3d9d
[Feature][transform-v2] Support flink transform metrics
Jan 17, 2025
3ee00f4
Merge remote-tracking branch 'upstream/dev' into feature_transform_me…
Jan 20, 2025
807c44e
[Fix][doc] Repair doris dead link
Jan 20, 2025
30de2d4
[Feature][transform-v2] Support flink transform metrics
Jan 20, 2025
12af93e
[Feature][transform-v2] Support flink transform metrics
Jan 20, 2025
08b5370
Merge remote-tracking branch 'upstream/dev' into feature_transform_me…
Jan 22, 2025
76349c7
[Feature][transform-v2] Support flink transform metrics
Jan 22, 2025
60cd7fe
[Feature][transform-v2] Support flink transform metrics
Jan 22, 2025
633801e
[Feature][transform-v2] Support transform metrics
Jan 23, 2025
3a37d6e
[Feature][transform-v2] Support transform metrics
Jan 23, 2025
af224b1
[Feature][transform-v2] add metric doc
CosmosNi Jan 23, 2025
55fd5db
Merge remote-tracking branch 'upstream/dev' into feature_transform_me…
CosmosNi Jan 25, 2025
b23bc44
[Fix][doris-e2e] fix init be 127.0.0.1 connect refuse
CosmosNi Jan 26, 2025
094bba6
[Fix][doris-e2e] fix init be 127.0.0.1 connect refuse
CosmosNi Jan 26, 2025
c1c0feb
[Fix][doris-e2e] fix init be 127.0.0.1 connect refuse
CosmosNi Jan 26, 2025
e703f28
[Feature][transform-v2] Support transform metrics
CosmosNi Jan 26, 2025
1 change: 1 addition & 0 deletions docs/en/seatunnel-engine/rest-api-v1.md
@@ -259,6 +259,7 @@ This API has been deprecated, please use /hazelcast/rest/maps/job-info/:jobId in
"SinkWriteQPS": "",
"SinkWriteBytes": "",
"SinkWriteBytesPerSeconds": "",
"TransformOutputCount": {},
"TableSourceReceivedCount": {},
"TableSourceReceivedBytes": {},
"TableSourceReceivedBytesPerSeconds": {},
2 changes: 2 additions & 0 deletions docs/en/seatunnel-engine/rest-api-v2.md
@@ -165,6 +165,7 @@ seatunnel:
"SinkWriteQPS": "",
"SinkWriteBytes": "",
"SinkWriteBytesPerSeconds": "",
"TransformOutputCount": {},
"TableSourceReceivedCount": {},
"TableSourceReceivedBytes": {},
"TableSourceReceivedBytesPerSeconds": {},
@@ -245,6 +246,7 @@ This API has been deprecated, please use /job-info/:jobId instead
"SinkWriteQPS": "",
"SinkWriteBytes": "",
"SinkWriteBytesPerSeconds": "",
"TransformOutputCount": {},
"TableSourceReceivedCount": {},
"TableSourceReceivedBytes": {},
"TableSourceReceivedBytesPerSeconds": {},
1 change: 1 addition & 0 deletions docs/zh/seatunnel-engine/rest-api-v1.md
@@ -191,6 +191,7 @@ network:
"SinkWriteQPS": "",
"SinkWriteBytes": "",
"SinkWriteBytesPerSeconds": "",
"TransformOutputCount": {},
"TableSourceReceivedCount": {},
"TableSourceReceivedBytes": {},
"TableSourceReceivedBytesPerSeconds": {},
1 change: 1 addition & 0 deletions docs/zh/seatunnel-engine/rest-api-v2.md
@@ -161,6 +161,7 @@ seatunnel:
"SinkWriteQPS": "",
"SinkWriteBytes": "",
"SinkWriteBytesPerSeconds": "",
"TransformOutputCount": {},
"TableSourceReceivedCount": {},
"TableSourceReceivedBytes": {},
"TableSourceReceivedBytesPerSeconds": {},
@@ -33,4 +33,5 @@ private MetricNames() {}
public static final String SINK_WRITE_BYTES = "SinkWriteBytes";
public static final String SINK_WRITE_QPS = "SinkWriteQPS";
public static final String SINK_WRITE_BYTES_PER_SECONDS = "SinkWriteBytesPerSeconds";
public static final String TRANSFORM_OUTPUT_COUNT = "TransformOutputCount";
}
@@ -20,6 +20,8 @@
import org.apache.seatunnel.shade.com.typesafe.config.Config;

import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.common.metrics.MetricNames;
import org.apache.seatunnel.api.common.metrics.MetricsContext;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.ConfigValidator;
import org.apache.seatunnel.api.table.factory.TableTransformFactory;
@@ -32,12 +34,16 @@
import org.apache.seatunnel.core.starter.execution.PluginUtil;
import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery;
import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelTransformPluginDiscovery;
import org.apache.seatunnel.translation.flink.metric.FlinkMetricContext;

import org.apache.commons.collections.CollectionUtils;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.operators.StreamMap;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.util.Collector;

import java.net.URL;
@@ -103,24 +109,27 @@ public List<DataStreamTableInfo> execute(List<DataStreamTableInfo> upstreamDataS
LinkedHashMap::new));

ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
int index = 0;
for (int i = 0; i < plugins.size(); i++) {
try {
Config pluginConfig = pluginConfigs.get(i);
ReadonlyConfig options = ReadonlyConfig.fromConfig(pluginConfig);
DataStreamTableInfo stream =
fromSourceTable(pluginConfig, new ArrayList<>(outputTables.values()))
.orElse(input);
TableTransformFactory factory = plugins.get(i);
TableTransformFactoryContext context =
new TableTransformFactoryContext(
stream.getCatalogTables(),
ReadonlyConfig.fromConfig(pluginConfig),
classLoader);
stream.getCatalogTables(), options, classLoader);
ConfigValidator.of(context.getOptions()).validate(factory.optionRule());
SeaTunnelTransform transform = factory.createTransform(context).createTransform();

transform.setJobContext(jobContext);
String metricName =
String.format("Transform[%s]-%s", transform.getPluginName(), index++);
String pluginOutput = options.get(PLUGIN_OUTPUT);
DataStream<SeaTunnelRow> inputStream =
flinkTransform(transform, stream.getDataStream());
flinkTransform(transform, stream.getDataStream(), metricName, pluginOutput);
String pluginOutputIdentifier =
ReadonlyConfig.fromConfig(pluginConfig).get(PLUGIN_OUTPUT);
// TODO transform support multi tables
@@ -142,10 +151,14 @@ public List<DataStreamTableInfo> execute(List<DataStreamTableInfo> upstreamDataS
}

protected DataStream<SeaTunnelRow> flinkTransform(
SeaTunnelTransform transform, DataStream<SeaTunnelRow> stream) {
SeaTunnelTransform transform,
DataStream<SeaTunnelRow> stream,
String metricName,
String pluginOutput) {
if (transform instanceof SeaTunnelFlatMapTransform) {
return stream.flatMap(
new ArrayFlatMap(transform), TypeInformation.of(SeaTunnelRow.class));
new ArrayFlatMap(transform, metricName, pluginOutput),
TypeInformation.of(SeaTunnelRow.class));
}

return stream.transform(
@@ -155,20 +168,59 @@ protected DataStream<SeaTunnelRow> flinkTransform(
flinkRuntimeEnvironment
.getStreamExecutionEnvironment()
.clean(
row ->
((SeaTunnelMapTransform<SeaTunnelRow>)
transform)
.map(row))))
new FlinkRichMapFunction(
transform, metricName, pluginOutput))))
// null value shouldn't be passed to downstream
.filter(Objects::nonNull);
}

public static class ArrayFlatMap implements FlatMapFunction<SeaTunnelRow, SeaTunnelRow> {
public static class FlinkRichMapFunction extends RichMapFunction<SeaTunnelRow, SeaTunnelRow> {
private MetricsContext metricsContext;
private SeaTunnelTransform transform;
private final String metricName;
private final String pluginOutput;

public FlinkRichMapFunction(
SeaTunnelTransform transform, String metricName, String pluginOutput) {
this.transform = transform;
this.metricName = metricName;
this.pluginOutput = pluginOutput;
}

@Override
public void open(Configuration parameters) throws Exception {
metricsContext = new FlinkMetricContext((StreamingRuntimeContext) getRuntimeContext());
}

@Override
public SeaTunnelRow map(SeaTunnelRow row) throws Exception {
if (Objects.isNull(row)) {
return null;
}
SeaTunnelRow rowResult = ((SeaTunnelMapTransform<SeaTunnelRow>) transform).map(row);
if (rowResult != null) {
String tableId = pluginOutput == null ? rowResult.getTableId() : pluginOutput;
updateMetric(metricName, tableId, metricsContext);
}
return rowResult;
}
}

public static class ArrayFlatMap extends RichFlatMapFunction<SeaTunnelRow, SeaTunnelRow> {
private MetricsContext metricsContext;
private SeaTunnelTransform transform;
private final String metricName;
private final String pluginOutput;

public ArrayFlatMap(SeaTunnelTransform transform) {
public ArrayFlatMap(SeaTunnelTransform transform, String metricName, String pluginOutput) {
this.transform = transform;
this.metricName = metricName;
this.pluginOutput = pluginOutput;
}

@Override
public void open(Configuration parameters) throws Exception {
metricsContext = new FlinkMetricContext((StreamingRuntimeContext) getRuntimeContext());
}

@Override
@@ -177,9 +229,25 @@ public void flatMap(SeaTunnelRow row, Collector<SeaTunnelRow> collector) {
((SeaTunnelFlatMapTransform<SeaTunnelRow>) transform).flatMap(row);
if (CollectionUtils.isNotEmpty(rows)) {
for (SeaTunnelRow rowResult : rows) {
String tableId = pluginOutput == null ? rowResult.getTableId() : pluginOutput;
updateMetric(metricName, tableId, metricsContext);
collector.collect(rowResult);
}
}
}
}

private static void updateMetric(
String metricName, String tableId, MetricsContext metricsContext) {
StringBuilder metricNameBuilder = new StringBuilder();
metricNameBuilder
.append(MetricNames.TRANSFORM_OUTPUT_COUNT)
.append("#")
.append(metricName)
.append("#")
.append(tableId);
if (metricsContext != null) {
metricsContext.counter(metricNameBuilder.toString()).inc();
}
}
}
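The `updateMetric` helper above builds each counter key by joining the metric family, the per-instance transform name, and the output table id with `#`. A minimal standalone sketch of that key scheme (class and method names here are hypothetical; no Flink runtime or `MetricsContext` involved):

```java
// Sketch of the counter-key composition used by updateMetric above.
// Only the string scheme is reproduced.
public class TransformMetricKeySketch {
    static final String TRANSFORM_OUTPUT_COUNT = "TransformOutputCount";

    // family#Transform[pluginName]-index#tableId
    static String counterKey(String metricName, String tableId) {
        return TRANSFORM_OUTPUT_COUNT + "#" + metricName + "#" + tableId;
    }

    public static void main(String[] args) {
        // metricName as built in execute(): Transform[<plugin>]-<index>
        String metricName = String.format("Transform[%s]-%s", "Sql", 0);
        System.out.println(counterKey(metricName, "fake3"));
        // prints TransformOutputCount#Transform[Sql]-0#fake3
    }
}
```

In the diff, `tableId` falls back to the row's own table id only when `plugin_output` is not configured for the transform.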
@@ -43,6 +43,7 @@
import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.server.SeaTunnelNodeContext;

import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;

import com.hazelcast.client.config.ClientConfig;
@@ -56,6 +57,8 @@
import java.time.LocalDateTime;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -229,7 +232,8 @@ public void execute() throws CommandExecuteException {
} finally {
if (jobMetricsSummary != null) {
// print job statistics information when job finished
log.info(
StringBuilder logMessage = new StringBuilder();
logMessage.append(
StringFormatUtils.formatTable(
"Job Statistic Information",
"Start Time",
@@ -247,6 +251,28 @@
"Total Failed Count",
jobMetricsSummary.getSourceReadCount()
- jobMetricsSummary.getSinkWriteCount()));

if (MapUtils.isNotEmpty(jobMetricsSummary.getTransformMetricsMaps())) {
jobMetricsSummary
.getTransformMetricsMaps()
.forEach(
(tableName, metrics) -> {
String[] transformInfos =
new String[metrics.entrySet().size() * 2 + 2];
transformInfos[0] = tableName + " Information";
int index = 0;
for (Map.Entry<String, Object> entry : metrics.entrySet()) {
transformInfos[++index] = entry.getKey();
transformInfos[++index] =
String.valueOf(entry.getValue());
}
if (Objects.nonNull(transformInfos)) {
logMessage.append(
StringFormatUtils.formatTable(transformInfos));
}
});
}
log.info("{}", logMessage);
}
closeClient();
}
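The finally-block above flattens each transform's metrics map into the alternating title/key/value array that `StringFormatUtils.formatTable` consumes. A self-contained sketch of just that flattening step (the varargs layout is assumed from the diff; `formatTable` itself is left out):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch of the flattening in the finally-block: slot 0 is the table title,
// followed by alternating metric-name/value pairs. Note the size * 2 + 2
// sizing from the diff leaves one unused trailing slot (null).
public class TransformTableSketch {
    static String[] flatten(String tableName, Map<String, Object> metrics) {
        String[] infos = new String[metrics.size() * 2 + 2];
        infos[0] = tableName + " Information";
        int index = 0;
        for (Map.Entry<String, Object> entry : metrics.entrySet()) {
            infos[++index] = entry.getKey();
            infos[++index] = String.valueOf(entry.getValue());
        }
        return infos;
    }

    public static void main(String[] args) {
        Map<String, Object> metrics = new LinkedHashMap<>();
        metrics.put("TransformOutputCount", 10L);
        String[] infos = flatten("fake3", metrics);
        System.out.println(String.join(" | ", infos[0], infos[1], infos[2]));
        // prints fake3 Information | TransformOutputCount | 10
    }
}
```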
@@ -132,7 +132,25 @@ public void multiTableMetrics() {
equalTo(String.valueOf(dataSize * 10)))
.body(
"metrics.TableSinkWriteBytes.'fake.public.table2'",
equalTo(String.valueOf(dataSize * 5)));
equalTo(String.valueOf(dataSize * 5)))
.body(
"metrics.TransformOutputCount.'Transform[0]-Sql'.'fake3'",
equalTo("10"))
.body(
"metrics.TransformOutputCount.'Transform[1]-Sql'.'fake4'",
equalTo("10"))
.body(
"metrics.TransformOutputCount.'Transform[2]-Sql'.'fake5'",
equalTo("10"))
.body(
"metrics.TransformOutputCount.'Transform[3]-Sql'.'fake6'",
equalTo("5"))
.body(
"metrics.TransformOutputCount.'Transform[4]-Sql'.'fake7'",
equalTo("5"))
.body(
"metrics.TransformOutputCount.'Transform[5]-Sql'.'fake8'",
equalTo("5"));
Assertions.assertTrue(
Double.parseDouble(response.path("metrics.SourceReceivedQPS"))
> 0
@@ -178,6 +196,35 @@
&& Double.parseDouble(
response.path(
"metrics.TableSinkWriteBytesPerSeconds.'fake.public.table2'"))
> 0
&& Double.parseDouble(
response.path(
"metrics.TransformOutputCount.'Transform[0]-Sql'.'fake3'"))
> 0
&& Double.parseDouble(
response.path(
"metrics.TransformOutputCount.'Transform[1]-Sql'"
+ ".'fake4'"))
> 0
&& Double.parseDouble(
response.path(
"metrics.TransformOutputCount.'Transform[2]-Sql'"
+ ".'fake5'"))
> 0
&& Double.parseDouble(
response.path(
"metrics.TransformOutputCount.'Transform[3]-Sql'"
+ ".'fake6'"))
> 0
&& Double.parseDouble(
response.path(
"metrics.TransformOutputCount.'Transform[4]-Sql'"
+ ".'fake7'"))
> 0
&& Double.parseDouble(
response.path(
"metrics.TransformOutputCount.'Transform[5]-Sql'"
+ ".'fake8'"))
> 0);
});
}
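The REST assertions above read `TransformOutputCount` as a nested map: transform instance name → output table id → row count. A sketch of that response shape, with hypothetical counts matching two of the test's fake tables:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch of the nested TransformOutputCount shape the e2e assertions read:
// transform instance name -> table id -> row count. Values are illustrative.
public class TransformMetricsShape {
    public static Map<String, Map<String, Long>> sample() {
        Map<String, Map<String, Long>> out = new LinkedHashMap<>();
        Map<String, Long> sql0 = new LinkedHashMap<>();
        sql0.put("fake3", 10L); // 10 rows routed through the first Sql transform
        out.put("Transform[0]-Sql", sql0);
        Map<String, Long> sql3 = new LinkedHashMap<>();
        sql3.put("fake6", 5L); // 5 rows through the fourth Sql transform
        out.put("Transform[3]-Sql", sql3);
        return out;
    }

    public static void main(String[] args) {
        Map<String, Map<String, Long>> m = sample();
        System.out.println(m.get("Transform[0]-Sql").get("fake3"));
        // prints 10
    }
}
```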
@@ -116,13 +116,44 @@ source {
}

transform {
Sql {
plugin_input = "fake1"
plugin_output = "fake3"
query = "select * from fake1"
}
Sql {
plugin_input = "fake3"
plugin_output = "fake4"
query = "select * from fake3"
}
Sql {
plugin_input = "fake4"
plugin_output = "fake5"
query = "select * from fake4"
}
Sql {
plugin_input = "fake2"
plugin_output = "fake6"
query = "select * from fake2"
}
Sql {
plugin_input = "fake6"
plugin_output = "fake7"
query = "select * from fake6"
}
Sql {
plugin_input = "fake7"
plugin_output = "fake8"
query = "select * from fake7"
}

}

sink {
console {
plugin_input = "fake1"
plugin_input = "fake5"
}
console {
plugin_input = "fake2"
plugin_input = "fake8"
}
}
}