From d8629fc01d4583fae806956bdf861ae9134554e4 Mon Sep 17 00:00:00 2001 From: lightzhao Date: Fri, 24 Nov 2023 15:38:01 +0800 Subject: [PATCH 1/4] Change System.out.println to log output. --- .../file/writer/OrcReadStrategyTest.java | 9 ++++++--- .../file/writer/ParquetReadStrategyTest.java | 15 +++++++++------ .../jdbc/catalog/psql/PostgresCatalogTest.java | 5 ++++- .../catalog/StarRocksCreateTableTest.java | 5 ++++- .../core/starter/flink/FlinkStarter.java | 5 ++++- .../core/starter/flink/FlinkStarter.java | 5 ++++- .../core/starter/spark/SparkStarter.java | 5 ++++- .../core/starter/spark/SparkStarter.java | 5 ++++- .../seatunnel/command/ClientExecuteCommand.java | 8 ++++---- .../engine/e2e/ClusterFaultToleranceIT.java | 2 +- .../apache/seatunnel/engine/e2e/ClusterIT.java | 8 ++++---- .../engine/client/SeaTunnelClientTest.java | 10 ++++++---- .../apache/seatunnel/engine/client/TestUtils.java | 2 +- .../checkpoint/CheckpointSerializeTest.java | 7 +++++-- .../engine/server/master/JobMetricsTest.java | 6 ++++-- 15 files changed, 64 insertions(+), 33 deletions(-) diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/OrcReadStrategyTest.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/OrcReadStrategyTest.java index 56fbaae386a..4c97c783ce9 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/OrcReadStrategyTest.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/OrcReadStrategyTest.java @@ -29,6 +29,8 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import lombok.extern.slf4j.Slf4j; + import java.io.File; import java.net.URL; import java.nio.file.Paths; @@ -37,6 +39,7 @@ import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_DEFAULT; +@Slf4j public class OrcReadStrategyTest { @Test @@ -51,7 +54,7 @@ public void testOrcRead() throws Exception { SeaTunnelRowType seaTunnelRowTypeInfo = orcReadStrategy.getSeaTunnelRowTypeInfo(localConf, orcFilePath); Assertions.assertNotNull(seaTunnelRowTypeInfo); - System.out.println(seaTunnelRowTypeInfo); + log.info(seaTunnelRowTypeInfo.toString()); orcReadStrategy.read(orcFilePath, "", testCollector); for (SeaTunnelRow row : testCollector.getRows()) { Assertions.assertEquals(row.getField(0).getClass(), Boolean.class); @@ -77,7 +80,7 @@ public void testOrcReadProjection() throws Exception { SeaTunnelRowType seaTunnelRowTypeInfo = orcReadStrategy.getSeaTunnelRowTypeInfo(localConf, orcFilePath); Assertions.assertNotNull(seaTunnelRowTypeInfo); - System.out.println(seaTunnelRowTypeInfo); + log.info(seaTunnelRowTypeInfo.toString()); orcReadStrategy.read(orcFilePath, "", testCollector); for (SeaTunnelRow row : testCollector.getRows()) { Assertions.assertEquals(row.getField(0).getClass(), Byte.class); @@ -95,7 +98,7 @@ public List getRows() { @Override public void collect(SeaTunnelRow record) { - System.out.println(record); + log.info(record.toString()); rows.add(record); } diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetReadStrategyTest.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetReadStrategyTest.java index 1c36a914531..ffac80407b2 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetReadStrategyTest.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetReadStrategyTest.java @@ -29,6 +29,8 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import lombok.extern.slf4j.Slf4j; + import java.io.File; import java.net.URL; import java.nio.file.Paths; @@ -39,6 +41,7 @@ import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_DEFAULT; +@Slf4j public class ParquetReadStrategyTest { @Test public void testParquetRead1() throws Exception { @@ -51,7 +54,7 @@ public void testParquetRead1() throws Exception { SeaTunnelRowType seaTunnelRowTypeInfo = parquetReadStrategy.getSeaTunnelRowTypeInfo(localConf, path); Assertions.assertNotNull(seaTunnelRowTypeInfo); - System.out.println(seaTunnelRowTypeInfo); + log.info(seaTunnelRowTypeInfo.toString()); TestCollector testCollector = new TestCollector(); parquetReadStrategy.read(path, "", testCollector); } @@ -67,7 +70,7 @@ public void testParquetRead2() throws Exception { SeaTunnelRowType seaTunnelRowTypeInfo = parquetReadStrategy.getSeaTunnelRowTypeInfo(localConf, path); Assertions.assertNotNull(seaTunnelRowTypeInfo); - System.out.println(seaTunnelRowTypeInfo); + log.info(seaTunnelRowTypeInfo.toString()); TestCollector testCollector = new TestCollector(); parquetReadStrategy.read(path, "", testCollector); } @@ -83,7 +86,7 @@ public void testParquetReadUseSystemDefaultTimeZone() throws Exception { SeaTunnelRowType seaTunnelRowTypeInfo = parquetReadStrategy.getSeaTunnelRowTypeInfo(localConf, path); Assertions.assertNotNull(seaTunnelRowTypeInfo); - System.out.println(seaTunnelRowTypeInfo); + log.info(seaTunnelRowTypeInfo.toString()); int index = seaTunnelRowTypeInfo.indexOf("c_timestamp"); TimeZone tz1 = TimeZone.getTimeZone("Asia/Shanghai"); TimeZone.setDefault(tz1); @@ -119,7 +122,7 @@ public void testParquetReadProjection1() throws Exception { SeaTunnelRowType seaTunnelRowTypeInfo = parquetReadStrategy.getSeaTunnelRowTypeInfo(localConf, path); Assertions.assertNotNull(seaTunnelRowTypeInfo); - System.out.println(seaTunnelRowTypeInfo); + log.info(seaTunnelRowTypeInfo.toString()); TestCollector testCollector = new TestCollector(); parquetReadStrategy.read(path, "", testCollector); List rows = testCollector.getRows(); @@ -149,7 +152,7 @@ public void testParquetReadProjection2() throws Exception { SeaTunnelRowType seaTunnelRowTypeInfo = parquetReadStrategy.getSeaTunnelRowTypeInfo(localConf, path); Assertions.assertNotNull(seaTunnelRowTypeInfo); - System.out.println(seaTunnelRowTypeInfo); + log.info(seaTunnelRowTypeInfo.toString()); TestCollector testCollector = new TestCollector(); parquetReadStrategy.read(path, "", testCollector); } @@ -164,7 +167,7 @@ public List getRows() { @Override public void collect(SeaTunnelRow record) { - System.out.println(record); + log.info(record.toString()); rows.add(record); } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalogTest.java b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalogTest.java index 6ef4d9e6548..c04c1941b0b 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalogTest.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalogTest.java @@ -25,7 +25,10 @@ import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; +import lombok.extern.slf4j.Slf4j; + @Disabled("Please Test it in your local environment") +@Slf4j class PostgresCatalogTest { @Test @@ -51,7 +54,7 @@ void testCatalog() { CatalogTable table = catalog.getTable(TablePath.of("st_test", "public", "all_types_table_02")); - System.out.println("find table: " + table); + log.info("find table: " + table); catalog.createTable( new TablePath("liulitest", "public", "all_types_table_02"), table, false); diff --git a/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCreateTableTest.java b/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCreateTableTest.java index 6298921c2da..6a8be734e70 100644 --- a/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCreateTableTest.java +++ b/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCreateTableTest.java @@ -29,10 +29,13 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import lombok.extern.slf4j.Slf4j; + import java.util.ArrayList; import java.util.Arrays; import java.util.List; +@Slf4j public class StarRocksCreateTableTest { @Test @@ -71,7 +74,7 @@ public void test() { .columns(columns) .build()); - System.out.println(result); + log.info(result); } @Test diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java index 5dc1d32cef5..32cc16969f8 100644 --- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java +++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java @@ -23,11 +23,14 @@ import org.apache.seatunnel.core.starter.flink.args.FlinkCommandArgs; import org.apache.seatunnel.core.starter.utils.CommandLineUtils; +import lombok.extern.slf4j.Slf4j; + import java.util.ArrayList; import java.util.List; import java.util.Objects; /** The SeaTunnel flink starter, used to generate the final flink job execute command. */ +@Slf4j public class FlinkStarter implements Starter { private static final String APP_NAME = SeaTunnelFlink.class.getName(); public static final String APP_JAR_NAME = EngineType.FLINK13.getStarterJarName(); @@ -46,7 +49,7 @@ public class FlinkStarter implements Starter { public static void main(String[] args) { FlinkStarter flinkStarter = new FlinkStarter(args); - System.out.println(String.join(" ", flinkStarter.buildCommands())); + log.info(String.join(" ", flinkStarter.buildCommands())); } @Override diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java index 7373cb58ed5..4381cd6e410 100644 --- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java +++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java @@ -23,11 +23,14 @@ import org.apache.seatunnel.core.starter.flink.args.FlinkCommandArgs; import org.apache.seatunnel.core.starter.utils.CommandLineUtils; +import lombok.extern.slf4j.Slf4j; + import java.util.ArrayList; import java.util.List; import java.util.Objects; /** The SeaTunnel flink starter, used to generate the final flink job execute command. */ +@Slf4j public class FlinkStarter implements Starter { private static final String APP_NAME = SeaTunnelFlink.class.getName(); public static final String APP_JAR_NAME = EngineType.FLINK15.getStarterJarName(); @@ -46,7 +49,7 @@ public class FlinkStarter implements Starter { public static void main(String[] args) { FlinkStarter flinkStarter = new FlinkStarter(args); - System.out.println(String.join(" ", flinkStarter.buildCommands())); + log.info(String.join(" ", flinkStarter.buildCommands())); } @Override diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java index 1b8918976b4..582bd33de0d 100644 --- a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java +++ b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java @@ -37,6 +37,8 @@ import org.apache.commons.lang3.StringUtils; +import lombok.extern.slf4j.Slf4j; + import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; @@ -58,6 +60,7 @@ import java.util.stream.Stream; /** A Starter to generate spark-submit command for SeaTunnel job on spark. */ +@Slf4j public class SparkStarter implements Starter { /** original commandline args */ @@ -83,7 +86,7 @@ private SparkStarter(String[] args, SparkCommandArgs commandArgs) { public static void main(String[] args) throws IOException { SparkStarter starter = getInstance(args); List command = starter.buildCommands(); - System.out.println(String.join(" ", command)); + log.info(String.join(" ", command)); } /** diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java index c33544873a7..ab17aa595e8 100644 --- a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java +++ b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java @@ -37,6 +37,8 @@ import org.apache.commons.lang3.StringUtils; +import lombok.extern.slf4j.Slf4j; + import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; @@ -58,6 +60,7 @@ import java.util.stream.Stream; /** A Starter to generate spark-submit command for SeaTunnel job on spark. */ +@Slf4j public class SparkStarter implements Starter { /** original commandline args */ @@ -83,7 +86,7 @@ private SparkStarter(String[] args, SparkCommandArgs commandArgs) { public static void main(String[] args) throws IOException { SparkStarter starter = getInstance(args); List command = starter.buildCommands(); - System.out.println(String.join(" ", command)); + log.info(String.join(" ", command)); } /** diff --git a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java index eae7361ec72..6cbfd5e9d7c 100644 --- a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java +++ b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java @@ -95,16 +95,16 @@ public void execute() throws CommandExecuteException { engineClient = new SeaTunnelClient(clientConfig); if (clientCommandArgs.isListJob()) { String jobStatus = engineClient.getJobClient().listJobStatus(true); - System.out.println(jobStatus); + log.info(jobStatus); } else if (clientCommandArgs.isGetRunningJobMetrics()) { String runningJobMetrics = engineClient.getJobClient().getRunningJobMetrics(); - System.out.println(runningJobMetrics); + log.info(runningJobMetrics); } else if (null != clientCommandArgs.getJobId()) { String jobState = engineClient .getJobClient() .getJobDetailStatus(Long.parseLong(clientCommandArgs.getJobId())); - System.out.println(jobState); + log.info(jobState); } else if (null != clientCommandArgs.getCancelJobId()) { engineClient .getJobClient() @@ -114,7 +114,7 @@ public void execute() throws CommandExecuteException { engineClient .getJobClient() .getJobMetrics(Long.parseLong(clientCommandArgs.getMetricsJobId())); - System.out.println(jobMetrics); + log.info(jobMetrics); } else if (null != clientCommandArgs.getSavePointJobId()) { engineClient .getJobClient() diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java index 8222faaa41f..f2f5d5c5c8f 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java @@ -134,7 +134,7 @@ public void testBatchJobRunOkIn2Node() throws ExecutionException, InterruptedExc Long fileLineNumberFromDir = FileUtils.getFileLineNumberFromDir(testResources.getLeft()); Assertions.assertEquals(testRowNumber * testParallelism, fileLineNumberFromDir); - System.out.println(engineClient.getJobMetrics(clientJobProxy.getJobId())); + log.info(engineClient.getJobMetrics(clientJobProxy.getJobId())); log.warn("========================clean test resource===================="); } finally { if (engineClient != null) { diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterIT.java index acb2fab9b89..5ff1b3005c4 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterIT.java +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterIT.java @@ -65,12 +65,12 @@ public void getClusterHealthMetrics() { engineClient = new SeaTunnelClient(clientConfig); Map clusterHealthMetrics = engineClient.getClusterHealthMetrics(); - System.out.println( + log.info( "=====================================cluster metrics=================================================="); for (Map.Entry entry : clusterHealthMetrics.entrySet()) { - System.out.println(entry.getKey()); - System.out.println(entry.getValue()); - System.out.println( + log.info(entry.getKey()); + log.info(entry.getValue()); + log.info( "======================================================================================================"); } Assertions.assertEquals(2, clusterHealthMetrics.size()); diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java index ca7e1323539..ea76c28b480 100644 --- a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java +++ b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java @@ -45,6 +45,7 @@ import com.hazelcast.client.config.ClientConfig; import com.hazelcast.core.HazelcastInstance; import com.hazelcast.instance.impl.HazelcastInstanceFactory; +import lombok.extern.slf4j.Slf4j; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -57,6 +58,7 @@ import static org.awaitility.Awaitility.await; @DisabledOnOs(OS.WINDOWS) +@Slf4j public class SeaTunnelClientTest { private static SeaTunnelConfig SEATUNNEL_CONFIG = ConfigProvider.locateAndGetSeaTunnelConfig(); @@ -201,7 +203,7 @@ public void testGetJobMetrics() { String jobMetrics = jobClient.getJobMetrics(jobId); - System.out.println(jobMetrics); + log.info(jobMetrics); Assertions.assertTrue(jobMetrics.contains(SOURCE_RECEIVED_COUNT)); Assertions.assertTrue(jobMetrics.contains(SOURCE_RECEIVED_QPS)); @@ -258,7 +260,7 @@ public void testGetRunningJobMetrics() throws ExecutionException, InterruptedExc .getJobStatus(jobId3) .equals("RUNNING"))); - System.out.println(jobClient.getRunningJobMetrics()); + log.info(jobClient.getRunningJobMetrics()); await().atMost(30000, TimeUnit.MILLISECONDS) .untilAsserted( @@ -334,10 +336,10 @@ public void testGetJobInfo() { .untilAsserted( () -> { Thread.sleep(1000); - System.out.println( + log.info( "======================job status:" + jobClient.getJobDetailStatus(jobId)); - System.out.println( + log.info( "======================list job status:" + jobClient.listJobStatus(true)); Assertions.assertTrue( diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/TestUtils.java b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/TestUtils.java index cb1270623d4..10a55f17332 100644 --- a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/TestUtils.java +++ b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/TestUtils.java @@ -67,6 +67,6 @@ public void testContentFormatUtil() throws InterruptedException { return s1.getSubmitTime() > s2.getSubmitTime() ? -1 : 1; }); String r = ContentFormatUtil.format(statusDataList); - System.out.println(r); + log.info(r); } } diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointSerializeTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointSerializeTest.java index 25c38de19ef..34c2ba45440 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointSerializeTest.java +++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointSerializeTest.java @@ -28,11 +28,14 @@ import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; +import lombok.extern.slf4j.Slf4j; + import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.util.List; +@Slf4j public class CheckpointSerializeTest { @Test @@ -66,7 +69,7 @@ public void testPipelineStateDeserialize() throws IOException { DefaultSerializer defaultSerializer = new DefaultSerializer(); FakeSourceSplit split = defaultSerializer.deserialize(bytes); - System.out.println(split.getSplitId()); + log.info(String.valueOf(split.getSplitId())); } } @@ -84,7 +87,7 @@ public void testPipelineStateDeserialize() throws IOException { DefaultSerializer defaultSerializer = new DefaultSerializer(); FileSinkState fileSinkState = defaultSerializer.deserialize(bytes); - System.out.println(fileSinkState.getTransactionDir()); + log.info(fileSinkState.getTransactionDir()); } } } diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMetricsTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMetricsTest.java index 413c4b23112..0446e9bb110 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMetricsTest.java +++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMetricsTest.java @@ -33,6 +33,7 @@ import org.junit.jupiter.api.condition.OS; import com.hazelcast.internal.serialization.Data; +import lombok.extern.slf4j.Slf4j; import java.util.Collections; import java.util.concurrent.TimeUnit; @@ -48,6 +49,7 @@ @DisabledOnOs(OS.WINDOWS) @TestInstance(TestInstance.Lifecycle.PER_CLASS) +@Slf4j class JobMetricsTest extends AbstractSeaTunnelServerTest { @Test @@ -112,7 +114,7 @@ public void testMetricsOnJobRestart() throws InterruptedException { Thread.sleep(10000); - System.out.println(coordinatorService.getJobMetrics(jobId3).toJsonString()); + log.info(coordinatorService.getJobMetrics(jobId3).toJsonString()); // start savePoint coordinatorService.savePoint(jobId3); @@ -137,7 +139,7 @@ public void testMetricsOnJobRestart() throws InterruptedException { Thread.sleep(20000); // check metrics JobMetrics jobMetrics = coordinatorService.getJobMetrics(jobId3); - System.out.println(jobMetrics.toJsonString()); + log.info(jobMetrics.toJsonString()); assertTrue(40 < (Long) jobMetrics.get(SINK_WRITE_COUNT).get(0).value()); assertTrue(40 < (Long) jobMetrics.get(SINK_WRITE_COUNT).get(1).value()); assertTrue(40 < (Long) jobMetrics.get(SOURCE_RECEIVED_COUNT).get(0).value()); From e22a4f526f7077d2bb109c51acdf8ec6f359dcea Mon Sep 17 00:00:00 2001 From: lightzhao Date: Fri, 24 Nov 2023 16:40:41 +0800 Subject: [PATCH 2/4] remove starter module change. --- .../starter/seatunnel/command/ClientExecuteCommand.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java index 6cbfd5e9d7c..eae7361ec72 100644 --- a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java +++ b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java @@ -95,16 +95,16 @@ public void execute() throws CommandExecuteException { engineClient = new SeaTunnelClient(clientConfig); if (clientCommandArgs.isListJob()) { String jobStatus = engineClient.getJobClient().listJobStatus(true); - log.info(jobStatus); + System.out.println(jobStatus); } else if (clientCommandArgs.isGetRunningJobMetrics()) { String runningJobMetrics = engineClient.getJobClient().getRunningJobMetrics(); - log.info(runningJobMetrics); + System.out.println(runningJobMetrics); } else if (null != clientCommandArgs.getJobId()) { String jobState = engineClient .getJobClient() .getJobDetailStatus(Long.parseLong(clientCommandArgs.getJobId())); - log.info(jobState); + System.out.println(jobState); } else if (null != clientCommandArgs.getCancelJobId()) { engineClient .getJobClient() @@ -114,7 +114,7 @@ public void execute() throws CommandExecuteException { engineClient .getJobClient() .getJobMetrics(Long.parseLong(clientCommandArgs.getMetricsJobId())); - log.info(jobMetrics); + System.out.println(jobMetrics); } else if (null != clientCommandArgs.getSavePointJobId()) { engineClient .getJobClient() From 2d80e9b818322d4b5f8276aed6dba008e51d7eef Mon Sep 17 00:00:00 2001 From: lightzhao Date: Fri, 24 Nov 2023 16:41:10 +0800 Subject: [PATCH 3/4] remove starter module change. --- .../apache/seatunnel/core/starter/flink/FlinkStarter.java | 5 +---- .../apache/seatunnel/core/starter/flink/FlinkStarter.java | 5 +---- .../apache/seatunnel/core/starter/spark/SparkStarter.java | 5 +---- .../apache/seatunnel/core/starter/spark/SparkStarter.java | 5 +---- 4 files changed, 4 insertions(+), 16 deletions(-) diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java index 32cc16969f8..5dc1d32cef5 100644 --- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java +++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java @@ -23,14 +23,11 @@ import org.apache.seatunnel.core.starter.flink.args.FlinkCommandArgs; import org.apache.seatunnel.core.starter.utils.CommandLineUtils; -import lombok.extern.slf4j.Slf4j; - import java.util.ArrayList; import java.util.List; import java.util.Objects; /** The SeaTunnel flink starter, used to generate the final flink job execute command. */ -@Slf4j public class FlinkStarter implements Starter { private static final String APP_NAME = SeaTunnelFlink.class.getName(); public static final String APP_JAR_NAME = EngineType.FLINK13.getStarterJarName(); @@ -49,7 +46,7 @@ public class FlinkStarter implements Starter { public static void main(String[] args) { FlinkStarter flinkStarter = new FlinkStarter(args); - log.info(String.join(" ", flinkStarter.buildCommands())); + System.out.println(String.join(" ", flinkStarter.buildCommands())); } @Override diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java index 4381cd6e410..7373cb58ed5 100644 --- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java +++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java @@ -23,14 +23,11 @@ import org.apache.seatunnel.core.starter.flink.args.FlinkCommandArgs; import org.apache.seatunnel.core.starter.utils.CommandLineUtils; -import lombok.extern.slf4j.Slf4j; - import java.util.ArrayList; import java.util.List; import java.util.Objects; /** The SeaTunnel flink starter, used to generate the final flink job execute command. */ -@Slf4j public class FlinkStarter implements Starter { private static final String APP_NAME = SeaTunnelFlink.class.getName(); public static final String APP_JAR_NAME = EngineType.FLINK15.getStarterJarName(); @@ -49,7 +46,7 @@ public class FlinkStarter implements Starter { public static void main(String[] args) { FlinkStarter flinkStarter = new FlinkStarter(args); - log.info(String.join(" ", flinkStarter.buildCommands())); + System.out.println(String.join(" ", flinkStarter.buildCommands())); } @Override diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java index 582bd33de0d..1b8918976b4 100644 --- a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java +++ b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java @@ -37,8 +37,6 @@ import org.apache.commons.lang3.StringUtils; -import lombok.extern.slf4j.Slf4j; - import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; @@ -60,7 +58,6 @@ import java.util.stream.Stream; /** A Starter to generate spark-submit command for SeaTunnel job on spark. */ -@Slf4j public class SparkStarter implements Starter { /** original commandline args */ @@ -86,7 +83,7 @@ private SparkStarter(String[] args, SparkCommandArgs commandArgs) { public static void main(String[] args) throws IOException { SparkStarter starter = getInstance(args); List command = starter.buildCommands(); - log.info(String.join(" ", command)); + System.out.println(String.join(" ", command)); } /** diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java index ab17aa595e8..c33544873a7 100644 --- a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java +++ b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java @@ -37,8 +37,6 @@ import org.apache.commons.lang3.StringUtils; -import lombok.extern.slf4j.Slf4j; - import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; @@ -60,7 +58,6 @@ import java.util.stream.Stream; /** A Starter to generate spark-submit command for SeaTunnel job on spark. */ -@Slf4j public class SparkStarter implements Starter { /** original commandline args */ @@ -86,7 +83,7 @@ private SparkStarter(String[] args, SparkCommandArgs commandArgs) { public static void main(String[] args) throws IOException { SparkStarter starter = getInstance(args); List command = starter.buildCommands(); - log.info(String.join(" ", command)); + System.out.println(String.join(" ", command)); } /** From 65449cd2eacd798db6664fcebec54927fb0c790d Mon Sep 17 00:00:00 2001 From: lightzhao Date: Sat, 25 Nov 2023 14:02:03 +0800 Subject: [PATCH 4/4] update test. --- .../java/org/apache/seatunnel/engine/client/TestUtils.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/TestUtils.java b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/TestUtils.java index 10a55f17332..163d50025e0 100644 --- a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/TestUtils.java +++ b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/TestUtils.java @@ -23,9 +23,12 @@ import org.junit.jupiter.api.Test; +import lombok.extern.slf4j.Slf4j; + import java.util.ArrayList; import java.util.List; +@Slf4j public class TestUtils { public static String getResource(String confFile) { return System.getProperty("user.dir") + "/src/test/resources/" + confFile;