From 5b94afaaf4e89a177996fc603d9b8c0ef9801086 Mon Sep 17 00:00:00 2001 From: Lin Liu <141371752+linliu-code@users.noreply.github.com> Date: Thu, 15 Feb 2024 16:38:29 -0800 Subject: [PATCH] [MINOR] Fix zookeeper session expiration bug (#10671) --- .../TestDFSHoodieTestSuiteWriterAdapter.java | 2 +- .../testsuite/TestFileDeltaInputWriter.java | 2 +- .../testsuite/job/TestHoodieTestSuiteJob.java | 3 +- .../reader/TestDFSAvroDeltaInputReader.java | 2 +- .../TestDFSHoodieDatasetInputReader.java | 3 +- .../callback/TestKafkaCallbackProvider.java | 17 ++++++-- .../HoodieDeltaStreamerTestBase.java | 13 +++--- .../TestHoodieDeltaStreamer.java | 4 +- ...oodieDeltaStreamerSchemaEvolutionBase.java | 1 - .../schema/TestFilebasedSchemaProvider.java | 2 +- .../sources/BaseTestKafkaSource.java | 14 +++---- .../sources/TestAvroKafkaSource.java | 17 +++++--- .../sources/TestSqlFileBasedSource.java | 40 ++++++++++++------- .../hudi/utilities/sources/TestSqlSource.java | 2 +- .../debezium/TestAbstractDebeziumSource.java | 18 +++++++-- .../sources/helpers/TestKafkaOffsetGen.java | 14 +++---- .../testutils/UtilitiesTestBase.java | 11 ++++- .../AbstractCloudObjectsSourceTestBase.java | 2 +- .../TestSqlFileBasedTransformer.java | 36 +++++++++-------- 19 files changed, 129 insertions(+), 74 deletions(-) diff --git a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/TestDFSHoodieTestSuiteWriterAdapter.java b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/TestDFSHoodieTestSuiteWriterAdapter.java index 70430328553f2..f2ec458bf2d05 100644 --- a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/TestDFSHoodieTestSuiteWriterAdapter.java +++ b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/TestDFSHoodieTestSuiteWriterAdapter.java @@ -69,7 +69,7 @@ public static void initClass() throws Exception { } @AfterAll - public static void cleanupClass() { + public static void cleanupClass() throws IOException { UtilitiesTestBase.cleanUpUtilitiesTestServices(); } diff --git a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/TestFileDeltaInputWriter.java b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/TestFileDeltaInputWriter.java index 4f99292b3fd20..d8e54984367a4 100644 --- a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/TestFileDeltaInputWriter.java +++ b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/TestFileDeltaInputWriter.java @@ -63,7 +63,7 @@ public static void initClass() throws Exception { } @AfterAll - public static void cleanupClass() { + public static void cleanupClass() throws IOException { UtilitiesTestBase.cleanUpUtilitiesTestServices(); } diff --git a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/job/TestHoodieTestSuiteJob.java b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/job/TestHoodieTestSuiteJob.java index 087ffb8e400f5..9a4a2eee619a4 100644 --- a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/job/TestHoodieTestSuiteJob.java +++ b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/job/TestHoodieTestSuiteJob.java @@ -49,6 +49,7 @@ import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; +import java.io.IOException; import java.util.UUID; import java.util.stream.Stream; @@ -134,7 +135,7 @@ public static void initClass() throws Exception { } @AfterAll - public static void cleanupClass() { + public static void cleanupClass() throws IOException { UtilitiesTestBase.cleanUpUtilitiesTestServices(); } diff --git a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/reader/TestDFSAvroDeltaInputReader.java b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/reader/TestDFSAvroDeltaInputReader.java index 089a9d9fb5591..8f93a82865a1f 100644 --- a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/reader/TestDFSAvroDeltaInputReader.java +++ b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/reader/TestDFSAvroDeltaInputReader.java @@ -48,7 +48,7 @@ public static void initClass() throws Exception { } @AfterAll - public static void cleanupClass() { + public static void cleanupClass() throws IOException { UtilitiesTestBase.cleanUpUtilitiesTestServices(); } diff --git a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/reader/TestDFSHoodieDatasetInputReader.java b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/reader/TestDFSHoodieDatasetInputReader.java index 3a11de9f0b531..40e1f58698d71 100644 --- a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/reader/TestDFSHoodieDatasetInputReader.java +++ b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/reader/TestDFSHoodieDatasetInputReader.java @@ -38,6 +38,7 @@ import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; +import java.io.IOException; import java.util.HashSet; import java.util.List; @@ -55,7 +56,7 @@ public static void initClass() throws Exception { } @AfterAll - public static void cleanupClass() { + public static void cleanupClass() throws IOException { UtilitiesTestBase.cleanUpUtilitiesTestServices(); } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/callback/TestKafkaCallbackProvider.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/callback/TestKafkaCallbackProvider.java index 70897aecf30f1..e2c3c86cd5bf5 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/callback/TestKafkaCallbackProvider.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/callback/TestKafkaCallbackProvider.java @@ -30,9 +30,12 @@ import org.apache.hudi.utilities.testutils.UtilitiesTestBase; import org.apache.spark.streaming.kafka010.KafkaTestUtils; import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import java.io.IOException; import java.util.List; import java.util.UUID; @@ -43,19 +46,27 @@ public class TestKafkaCallbackProvider extends UtilitiesTestBase { private final String testTopicName = "hoodie_test_" + UUID.randomUUID(); - private static KafkaTestUtils testUtils; + private KafkaTestUtils testUtils; @BeforeAll public static void initClass() throws Exception { UtilitiesTestBase.initTestServices(); + } + + @BeforeEach + public void setup() { testUtils = new KafkaTestUtils(); testUtils.setup(); } + @AfterEach + public void tearDown() { + testUtils.teardown(); + } + @AfterAll - public static void cleanupClass() { + public static void cleanupClass() throws IOException { UtilitiesTestBase.cleanUpUtilitiesTestServices(); - testUtils.teardown(); } @Test diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java index c4b3ba265d671..58b5d79883e08 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java @@ -51,6 +51,7 @@ import org.apache.spark.sql.SQLContext; import org.apache.spark.streaming.kafka010.KafkaTestUtils; import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.slf4j.Logger; @@ -130,14 +131,15 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase { static final String HOODIE_CONF_PARAM = "--hoodie-conf"; static final String HOODIE_CONF_VALUE1 = "hoodie.datasource.hive_sync.table=test_table"; static final String HOODIE_CONF_VALUE2 = "hoodie.datasource.write.recordkey.field=Field1,Field2,Field3"; - public static KafkaTestUtils testUtils; protected static String topicName; protected static String defaultSchemaProviderClassName = FilebasedSchemaProvider.class.getName(); protected static int testNum = 1; Map hudiOpts = new HashMap<>(); + public KafkaTestUtils testUtils; - protected static void prepareTestSetup() throws IOException { + @BeforeEach + protected void prepareTestSetup() throws IOException { PARQUET_SOURCE_ROOT = basePath + "/parquetFiles"; ORC_SOURCE_ROOT = basePath + "/orcFiles"; JSON_KAFKA_SOURCE_ROOT = basePath + "/jsonKafkaFiles"; @@ -245,16 +247,15 @@ protected static void writeCommonPropsToFile(FileSystem dfs, String dfsBasePath) @BeforeAll public static void initClass() throws Exception { UtilitiesTestBase.initTestServices(false, true, false); - prepareTestSetup(); } @AfterAll - public static void tearDown() { - cleanupKafkaTestUtils(); + public static void tearDown() throws IOException { UtilitiesTestBase.cleanUpUtilitiesTestServices(); } - public static void cleanupKafkaTestUtils() { + @AfterEach + public void cleanupKafkaTestUtils() { if (testUtils != null) { testUtils.teardown(); } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java index 16a523d5ac1fe..7835f6bfac964 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java @@ -1716,11 +1716,11 @@ public void testDistributedTestDataSource() { assertEquals(1000, c); } - private static void prepareJsonKafkaDFSFiles(int numRecords, boolean createTopic, String topicName) { + private void prepareJsonKafkaDFSFiles(int numRecords, boolean createTopic, String topicName) { prepareJsonKafkaDFSFiles(numRecords, createTopic, topicName, 2); } - private static void prepareJsonKafkaDFSFiles(int numRecords, boolean createTopic, String topicName, int numPartitions) { + private void prepareJsonKafkaDFSFiles(int numRecords, boolean createTopic, String topicName, int numPartitions) { if (createTopic) { try { testUtils.createTopic(topicName, numPartitions); diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionBase.java index a0ba7d4a40191..43ac68e3736b4 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionBase.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionBase.java @@ -129,7 +129,6 @@ public void teardown() throws Exception { @AfterAll static void teardownAll() { defaultSchemaProviderClassName = FilebasedSchemaProvider.class.getName(); - HoodieDeltaStreamerTestBase.cleanupKafkaTestUtils(); } protected HoodieStreamer deltaStreamer; diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/TestFilebasedSchemaProvider.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/TestFilebasedSchemaProvider.java index 389282ddcdb79..945ce6f774a86 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/TestFilebasedSchemaProvider.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/TestFilebasedSchemaProvider.java @@ -51,7 +51,7 @@ public static void initClass() throws Exception { } @AfterAll - public static void cleanUpUtilitiesTestServices() { + public static void cleanUpUtilitiesTestServices() throws IOException { UtilitiesTestBase.cleanUpUtilitiesTestServices(); } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/BaseTestKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/BaseTestKafkaSource.java index f340120ca8db5..b5cbf2738f650 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/BaseTestKafkaSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/BaseTestKafkaSource.java @@ -38,8 +38,8 @@ import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.streaming.kafka010.KafkaTestUtils; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import java.util.ArrayList; @@ -58,20 +58,20 @@ */ abstract class BaseTestKafkaSource extends SparkClientFunctionalTestHarness { protected static final String TEST_TOPIC_PREFIX = "hoodie_test_"; - protected static KafkaTestUtils testUtils; protected final HoodieIngestionMetrics metrics = mock(HoodieIngestionMetrics.class); protected SchemaProvider schemaProvider; + protected KafkaTestUtils testUtils; - @BeforeAll - public static void initClass() { + @BeforeEach + public void initClass() { testUtils = new KafkaTestUtils(); testUtils.setup(); } - @AfterAll - public static void cleanupClass() { + @AfterEach + public void cleanupClass() { testUtils.teardown(); } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroKafkaSource.java index 3daa95055380e..558181f42586e 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroKafkaSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroKafkaSource.java @@ -45,8 +45,9 @@ import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.streaming.kafka010.KafkaTestUtils; -import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import java.io.IOException; @@ -68,8 +69,6 @@ public class TestAvroKafkaSource extends SparkClientFunctionalTestHarness { protected static final String TEST_TOPIC_PREFIX = "hoodie_avro_test_"; - protected static KafkaTestUtils testUtils; - protected static HoodieTestDataGenerator dataGen; protected static String SCHEMA_PATH = "/tmp/schema_file.avsc"; @@ -78,15 +77,21 @@ public class TestAvroKafkaSource extends SparkClientFunctionalTestHarness { protected SchemaProvider schemaProvider; + protected KafkaTestUtils testUtils; + @BeforeAll public static void initClass() { - testUtils = new KafkaTestUtils(); dataGen = new HoodieTestDataGenerator(0xDEED); + } + + @BeforeEach + public void setup() { + testUtils = new KafkaTestUtils(); testUtils.setup(); } - @AfterAll - public static void cleanupClass() { + @AfterEach + public void tearDown() { testUtils.teardown(); } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestSqlFileBasedSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestSqlFileBasedSource.java index c718e7a12e8d4..3f106fce994cc 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestSqlFileBasedSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestSqlFileBasedSource.java @@ -28,7 +28,6 @@ import org.apache.hudi.utilities.testutils.UtilitiesTestBase; import org.apache.avro.generic.GenericRecord; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.sql.AnalysisException; @@ -64,17 +63,10 @@ public class TestSqlFileBasedSource extends UtilitiesTestBase { @BeforeAll public static void initClass() throws Exception { UtilitiesTestBase.initTestServices(false, true, false); - FileSystem fs = UtilitiesTestBase.fs; - UtilitiesTestBase.Helpers.copyToDFS( - "streamer-config/sql-file-based-source.sql", fs, - UtilitiesTestBase.basePath + "/sql-file-based-source.sql"); - UtilitiesTestBase.Helpers.copyToDFS( - "streamer-config/sql-file-based-source-invalid-table.sql", fs, - UtilitiesTestBase.basePath + "/sql-file-based-source-invalid-table.sql"); } @AfterAll - public static void cleanupClass() { + public static void cleanupClass() throws IOException { UtilitiesTestBase.cleanUpUtilitiesTestServices(); } @@ -113,7 +105,11 @@ private void generateTestTable(String filename, String instantTime, int n) throw * @throws IOException */ @Test - public void testSqlFileBasedSourceAvroFormat() { + public void testSqlFileBasedSourceAvroFormat() throws IOException { + UtilitiesTestBase.Helpers.copyToDFS( + "streamer-config/sql-file-based-source.sql", fs, + UtilitiesTestBase.basePath + "/sql-file-based-source.sql"); + props.setProperty(sqlFileSourceConfig, UtilitiesTestBase.basePath + "/sql-file-based-source.sql"); sqlFileSource = new SqlFileBasedSource(props, jsc, sparkSession, schemaProvider); sourceFormatAdapter = new SourceFormatAdapter(sqlFileSource); @@ -136,7 +132,11 @@ public void testSqlFileBasedSourceAvroFormat() { * @throws IOException */ @Test - public void testSqlFileBasedSourceRowFormat() { + public void testSqlFileBasedSourceRowFormat() throws IOException { + UtilitiesTestBase.Helpers.copyToDFS( + "streamer-config/sql-file-based-source.sql", fs, + UtilitiesTestBase.basePath + "/sql-file-based-source.sql"); + props.setProperty(sqlFileSourceConfig, UtilitiesTestBase.basePath + "/sql-file-based-source.sql"); sqlFileSource = new SqlFileBasedSource(props, jsc, sparkSession, schemaProvider); sourceFormatAdapter = new SourceFormatAdapter(sqlFileSource); @@ -154,7 +154,11 @@ public void testSqlFileBasedSourceRowFormat() { * @throws IOException */ @Test - public void testSqlFileBasedSourceMoreRecordsThanSourceLimit() { + public void testSqlFileBasedSourceMoreRecordsThanSourceLimit() throws IOException { + UtilitiesTestBase.Helpers.copyToDFS( + "streamer-config/sql-file-based-source.sql", fs, + UtilitiesTestBase.basePath + "/sql-file-based-source.sql"); + props.setProperty(sqlFileSourceConfig, UtilitiesTestBase.basePath + "/sql-file-based-source.sql"); sqlFileSource = new SqlFileBasedSource(props, jsc, sparkSession, schemaProvider); sourceFormatAdapter = new SourceFormatAdapter(sqlFileSource); @@ -171,7 +175,11 @@ public void testSqlFileBasedSourceMoreRecordsThanSourceLimit() { * @throws IOException */ @Test - public void testSqlFileBasedSourceInvalidTable() { + public void testSqlFileBasedSourceInvalidTable() throws IOException { + UtilitiesTestBase.Helpers.copyToDFS( + "streamer-config/sql-file-based-source-invalid-table.sql", fs, + UtilitiesTestBase.basePath + "/sql-file-based-source-invalid-table.sql"); + props.setProperty(sqlFileSourceConfig, UtilitiesTestBase.basePath + "/sql-file-based-source-invalid-table.sql"); sqlFileSource = new SqlFileBasedSource(props, jsc, sparkSession, schemaProvider); sourceFormatAdapter = new SourceFormatAdapter(sqlFileSource); @@ -182,7 +190,11 @@ public void testSqlFileBasedSourceInvalidTable() { } @Test - public void shouldSetCheckpointForSqlFileBasedSourceWithEpochCheckpoint() { + public void shouldSetCheckpointForSqlFileBasedSourceWithEpochCheckpoint() throws IOException { + UtilitiesTestBase.Helpers.copyToDFS( + "streamer-config/sql-file-based-source.sql", fs, + UtilitiesTestBase.basePath + "/sql-file-based-source.sql"); + props.setProperty(sqlFileSourceConfig, UtilitiesTestBase.basePath + "/sql-file-based-source.sql"); props.setProperty(sqlFileSourceConfigEmitChkPointConf, "true"); diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestSqlSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestSqlSource.java index 37ab549ea76e1..64578f3bae368 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestSqlSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestSqlSource.java @@ -64,7 +64,7 @@ public static void initClass() throws Exception { } @AfterAll - public static void cleanupClass() { + public static void cleanupClass() throws IOException { UtilitiesTestBase.cleanUpUtilitiesTestServices(); } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/debezium/TestAbstractDebeziumSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/debezium/TestAbstractDebeziumSource.java index e6aa9d8862eec..c9f46144e96ac 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/debezium/TestAbstractDebeziumSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/debezium/TestAbstractDebeziumSource.java @@ -39,11 +39,14 @@ import org.apache.spark.sql.Row; import org.apache.spark.streaming.kafka010.KafkaTestUtils; import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; +import java.io.IOException; import java.util.UUID; import java.util.stream.Stream; @@ -57,19 +60,28 @@ public abstract class TestAbstractDebeziumSource extends UtilitiesTestBase { private final String testTopicName = "hoodie_test_" + UUID.randomUUID(); private final HoodieIngestionMetrics metrics = mock(HoodieIngestionMetrics.class); - private static KafkaTestUtils testUtils; + private KafkaTestUtils testUtils; @BeforeAll public static void initClass() throws Exception { UtilitiesTestBase.initTestServices(); + } + + @BeforeEach + public void setUpKafkaTestUtils() { testUtils = new KafkaTestUtils(); testUtils.setup(); } + @AfterEach + public void tearDownKafkaTestUtils() { + testUtils.teardown(); + testUtils = null; + } + @AfterAll - public static void cleanupClass() { + public static void cleanupClass() throws IOException { UtilitiesTestBase.cleanUpUtilitiesTestServices(); - testUtils.teardown(); } private TypedProperties createPropsForJsonSource() { diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java index e3d2ec5a60287..6ad6a4c09dbf5 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java @@ -31,8 +31,8 @@ import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.spark.streaming.kafka010.KafkaTestUtils; import org.apache.spark.streaming.kafka010.OffsetRange; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import java.util.UUID; @@ -49,17 +49,17 @@ public class TestKafkaOffsetGen { private final String testTopicName = "hoodie_test_" + UUID.randomUUID(); - private static KafkaTestUtils testUtils; private HoodieIngestionMetrics metrics = mock(HoodieIngestionMetrics.class); + private KafkaTestUtils testUtils; - @BeforeAll - public static void setup() throws Exception { + @BeforeEach + public void setup() throws Exception { testUtils = new KafkaTestUtils(); testUtils.setup(); } - @AfterAll - public static void teardown() throws Exception { + @AfterEach + public void teardown() throws Exception { testUtils.teardown(); } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java index 24f645c404acf..0406ccddc4a74 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java @@ -164,7 +164,12 @@ public static void initTestServices(boolean needsHdfs, boolean needsHive, boolea } @AfterAll - public static void cleanUpUtilitiesTestServices() { + public static void cleanUpUtilitiesTestServices() throws IOException { + if (fs != null) { + fs.delete(new Path(basePath), true); + fs.close(); + fs = null; + } if (hdfsTestService != null) { hdfsTestService.stop(); hdfsTestService = null; @@ -197,6 +202,10 @@ public static void cleanUpUtilitiesTestServices() { @BeforeEach public void setup() throws Exception { TestDataSource.initDataGen(); + // This prevents test methods from using existing files or folders. + if (fs != null) { + fs.delete(new Path(basePath), true); + } } @AfterEach diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractCloudObjectsSourceTestBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractCloudObjectsSourceTestBase.java index bdb6c85ce72b5..11a00ebeb2cf2 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractCloudObjectsSourceTestBase.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractCloudObjectsSourceTestBase.java @@ -58,7 +58,7 @@ public static void initClass() throws Exception { } @AfterAll - public static void cleanupClass() { + public static void cleanupClass() throws IOException { UtilitiesTestBase.cleanUpUtilitiesTestServices(); } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/transform/TestSqlFileBasedTransformer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/transform/TestSqlFileBasedTransformer.java index b3cbe1d6108fa..1b0cc7f52a6d9 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/transform/TestSqlFileBasedTransformer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/transform/TestSqlFileBasedTransformer.java @@ -36,6 +36,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -51,22 +52,10 @@ public class TestSqlFileBasedTransformer extends UtilitiesTestBase { @BeforeAll public static void initClass() throws Exception { UtilitiesTestBase.initTestServices(); - UtilitiesTestBase.Helpers.copyToDFS( - "streamer-config/sql-file-transformer.sql", - UtilitiesTestBase.fs, - UtilitiesTestBase.basePath + "/sql-file-transformer.sql"); - UtilitiesTestBase.Helpers.copyToDFS( - "streamer-config/sql-file-transformer-invalid.sql", - UtilitiesTestBase.fs, - UtilitiesTestBase.basePath + "/sql-file-transformer-invalid.sql"); - UtilitiesTestBase.Helpers.copyToDFS( - "streamer-config/sql-file-transformer-empty.sql", - UtilitiesTestBase.fs, - UtilitiesTestBase.basePath + "/sql-file-transformer-empty.sql"); } @AfterAll - public static void cleanupClass() { + public static void cleanupClass() throws IOException { UtilitiesTestBase.cleanUpUtilitiesTestServices(); } @@ -106,7 +95,12 @@ public void testSqlFileBasedTransformerIncorrectConfig() { } @Test - public void testSqlFileBasedTransformerInvalidSQL() { + public void testSqlFileBasedTransformerInvalidSQL() throws IOException { + UtilitiesTestBase.Helpers.copyToDFS( + "streamer-config/sql-file-transformer-invalid.sql", + UtilitiesTestBase.fs, + UtilitiesTestBase.basePath + "/sql-file-transformer-invalid.sql"); + // Test if the SQL file based transformer works as expected for the invalid SQL statements. props.setProperty( "hoodie.deltastreamer.transformer.sql.file", @@ -117,7 +111,12 @@ public void testSqlFileBasedTransformerInvalidSQL() { } @Test - public void testSqlFileBasedTransformerEmptyDataset() { + public void testSqlFileBasedTransformerEmptyDataset() throws IOException { + UtilitiesTestBase.Helpers.copyToDFS( + "streamer-config/sql-file-transformer-empty.sql", + UtilitiesTestBase.fs, + UtilitiesTestBase.basePath + "/sql-file-transformer-empty.sql"); + // Test if the SQL file based transformer works as expected for the empty SQL statements. props.setProperty( "hoodie.deltastreamer.transformer.sql.file", @@ -129,7 +128,12 @@ public void testSqlFileBasedTransformerEmptyDataset() { } @Test - public void testSqlFileBasedTransformer() { + public void testSqlFileBasedTransformer() throws IOException { + UtilitiesTestBase.Helpers.copyToDFS( + "streamer-config/sql-file-transformer.sql", + UtilitiesTestBase.fs, + UtilitiesTestBase.basePath + "/sql-file-transformer.sql"); + // Test if the SQL file based transformer works as expected for the correct input. props.setProperty( "hoodie.deltastreamer.transformer.sql.file",