diff --git a/docs/en/seatunnel-engine/rest-api-v1.md b/docs/en/seatunnel-engine/rest-api-v1.md
index f9b5f69aa7b..c99ce868781 100644
--- a/docs/en/seatunnel-engine/rest-api-v1.md
+++ b/docs/en/seatunnel-engine/rest-api-v1.md
@@ -259,6 +259,7 @@ This API has been deprecated, please use /hazelcast/rest/maps/job-info/:jobId in
     "SinkWriteQPS": "",
     "SinkWriteBytes": "",
     "SinkWriteBytesPerSeconds": "",
+    "TransformOutputCount": {},
     "TableSourceReceivedCount": {},
     "TableSourceReceivedBytes": {},
     "TableSourceReceivedBytesPerSeconds": {},
diff --git a/docs/en/seatunnel-engine/rest-api-v2.md b/docs/en/seatunnel-engine/rest-api-v2.md
index 72eb9044431..4939371ceda 100644
--- a/docs/en/seatunnel-engine/rest-api-v2.md
+++ b/docs/en/seatunnel-engine/rest-api-v2.md
@@ -165,6 +165,7 @@ seatunnel:
     "SinkWriteQPS": "",
     "SinkWriteBytes": "",
     "SinkWriteBytesPerSeconds": "",
+    "TransformOutputCount": {},
     "TableSourceReceivedCount": {},
     "TableSourceReceivedBytes": {},
     "TableSourceReceivedBytesPerSeconds": {},
@@ -245,6 +246,7 @@ This API has been deprecated, please use /job-info/:jobId instead
     "SinkWriteQPS": "",
     "SinkWriteBytes": "",
     "SinkWriteBytesPerSeconds": "",
+    "TransformOutputCount": {},
     "TableSourceReceivedCount": {},
     "TableSourceReceivedBytes": {},
     "TableSourceReceivedBytesPerSeconds": {},
diff --git a/docs/zh/seatunnel-engine/rest-api-v1.md b/docs/zh/seatunnel-engine/rest-api-v1.md
index f23021cdb94..919e9ad0848 100644
--- a/docs/zh/seatunnel-engine/rest-api-v1.md
+++ b/docs/zh/seatunnel-engine/rest-api-v1.md
@@ -191,6 +191,7 @@ network:
     "SinkWriteQPS": "",
     "SinkWriteBytes": "",
     "SinkWriteBytesPerSeconds": "",
+    "TransformOutputCount": {},
     "TableSourceReceivedCount": {},
     "TableSourceReceivedBytes": {},
     "TableSourceReceivedBytesPerSeconds": {},
diff --git a/docs/zh/seatunnel-engine/rest-api-v2.md b/docs/zh/seatunnel-engine/rest-api-v2.md
index ed3c6803c7a..59f7f31618b 100644
--- a/docs/zh/seatunnel-engine/rest-api-v2.md
+++ b/docs/zh/seatunnel-engine/rest-api-v2.md
@@ -161,6 +161,7 @@ seatunnel:
     "SinkWriteQPS": "",
     "SinkWriteBytes": "",
     "SinkWriteBytesPerSeconds": "",
+    "TransformOutputCount": {},
     "TableSourceReceivedCount": {},
     "TableSourceReceivedBytes": {},
     "TableSourceReceivedBytesPerSeconds": {},
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/MetricNames.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/MetricNames.java
index b1fc60e0f16..0d1c6116cb3 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/MetricNames.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/MetricNames.java
@@ -33,4 +33,5 @@ private MetricNames() {}
     public static final String SINK_WRITE_BYTES = "SinkWriteBytes";
     public static final String SINK_WRITE_QPS = "SinkWriteQPS";
     public static final String SINK_WRITE_BYTES_PER_SECONDS = "SinkWriteBytesPerSeconds";
+    public static final String TRANSFORM_OUTPUT_COUNT = "TransformOutputCount";
 }
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
index 615876c4173..1808ffeff84 100644
--- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
@@ -20,6 +20,8 @@
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import org.apache.seatunnel.api.common.JobContext;
+import org.apache.seatunnel.api.common.metrics.MetricNames;
+import org.apache.seatunnel.api.common.metrics.MetricsContext;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.configuration.util.ConfigValidator;
 import org.apache.seatunnel.api.table.factory.TableTransformFactory;
@@ -32,12 +34,16 @@ import org.apache.seatunnel.core.starter.execution.PluginUtil;
 import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery;
 import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelTransformPluginDiscovery;
+import org.apache.seatunnel.translation.flink.metric.FlinkMetricContext;
 
 import org.apache.commons.collections.CollectionUtils;
 
-import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.operators.StreamMap;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.util.Collector;
 
 import java.net.URL;
@@ -103,24 +109,27 @@ public List execute(List upstreamDataS
                                 LinkedHashMap::new));
         ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+        int index = 0;
         for (int i = 0; i < plugins.size(); i++) {
             try {
                 Config pluginConfig = pluginConfigs.get(i);
+                ReadonlyConfig options = ReadonlyConfig.fromConfig(pluginConfig);
                 DataStreamTableInfo stream =
                         fromSourceTable(pluginConfig, new ArrayList<>(outputTables.values()))
                                 .orElse(input);
                 TableTransformFactory factory = plugins.get(i);
                 TableTransformFactoryContext context =
                         new TableTransformFactoryContext(
-                                stream.getCatalogTables(),
-                                ReadonlyConfig.fromConfig(pluginConfig),
-                                classLoader);
+                                stream.getCatalogTables(), options, classLoader);
                 ConfigValidator.of(context.getOptions()).validate(factory.optionRule());
                 SeaTunnelTransform transform =
                         factory.createTransform(context).createTransform();
                 transform.setJobContext(jobContext);
+                String metricName =
+                        String.format("Transform[%s]-%s", transform.getPluginName(), index++);
+                String pluginOutput = options.get(PLUGIN_OUTPUT);
                 DataStream inputStream =
-                        flinkTransform(transform, stream.getDataStream());
+                        flinkTransform(transform, stream.getDataStream(), metricName, pluginOutput);
                 String pluginOutputIdentifier =
                         ReadonlyConfig.fromConfig(pluginConfig).get(PLUGIN_OUTPUT);
                 // TODO transform support multi tables
@@ -142,10 +151,14 @@ public List execute(List upstreamDataS
     }
 
     protected DataStream flinkTransform(
-            SeaTunnelTransform transform, DataStream stream) {
+            SeaTunnelTransform transform,
+            DataStream stream,
+            String metricName,
+            String pluginOutput) {
         if (transform instanceof SeaTunnelFlatMapTransform) {
             return stream.flatMap(
-                    new ArrayFlatMap(transform), TypeInformation.of(SeaTunnelRow.class));
+                    new ArrayFlatMap(transform, metricName, pluginOutput),
+                    TypeInformation.of(SeaTunnelRow.class));
         }
 
         return stream.transform(
@@ -155,20 +168,59 @@ protected DataStream flinkTransform(
                                 flinkRuntimeEnvironment
                                         .getStreamExecutionEnvironment()
                                         .clean(
-                                                row ->
-                                                        ((SeaTunnelMapTransform)
-                                                                        transform)
-                                                                .map(row))))
+                                                new FlinkRichMapFunction(
+                                                        transform, metricName, pluginOutput))))
                 // null value shouldn't be passed to downstream
                 .filter(Objects::nonNull);
     }
 
-    public static class ArrayFlatMap implements FlatMapFunction {
+    public static class FlinkRichMapFunction extends RichMapFunction {
+        private MetricsContext metricsContext;
+        private SeaTunnelTransform transform;
+        private final String metricName;
+        private final String pluginOutput;
+
+        public FlinkRichMapFunction(
+                SeaTunnelTransform transform, String metricName, String pluginOutput) {
+            this.transform = transform;
+            this.metricName = metricName;
+            this.pluginOutput = pluginOutput;
+        }
+
+        @Override
+        public void open(Configuration parameters) throws Exception {
+            metricsContext = new FlinkMetricContext((StreamingRuntimeContext) getRuntimeContext());
+        }
+        @Override
+        public SeaTunnelRow map(SeaTunnelRow row) throws Exception {
+            if (Objects.isNull(row)) {
+                return null;
+            }
+            SeaTunnelRow rowResult = ((SeaTunnelMapTransform) transform).map(row);
+            if (rowResult != null) {
+                String tableId = pluginOutput == null ? rowResult.getTableId() : pluginOutput;
+                updateMetric(metricName, tableId, metricsContext);
+            }
+            return rowResult;
+        }
+    }
+
+    public static class ArrayFlatMap extends RichFlatMapFunction {
+        private MetricsContext metricsContext;
         private SeaTunnelTransform transform;
+        private final String metricName;
+        private final String pluginOutput;
 
-        public ArrayFlatMap(SeaTunnelTransform transform) {
+        public ArrayFlatMap(SeaTunnelTransform transform, String metricName, String pluginOutput) {
             this.transform = transform;
+            this.metricName = metricName;
+            this.pluginOutput = pluginOutput;
+        }
+
+        @Override
+        public void open(Configuration parameters) throws Exception {
+            metricsContext = new FlinkMetricContext((StreamingRuntimeContext) getRuntimeContext());
         }
 
         @Override
@@ -177,9 +229,25 @@ public void flatMap(SeaTunnelRow row, Collector collector) {
                     ((SeaTunnelFlatMapTransform) transform).flatMap(row);
             if (CollectionUtils.isNotEmpty(rows)) {
                 for (SeaTunnelRow rowResult : rows) {
+                    String tableId = pluginOutput == null ? rowResult.getTableId() : pluginOutput;
+                    updateMetric(metricName, tableId, metricsContext);
                     collector.collect(rowResult);
                 }
             }
         }
     }
+
+    private static void updateMetric(
+            String metricName, String tableId, MetricsContext metricsContext) {
+        StringBuilder metricNameBuilder = new StringBuilder();
+        metricNameBuilder
+                .append(MetricNames.TRANSFORM_OUTPUT_COUNT)
+                .append("#")
+                .append(metricName)
+                .append("#")
+                .append(tableId);
+        if (metricsContext != null) {
+            metricsContext.counter(metricNameBuilder.toString()).inc();
+        }
+    }
 }
diff --git a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
index bb7fe67a874..549d2ad2224 100644
--- a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
+++ b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
@@ -43,6 +43,7 @@
 import org.apache.seatunnel.engine.core.job.JobStatus;
 import org.apache.seatunnel.engine.server.SeaTunnelNodeContext;
 
+import org.apache.commons.collections4.MapUtils;
 import org.apache.commons.lang3.StringUtils;
 
 import com.hazelcast.client.config.ClientConfig;
@@ -56,6 +57,8 @@
 import java.time.LocalDateTime;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
+import java.util.Objects;
 import java.util.Random;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -229,7 +232,8 @@ public void execute() throws CommandExecuteException {
         } finally {
             if (jobMetricsSummary != null) {
                 // print job statistics information when job finished
-                log.info(
+                StringBuilder logMessage = new StringBuilder();
+                logMessage.append(
                         StringFormatUtils.formatTable(
                                 "Job Statistic Information",
                                 "Start Time",
@@ -247,6 +251,28 @@ public void execute() throws CommandExecuteException {
                                 "Total Failed Count",
                                 jobMetricsSummary.getSourceReadCount()
                                         - jobMetricsSummary.getSinkWriteCount()));
+
+                if (MapUtils.isNotEmpty(jobMetricsSummary.getTransformMetricsMaps())) {
+                    jobMetricsSummary
+                            .getTransformMetricsMaps()
+                            .forEach(
+                                    (tableName, metrics) -> {
+                                        String[] transformInfos =
+                                                new String[metrics.entrySet().size() * 2 + 2];
+                                        transformInfos[0] = tableName + " Information";
+                                        int index = 0;
+                                        for (Map.Entry entry : metrics.entrySet()) {
+                                            transformInfos[++index] = entry.getKey();
+                                            transformInfos[++index] =
+                                                    String.valueOf(entry.getValue());
+                                        }
+                                        if (Objects.nonNull(transformInfos)) {
+                                            logMessage.append(
+                                                    StringFormatUtils.formatTable(transformInfos));
+                                        }
+                                    });
+                }
+                log.info("{}", logMessage);
             }
             closeClient();
         }
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/MultiTableMetricsIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/MultiTableMetricsIT.java
index c53b3174b19..5ef5e7f23ab 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/MultiTableMetricsIT.java
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/MultiTableMetricsIT.java
@@ -132,7 +132,25 @@ public void multiTableMetrics() {
                                     equalTo(String.valueOf(dataSize * 10)))
                             .body(
                                     "metrics.TableSinkWriteBytes.'fake.public.table2'",
-                                    equalTo(String.valueOf(dataSize * 5)));
+                                    equalTo(String.valueOf(dataSize * 5)))
+                            .body(
+                                    "metrics.TransformOutputCount.'Transform[0]-Sql'.'fake3'",
+                                    equalTo("10"))
+                            .body(
+                                    "metrics.TransformOutputCount.'Transform[1]-Sql'.'fake4'",
+                                    equalTo("10"))
+                            .body(
+                                    "metrics.TransformOutputCount.'Transform[2]-Sql'.'fake5'",
+                                    equalTo("10"))
+                            .body(
+                                    "metrics.TransformOutputCount.'Transform[3]-Sql'.'fake6'",
+                                    equalTo("5"))
+                            .body(
+                                    "metrics.TransformOutputCount.'Transform[4]-Sql'.'fake7'",
+                                    equalTo("5"))
+                            .body(
+                                    "metrics.TransformOutputCount.'Transform[5]-Sql'.'fake8'",
+                                    equalTo("5"));
 
                     Assertions.assertTrue(
                             Double.parseDouble(response.path("metrics.SourceReceivedQPS")) > 0
@@ -178,6 +196,35 @@ public void multiTableMetrics() {
                                     && Double.parseDouble(
                                                     response.path(
                                                             "metrics.TableSinkWriteBytesPerSeconds.'fake.public.table2'"))
+                                            > 0
+                                    && Double.parseDouble(
+                                                    response.path(
+                                                            "metrics.TransformOutputCount.'Transform[0]-Sql'.'fake3'"))
+                                            > 0
+                                    && Double.parseDouble(
+                                                    response.path(
+                                                            "metrics.TransformOutputCount.'Transform[1]-Sql'"
+                                                                    + ".'fake4'"))
+                                            > 0
+                                    && Double.parseDouble(
+                                                    response.path(
+                                                            "metrics.TransformOutputCount.'Transform[2]-Sql'"
+                                                                    + ".'fake5'"))
+                                            > 0
+                                    && Double.parseDouble(
+                                                    response.path(
+                                                            "metrics.TransformOutputCount.'Transform[3]-Sql'"
+                                                                    + ".'fake6'"))
+                                            > 0
+                                    && Double.parseDouble(
+                                                    response.path(
+                                                            "metrics.TransformOutputCount.'Transform[4]-Sql'"
+                                                                    + ".'fake7'"))
+                                            > 0
+                                    && Double.parseDouble(
+                                                    response.path(
+                                                            "metrics.TransformOutputCount.'Transform[5]-Sql'"
+                                                                    + ".'fake8'"))
                                             > 0);
                 });
     }
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fake_multi_table_to_console.conf b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fake_multi_table_to_console.conf
index d453bf9ed62..dc3bd95fd3f 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fake_multi_table_to_console.conf
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fake_multi_table_to_console.conf
@@ -116,13 +116,44 @@ source {
 }
 
 transform {
+  Sql {
+    plugin_input = "fake1"
+    plugin_output = "fake3"
+    query = "select * from fake1"
+  }
+  Sql {
+    plugin_input = "fake3"
+    plugin_output = "fake4"
+    query = "select * from fake3"
+  }
+  Sql {
+    plugin_input = "fake4"
+    plugin_output = "fake5"
+    query = "select * from fake4"
+  }
+  Sql {
+    plugin_input = "fake2"
+    plugin_output = "fake6"
+    query = "select * from fake2"
+  }
+  Sql {
+    plugin_input = "fake6"
+    plugin_output = "fake7"
+    query = "select * from fake6"
+  }
+  Sql {
+    plugin_input = "fake7"
+    plugin_output = "fake8"
+    query = "select * from fake7"
+  }
+
 }
 
 sink {
   console {
-    plugin_input = "fake1"
+    plugin_input = "fake5"
   }
   console {
-    plugin_input = "fake2"
+    plugin_input = "fake8"
   }
-}
+}
\ No newline at end of file
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobClient.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobClient.java
index 2421829ea9e..0306a069064 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobClient.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobClient.java
@@ -22,6 +22,7 @@
 import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
 
+import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.engine.client.SeaTunnelHazelcastClient;
 import org.apache.seatunnel.engine.client.util.ContentFormatUtil;
 import org.apache.seatunnel.engine.common.Constant;
@@ -41,9 +42,19 @@
 import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelListJobStatusCodec;
 import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelSavePointJobCodec;
 
+import org.apache.commons.collections4.MapUtils;
+import org.apache.commons.collections4.map.HashedMap;
+
 import lombok.NonNull;
 
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import static org.apache.seatunnel.api.common.metrics.MetricNames.TRANSFORM_OUTPUT_COUNT;
 
 public class JobClient {
     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
@@ -154,6 +165,7 @@ public JobDAGInfo getJobInfo(Long jobId) {
     public JobMetricsRunner.JobMetricsSummary getJobMetricsSummary(Long jobId) {
         long sourceReadCount = 0L;
         long sinkWriteCount = 0L;
+        Map> transformCountMap = new HashMap<>();
         String jobMetrics = getJobMetrics(jobId);
         try {
             JsonNode jsonNode = OBJECT_MAPPER.readTree(jobMetrics);
@@ -165,11 +177,98 @@ public JobMetricsRunner.JobMetricsSummary getJobMetricsSummary(Long jobId) {
                 sourceReadCount += sourceReader.get("value").asLong();
                 sinkWriteCount += sinkWriter.get("value").asLong();
             }
-            return new JobMetricsRunner.JobMetricsSummary(sourceReadCount, sinkWriteCount);
+            transformCountMap = extractTransformKeys(jsonNode);
+            return new JobMetricsRunner.JobMetricsSummary(
+                    sourceReadCount, sinkWriteCount, transformCountMap);
             // Add NullPointerException because of metrics information can be empty like {}
         } catch (JsonProcessingException | NullPointerException e) {
-            return new JobMetricsRunner.JobMetricsSummary(sourceReadCount, sinkWriteCount);
+            return new JobMetricsRunner.JobMetricsSummary(
+                    sourceReadCount, sinkWriteCount, transformCountMap);
+        }
+    }
+
+    private Map> extractTransformKeys(JsonNode rootNode) {
+        Map> transformCountMap = new TreeMap<>();
+        Map>> transformMetricsMaps = new HashedMap();
+        rootNode.fieldNames()
+                .forEachRemaining(
+                        metricName -> {
+                            if (metricName.contains("Transform")) {
+                                processTransformMetric(transformMetricsMaps, metricName, rootNode);
+                            }
+                        });
+        transformMetricsMaps.forEach(
+                (metricName, metricMap) -> {
+                    metricMap.forEach(
+                            (tableName, pathMap) -> {
+                                transformCountMap.put(tableName, aggregateTreeMap(pathMap, false));
+                            });
+                });
+        return transformCountMap;
+    }
+
+    private Map aggregateTreeMap(Map inputMap, boolean isRate) {
+        return isRate
+                ? inputMap.entrySet().stream()
+                        .collect(
+                                Collectors.toMap(
+                                        Map.Entry::getKey,
+                                        entry ->
+                                                StreamSupport.stream(
+                                                                entry.getValue().spliterator(),
+                                                                false)
+                                                        .mapToDouble(
+                                                                node ->
+                                                                        node.path("value")
+                                                                                .asDouble())
+                                                        .sum(),
+                                        (v1, v2) -> v1,
+                                        TreeMap::new))
+                : inputMap.entrySet().stream()
+                        .collect(
+                                Collectors.toMap(
+                                        Map.Entry::getKey,
+                                        entry ->
+                                                StreamSupport.stream(
+                                                                entry.getValue().spliterator(),
+                                                                false)
+                                                        .mapToLong(
+                                                                node -> node.path("value").asLong())
+                                                        .sum(),
+                                        (v1, v2) -> v1,
+                                        TreeMap::new));
+    }
+
+    private void processTransformMetric(
+            Map>> transformMetricsMaps,
+            String metricName,
+            JsonNode jobMetricsStr) {
+        if (metricName.contains(TRANSFORM_OUTPUT_COUNT)) {
+            processTransformMetric(
+                    transformMetricsMaps, TRANSFORM_OUTPUT_COUNT, metricName, jobMetricsStr);
+        }
+    }
+
+    private void processTransformMetric(
+            Map>> transformMetricsMaps,
+            String key,
+            String metricName,
+            JsonNode jobMetricsStr) {
+        Map> transformMetricsMap = transformMetricsMaps.get(key);
+        if (MapUtils.isEmpty(transformMetricsMap)) {
+            transformMetricsMap = new TreeMap<>();
+            transformMetricsMaps.put(key, transformMetricsMap);
+        }
+        String tableName = TablePath.of(metricName.split("#")[1]).getFullName();
+        String path = metricName.split("#")[2];
+        Map pathMap;
+        if (transformMetricsMap.containsKey(tableName)) {
+            pathMap = transformMetricsMap.get(tableName);
+        } else {
+            pathMap = new TreeMap<>();
         }
+        pathMap.put(path, jobMetricsStr.get(metricName));
+        transformMetricsMap.put(tableName, pathMap);
     }
 
     public List getCheckpointData(Long jobId) {
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobMetricsRunner.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobMetricsRunner.java
index f413a5bfc52..829f4c1160c 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobMetricsRunner.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobMetricsRunner.java
@@ -21,12 +21,17 @@
 import org.apache.seatunnel.common.utils.StringFormatUtils;
 import org.apache.seatunnel.engine.client.SeaTunnelClient;
 
+import org.apache.commons.collections4.MapUtils;
+
 import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
 
 import java.time.Duration;
 import java.time.LocalDateTime;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
 
 @Slf4j
 public class JobMetricsRunner implements Runnable {
@@ -35,6 +40,7 @@ public class JobMetricsRunner implements Runnable {
     private LocalDateTime lastRunTime = LocalDateTime.now();
     private Long lastReadCount = 0L;
     private Long lastWriteCount = 0L;
+    private Map transformCountMap = new HashMap<>();
 
     public JobMetricsRunner(SeaTunnelClient seaTunnelClient, Long jobId) {
         this.seaTunnelClient = seaTunnelClient;
@@ -50,7 +56,8 @@ public void run() {
             long seconds = Duration.between(lastRunTime, now).getSeconds();
             long averageRead = (jobMetricsSummary.getSourceReadCount() - lastReadCount) / seconds;
             long averageWrite = (jobMetricsSummary.getSinkWriteCount() - lastWriteCount) / seconds;
-            log.info(
+            StringBuilder logMessage = new StringBuilder();
+            logMessage.append(
                     StringFormatUtils.formatTable(
                             "Job Progress Information",
                             "Job Id",
@@ -69,6 +76,28 @@ public void run() {
                             "Current Statistic Time",
                             DateTimeUtils.toString(
                                     now, DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS)));
+
+            if (MapUtils.isNotEmpty(jobMetricsSummary.getTransformMetricsMaps())) {
+                jobMetricsSummary
+                        .getTransformMetricsMaps()
+                        .forEach(
+                                (tableName, metrics) -> {
+                                    String[] transformInfos =
+                                            new String[metrics.entrySet().size() * 2 + 1];
+                                    transformInfos[0] = tableName + "Transform";
+                                    int index = 0;
+                                    for (Map.Entry entry : metrics.entrySet()) {
+                                        transformInfos[++index] = entry.getKey();
+                                        transformInfos[++index] = String.valueOf(entry.getValue());
+                                    }
+                                    if (Objects.nonNull(transformInfos)) {
+                                        logMessage.append(
+                                                StringFormatUtils.formatTable(transformInfos));
+                                    }
+                                });
+            }
+
+            log.info("{}", logMessage);
             lastRunTime = now;
             lastReadCount = jobMetricsSummary.getSourceReadCount();
             lastWriteCount = jobMetricsSummary.getSinkWriteCount();
@@ -82,5 +111,6 @@ public void run() {
     public static class JobMetricsSummary {
         private long sourceReadCount;
         private long sinkWriteCount;
+        private Map> transformMetricsMaps;
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/TransformAction.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/TransformAction.java
index 2d594eb0d62..59f53b627e5 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/TransformAction.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/TransformAction.java
@@ -28,6 +28,7 @@
 public class TransformAction extends AbstractAction {
 
     private final SeaTunnelTransform transform;
+    private final String pluginOutput;
 
     public TransformAction(
             long id,
@@ -35,9 +36,11 @@ public TransformAction(
             @NonNull List upstreams,
             @NonNull SeaTunnelTransform transform,
             @NonNull Set jarUrls,
-            @NonNull Set connectorJarIdentifiers) {
+            @NonNull Set connectorJarIdentifiers,
+            String pluginOutput) {
         super(id, name, upstreams, jarUrls, connectorJarIdentifiers);
         this.transform = transform;
+        this.pluginOutput = pluginOutput;
     }
 
     public TransformAction(
@@ -45,12 +48,18 @@ public TransformAction(
             @NonNull String name,
             @NonNull SeaTunnelTransform transform,
             @NonNull Set jarUrls,
-            @NonNull Set connectorJarIdentifiers) {
+            @NonNull Set connectorJarIdentifiers,
+            String pluginOutput) {
         super(id, name, jarUrls, connectorJarIdentifiers);
         this.transform = transform;
+        this.pluginOutput = pluginOutput;
     }
 
     public SeaTunnelTransform getTransform() {
         return transform;
     }
+
+    public String getPluginOutput() {
+        return pluginOutput;
+    }
 }
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/TransformChainAction.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/TransformChainAction.java
index ae3c64dcef1..747a7e38af1 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/TransformChainAction.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/TransformChainAction.java
@@ -30,6 +30,8 @@ public class TransformChainAction extends AbstractAction {
     private static final long serialVersionUID = -340174711145367535L;
 
     private final List> transforms;
+    private final List pluginOutputs;
+    private final List transformNames;
 
     public TransformChainAction(
             long id,
@@ -37,9 +39,13 @@ public TransformChainAction(
             @NonNull List upstreams,
             @NonNull Set jarUrls,
             @NonNull Set connectorJarIdentifiers,
-            @NonNull List> transforms) {
+            @NonNull List> transforms,
+            List pluginOutputs,
+            List transformNames) {
         super(id, name, upstreams, jarUrls, connectorJarIdentifiers);
         this.transforms = transforms;
+        this.pluginOutputs = pluginOutputs;
+        this.transformNames = transformNames;
     }
 
     public TransformChainAction(
@@ -47,12 +53,24 @@ public TransformChainAction(
             @NonNull String name,
             @NonNull Set jarUrls,
             @NonNull Set connectorJarIdentifiers,
-            @NonNull List> transforms) {
+            @NonNull List> transforms,
+            List pluginOutputs,
+            List transformNames) {
         super(id, name, jarUrls, connectorJarIdentifiers);
         this.transforms = transforms;
+        this.pluginOutputs = pluginOutputs;
+        this.transformNames = transformNames;
     }
 
     public List> getTransforms() {
         return transforms;
     }
+
+    public List getPluginOutputs() {
+        return pluginOutputs;
+    }
+
+    public List getTransformNames() {
+        return transformNames;
+    }
 }
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
index 903db02f804..89c87265256 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
@@ -471,7 +471,8 @@ private void parseTransform(
                         new ArrayList<>(inputActions),
                         transform,
                         jarUrls,
-                        new HashSet<>());
+                        new HashSet<>(),
+                        readonlyConfig.get(CommonOptions.PLUGIN_OUTPUT));
         transformAction.setParallelism(parallelism);
 
         List> actions = new ArrayList<>();
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
index 3755477f45a..f2d01a0d238 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
@@ -125,7 +125,8 @@ public static Action recreateAction(Action action, Long id, int parallelism) {
                             action.getName(),
                             ((TransformAction) action).getTransform(),
                             action.getJarUrls(),
-                            action.getConnectorJarIdentifiers());
+                            action.getConnectorJarIdentifiers(),
+                            ((TransformAction) action).getPluginOutput());
         } else if (action instanceof TransformChainAction) {
             newAction =
                     new TransformChainAction(
@@ -133,7 +134,9 @@ public static Action recreateAction(Action action, Long id, int parallelism) {
                             action.getName(),
                             action.getJarUrls(),
                             action.getConnectorJarIdentifiers(),
-                            ((TransformChainAction) action).getTransforms());
+                            ((TransformChainAction) action).getTransforms(),
+                            ((TransformChainAction) action).getPluginOutputs(),
+                            ((TransformChainAction) action).getTransformNames());
         } else {
             throw new UnknownActionException(action);
         }
@@ -368,6 +371,7 @@ private void fillChainedTransformExecutionVertex(
         if (transformChainedVertices.size() > 0) {
             long newVertexId = idGenerator.getNextId();
             List transforms = new ArrayList<>(transformChainedVertices.size());
+            List pluginOutputs = new ArrayList<>(transformChainedVertices.size());
             List names = new ArrayList<>(transformChainedVertices.size());
             Set jars = new HashSet<>();
             Set identifiers = new HashSet<>();
@@ -382,15 +386,23 @@ private void fillChainedTransformExecutionVertex(
                     .forEach(
                             action -> {
                                 transforms.add(action.getTransform());
+                                pluginOutputs.add(action.getPluginOutput());
                                 jars.addAll(action.getJarUrls());
                                 identifiers.addAll(action.getConnectorJarIdentifiers());
                                 names.add(action.getName());
                             });
             String transformChainActionName =
                     String.format("TransformChain[%s]", String.join("->", names));
+            List transformNames = names;
             TransformChainAction transformChainAction =
                     new TransformChainAction(
-                            newVertexId, transformChainActionName, jars, identifiers, transforms);
+                            newVertexId,
+                            transformChainActionName,
+                            jars,
+                            identifiers,
+                            transforms,
+                            pluginOutputs,
+                            transformNames);
             transformChainAction.setParallelism(currentVertex.getAction().getParallelism());
 
             ExecutionVertex executionVertex =
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseService.java
index fc99feb2649..f02a97b4380 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseService.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseService.java
@@ -51,6 +51,8 @@
 import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;
 import org.apache.seatunnel.engine.server.utils.RestUtil;
 
+import org.apache.commons.collections4.MapUtils;
+import org.apache.commons.collections4.map.HashedMap;
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.commons.lang3.StringUtils;
 
@@ -73,6 +75,7 @@
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.TreeMap;
 import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -86,6 +89,7 @@
 import static org.apache.seatunnel.api.common.metrics.MetricNames.SOURCE_RECEIVED_BYTES_PER_SECONDS;
 import static org.apache.seatunnel.api.common.metrics.MetricNames.SOURCE_RECEIVED_COUNT;
 import static org.apache.seatunnel.api.common.metrics.MetricNames.SOURCE_RECEIVED_QPS;
+import static org.apache.seatunnel.api.common.metrics.MetricNames.TRANSFORM_OUTPUT_COUNT;
 import static org.apache.seatunnel.engine.server.rest.RestConstant.TABLE_SINK_WRITE_BYTES;
 import static org.apache.seatunnel.engine.server.rest.RestConstant.TABLE_SINK_WRITE_BYTES_PER_SECONDS;
 import static org.apache.seatunnel.engine.server.rest.RestConstant.TABLE_SINK_WRITE_COUNT;
@@ -268,7 +272,7 @@ private Map getJobMetrics(String jobMetrics) {
                     new HashMap<>(), // Source Received Bytes Per Second
                     new HashMap<>() // Sink Write Bytes Per Second
                 };
-
+        Map>> transformMetricsMaps = new HashedMap();
         try {
             JsonNode jobMetricsStr = new ObjectMapper().readTree(jobMetrics);
@@ -276,6 +280,11 @@ private Map getJobMetrics(String jobMetrics) {
                     .fieldNames()
                     .forEachRemaining(
                             metricName -> {
+                                if (metricName.contains("Transform")) {
+                                    processTransformMetric(
+                                            transformMetricsMaps, metricName, jobMetricsStr);
+                                    return;
+                                }
                                 if (metricName.contains("#")) {
                                     String tableName =
                                             TablePath.of(metricName.split("#")[1]).getFullName();
@@ -308,9 +317,53 @@ private Map getJobMetrics(String jobMetrics) {
                         ArrayUtils.addAll(countMetricsNames, rateMetricsNames),
                         metricsSums.length);
 
+        transformMetricsMaps.forEach(
+                (metricName, metricMap) -> {
+                    Map transformMetricsMap = new TreeMap<>();
+                    metricMap.forEach(
+                            (tableName, pathMap) -> {
+                                transformMetricsMap.put(
+                                        tableName, aggregateTreeMap(pathMap, false));
+                            });
+
+                    metricsMap.put(metricName, transformMetricsMap);
+                });
+
         return metricsMap;
     }
 
+    private void processTransformMetric(
+            Map>> transformMetricsMaps,
+            String metricName,
+            JsonNode jobMetricsStr) {
+        if (metricName.contains(TRANSFORM_OUTPUT_COUNT)) {
+            processTransformMetric(
+                    transformMetricsMaps, TRANSFORM_OUTPUT_COUNT, metricName, jobMetricsStr);
+        }
+    }
+
+    private void processTransformMetric(
+            Map>> transformMetricsMaps,
+            String key,
+            String metricName,
+            JsonNode jobMetricsStr) {
+        Map> transformMetricsMap = transformMetricsMaps.get(key);
+        if (MapUtils.isEmpty(transformMetricsMap)) {
+            transformMetricsMap = new TreeMap<>();
+            transformMetricsMaps.put(key, transformMetricsMap);
+        }
+        String tableName = TablePath.of(metricName.split("#")[1]).getFullName();
+        String path = metricName.split("#")[2];
+        Map pathMap;
+        if (transformMetricsMap.containsKey(tableName)) {
+            pathMap = transformMetricsMap.get(tableName);
+        } else {
+            pathMap = new TreeMap<>();
+        }
+        pathMap.put(path, jobMetricsStr.get(metricName));
+        transformMetricsMap.put(tableName, pathMap);
+    }
+
     private void processMetric(
             String metricName,
             String tableName,
@@ -415,6 +468,38 @@ private Map aggregateMap(Map inputMap, boolean
                                                         .sum()));
     }
 
+    private Map aggregateTreeMap(Map inputMap, boolean isRate) {
+        return isRate
+                ? inputMap.entrySet().stream()
+                        .collect(
+                                Collectors.toMap(
+                                        Map.Entry::getKey,
+                                        entry ->
+                                                StreamSupport.stream(
+                                                                entry.getValue().spliterator(),
+                                                                false)
+                                                        .mapToDouble(
+                                                                node ->
+                                                                        node.path("value")
+                                                                                .asDouble())
+                                                        .sum(),
+                                        (v1, v2) -> v1,
+                                        TreeMap::new))
+                : inputMap.entrySet().stream()
+                        .collect(
+                                Collectors.toMap(
+                                        Map.Entry::getKey,
+                                        entry ->
+                                                StreamSupport.stream(
+                                                                entry.getValue().spliterator(),
+                                                                false)
+                                                        .mapToLong(
+                                                                node -> node.path("value").asLong())
+                                                        .sum(),
+                                        (v1, v2) -> v1,
+                                        TreeMap::new));
+    }
+
     private JsonObject metricsToJsonObject(Map jobMetrics) {
         JsonObject members = new JsonObject();
         jobMetrics.forEach(
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
index e04e12d69f8..27b93d52c2e 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
@@ -237,7 +237,8 @@ private FlowLifeCycle convertFlowToActionLifeCycle(@NonNull Flow flow) throws Ex
                             (TransformChainAction) f.getAction(),
                             this,
                             new SeaTunnelTransformCollector(flowLifeCycles),
-                            completableFuture);
+                            completableFuture,
+                            this.getMetricsContext());
         } else if (f.getAction() instanceof ShuffleAction) {
             ShuffleAction shuffleAction = (ShuffleAction) f.getAction();
             HazelcastInstance hazelcastInstance = getExecutionContext().getInstance();
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/TransformFlowLifeCycle.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/TransformFlowLifeCycle.java
index 8eaf1963585..afa43ece7f2 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/TransformFlowLifeCycle.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/TransformFlowLifeCycle.java
@@ -17,8 +17,11 @@
 
 package org.apache.seatunnel.engine.server.task.flow;
 
+import org.apache.seatunnel.api.common.metrics.MetricNames;
+import org.apache.seatunnel.api.common.metrics.MetricsContext;
 import org.apache.seatunnel.api.table.schema.event.SchemaChangeEvent;
 import org.apache.seatunnel.api.table.type.Record;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.transform.Collector;
 import org.apache.seatunnel.api.transform.SeaTunnelFlatMapTransform;
 import org.apache.seatunnel.api.transform.SeaTunnelMapTransform;
@@ -48,17 +51,27 @@ public class TransformFlowLifeCycle extends ActionFlowLifeCycle
 
     private final List> transform;
 
+    private final List pluginOutputList;
+
+    private final List transformNames;
+
     private final Collector> collector;
 
+    private MetricsContext metricsContext;
+
     public TransformFlowLifeCycle(
             TransformChainAction action,
             SeaTunnelTask runningTask,
             Collector> collector,
-            CompletableFuture completableFuture) {
+            CompletableFuture completableFuture,
+            MetricsContext metricsContext) {
         super(action, runningTask, completableFuture);
         this.action = action;
         this.transform = action.getTransforms();
+        this.pluginOutputList = action.getPluginOutputs();
+        this.transformNames = action.getTransformNames();
        this.collector = collector;
+        this.metricsContext = metricsContext;
     }
 
     @Override
@@ -136,8 +149,11 @@ public List transform(T inputData) {
         List dataList = new ArrayList<>();
         dataList.add(inputData);
-
+        int index = 0;
         for (SeaTunnelTransform transformer : transform) {
+            String pluginOutput = pluginOutputList.get(index);
+            String metricName = transformNames.get(index);
+            index++;
             List nextInputDataList = new ArrayList<>();
             if (transformer instanceof SeaTunnelFlatMapTransform) {
                 SeaTunnelFlatMapTransform transformDecorator =
@@ -151,6 +167,15 @@ public List transform(T inputData) {
                                 outputDataArray);
                 if (CollectionUtils.isNotEmpty(outputDataArray)) {
                     nextInputDataList.addAll(outputDataArray);
+                    for (T outputData : outputDataArray) {
+                        if (outputData instanceof SeaTunnelRow) {
+                            String tableId =
+                                    pluginOutput == null
+                                            ? ((SeaTunnelRow) outputData).getTableId()
+                                            : pluginOutput;
+                            updateMetric(metricName, tableId);
+                        }
+                    }
                 }
             }
         } else if (transformer instanceof SeaTunnelMapTransform) {
@@ -168,19 +193,23 @@ public List transform(T inputData) {
                     continue;
                 }
                 nextInputDataList.add(outputData);
+                if (outputData instanceof SeaTunnelRow) {
+                    String tableId =
+                            pluginOutput == null
+                                    ? ((SeaTunnelRow) outputData).getTableId()
+                                    : pluginOutput;
+                    updateMetric(metricName, tableId);
+                }
             }
         }
             dataList = nextInputDataList;
         }
-
         return dataList;
     }
 
     @Override
-    public void restoreState(List actionStateList) throws Exception {
-        // nothing
-    }
+    public void restoreState(List actionStateList) throws Exception {}
 
     @Override
     public void close() throws IOException {
@@ -197,4 +226,17 @@ public void close() throws IOException {
         }
         super.close();
     }
+
+    private void updateMetric(String metricName, String tableId) {
+        StringBuilder metricNameBuilder = new StringBuilder();
+        metricNameBuilder
+                .append(MetricNames.TRANSFORM_OUTPUT_COUNT)
+                .append("#")
+                .append(metricName)
+                .append("#")
+                .append(tableId);
+        if (metricsContext != null) {
+            metricsContext.counter(metricNameBuilder.toString()).inc();
+        }
+    }
 }
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/metric/FlinkJobMetricsSummary.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/metric/FlinkJobMetricsSummary.java
index 556f028ecf0..a5209308793 100644
--- a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/metric/FlinkJobMetricsSummary.java
+++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/metric/FlinkJobMetricsSummary.java
@@ -21,10 +21,17 @@
 import org.apache.seatunnel.common.utils.DateTimeUtils;
 import org.apache.seatunnel.common.utils.StringFormatUtils;
 
+import org.apache.commons.collections.MapUtils;
 import org.apache.flink.api.common.JobExecutionResult;
 
 import java.time.Duration;
 import java.time.LocalDateTime;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.TreeMap;
+
+import static org.apache.seatunnel.api.common.metrics.MetricNames.TRANSFORM_OUTPUT_COUNT;
 
 public final class FlinkJobMetricsSummary {
@@ -82,25 +89,70 @@ public FlinkJobMetricsSummary build() {
 
     @Override
     public String toString() {
-        return StringFormatUtils.formatTable(
-                "Job Statistic Information",
-                "Start Time",
-                DateTimeUtils.toString(jobStartTime, DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS),
-                "End Time",
-                DateTimeUtils.toString(jobEndTime, DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS),
-                "Total Time(s)",
-                Duration.between(jobStartTime, jobEndTime).getSeconds(),
-                "Total Read Count",
-                jobExecutionResult
-                        .getAllAccumulatorResults()
-                        .get(MetricNames.SOURCE_RECEIVED_COUNT),
-                "Total Write Count",
-                jobExecutionResult.getAllAccumulatorResults().get(MetricNames.SINK_WRITE_COUNT),
-                "Total Read Bytes",
-                jobExecutionResult
-                        .getAllAccumulatorResults()
-                        .get(MetricNames.SOURCE_RECEIVED_BYTES),
-                "Total Write Bytes",
-                jobExecutionResult.getAllAccumulatorResults().get(MetricNames.SINK_WRITE_BYTES));
+        Map> transformCountMap = new TreeMap<>();
+
+        // Transform metrics
+        jobExecutionResult.getAllAccumulatorResults().entrySet().stream()
+                .filter(entry -> entry.getKey().contains(TRANSFORM_OUTPUT_COUNT))
+                .forEach(
+                        (metric) -> {
+                            String tableName = metric.getKey().split("#")[1];
+                            String path = metric.getKey().split("#")[2];
+                            if (transformCountMap.containsKey(tableName)) {
+                                Map metricMap = transformCountMap.get(tableName);
+                                metricMap.put(path, metric.getValue());
+                            } else {
+                                Map metricMap = new HashMap<>();
+                                metricMap.put(path, metric.getValue());
+                                transformCountMap.put(tableName, metricMap);
+                            }
+                        });
+
+        StringBuilder logMessage = new StringBuilder();
+        logMessage.append(
+                StringFormatUtils.formatTable(
+                        "Job Statistic Information",
+                        "Start Time",
+                        DateTimeUtils.toString(
+                                jobStartTime, DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS),
+                        "End Time",
+                        DateTimeUtils.toString(
+                                jobEndTime, DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS),
+                        "Total Time(s)",
+                        Duration.between(jobStartTime, jobEndTime).getSeconds(),
+                        "Total Read Count",
+                        jobExecutionResult
+                                .getAllAccumulatorResults()
+                                .get(MetricNames.SOURCE_RECEIVED_COUNT),
+                        "Total Write Count",
+                        jobExecutionResult
+                                .getAllAccumulatorResults()
+                                .get(MetricNames.SINK_WRITE_COUNT),
+                        "Total Read Bytes",
+                        jobExecutionResult
+                                .getAllAccumulatorResults()
+                                .get(MetricNames.SOURCE_RECEIVED_BYTES),
+                        "Total Write Bytes",
+                        jobExecutionResult
+                                .getAllAccumulatorResults()
+                                .get(MetricNames.SINK_WRITE_BYTES)));
+
+        if (MapUtils.isNotEmpty(transformCountMap)) {
+            transformCountMap.forEach(
+                    (tableName, metrics) -> {
+                        String[] transformInfos = new String[metrics.entrySet().size() * 2 + 2];
+                        transformInfos[0] = tableName + " Information";
+                        int index = 0;
+                        for (Map.Entry entry : metrics.entrySet()) {
+                            transformInfos[++index] = entry.getKey();
+                            transformInfos[++index] = String.valueOf(entry.getValue());
+                        }
+                        if (Objects.nonNull(transformInfos)) {
+                            logMessage.append(StringFormatUtils.formatTable(transformInfos));
+                        }
+                    });
+        }
+
+        return logMessage.toString();
     }
 }