Skip to content

Commit

Permalink
[MINOR] Fix ZooKeeper session expiration bug (#10671)
Browse files Browse the repository at this point in the history
  • Loading branch information
linliu-code authored and yihua committed Feb 27, 2024
1 parent cad5605 commit 5b94afa
Show file tree
Hide file tree
Showing 19 changed files with 129 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public static void initClass() throws Exception {
}

@AfterAll
public static void cleanupClass() {
public static void cleanupClass() throws IOException {
UtilitiesTestBase.cleanUpUtilitiesTestServices();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public static void initClass() throws Exception {
}

@AfterAll
public static void cleanupClass() {
public static void cleanupClass() throws IOException {
UtilitiesTestBase.cleanUpUtilitiesTestServices();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import java.io.IOException;
import java.util.UUID;
import java.util.stream.Stream;

Expand Down Expand Up @@ -134,7 +135,7 @@ public static void initClass() throws Exception {
}

@AfterAll
public static void cleanupClass() {
public static void cleanupClass() throws IOException {
UtilitiesTestBase.cleanUpUtilitiesTestServices();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public static void initClass() throws Exception {
}

@AfterAll
public static void cleanupClass() {
public static void cleanupClass() throws IOException {
UtilitiesTestBase.cleanUpUtilitiesTestServices();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.util.HashSet;
import java.util.List;

Expand All @@ -55,7 +56,7 @@ public static void initClass() throws Exception {
}

@AfterAll
public static void cleanupClass() {
public static void cleanupClass() throws IOException {
UtilitiesTestBase.cleanUpUtilitiesTestServices();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,12 @@
import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
import org.apache.spark.streaming.kafka010.KafkaTestUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.util.List;
import java.util.UUID;

Expand All @@ -43,19 +46,27 @@
public class TestKafkaCallbackProvider extends UtilitiesTestBase {
private final String testTopicName = "hoodie_test_" + UUID.randomUUID();

private static KafkaTestUtils testUtils;
private KafkaTestUtils testUtils;

@BeforeAll
public static void initClass() throws Exception {
UtilitiesTestBase.initTestServices();
}

@BeforeEach
public void setup() {
testUtils = new KafkaTestUtils();
testUtils.setup();
}

@AfterEach
public void tearDown() {
testUtils.teardown();
}

@AfterAll
public static void cleanupClass() {
public static void cleanupClass() throws IOException {
UtilitiesTestBase.cleanUpUtilitiesTestServices();
testUtils.teardown();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.spark.sql.SQLContext;
import org.apache.spark.streaming.kafka010.KafkaTestUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.slf4j.Logger;
Expand Down Expand Up @@ -130,14 +131,15 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase {
static final String HOODIE_CONF_PARAM = "--hoodie-conf";
static final String HOODIE_CONF_VALUE1 = "hoodie.datasource.hive_sync.table=test_table";
static final String HOODIE_CONF_VALUE2 = "hoodie.datasource.write.recordkey.field=Field1,Field2,Field3";
public static KafkaTestUtils testUtils;
protected static String topicName;
protected static String defaultSchemaProviderClassName = FilebasedSchemaProvider.class.getName();
protected static int testNum = 1;

Map<String, String> hudiOpts = new HashMap<>();
public KafkaTestUtils testUtils;

protected static void prepareTestSetup() throws IOException {
@BeforeEach
protected void prepareTestSetup() throws IOException {
PARQUET_SOURCE_ROOT = basePath + "/parquetFiles";
ORC_SOURCE_ROOT = basePath + "/orcFiles";
JSON_KAFKA_SOURCE_ROOT = basePath + "/jsonKafkaFiles";
Expand Down Expand Up @@ -245,16 +247,15 @@ protected static void writeCommonPropsToFile(FileSystem dfs, String dfsBasePath)
@BeforeAll
public static void initClass() throws Exception {
UtilitiesTestBase.initTestServices(false, true, false);
prepareTestSetup();
}

@AfterAll
public static void tearDown() {
cleanupKafkaTestUtils();
public static void tearDown() throws IOException {
UtilitiesTestBase.cleanUpUtilitiesTestServices();
}

public static void cleanupKafkaTestUtils() {
@AfterEach
public void cleanupKafkaTestUtils() {
if (testUtils != null) {
testUtils.teardown();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1716,11 +1716,11 @@ public void testDistributedTestDataSource() {
assertEquals(1000, c);
}

private static void prepareJsonKafkaDFSFiles(int numRecords, boolean createTopic, String topicName) {
private void prepareJsonKafkaDFSFiles(int numRecords, boolean createTopic, String topicName) {
prepareJsonKafkaDFSFiles(numRecords, createTopic, topicName, 2);
}

private static void prepareJsonKafkaDFSFiles(int numRecords, boolean createTopic, String topicName, int numPartitions) {
private void prepareJsonKafkaDFSFiles(int numRecords, boolean createTopic, String topicName, int numPartitions) {
if (createTopic) {
try {
testUtils.createTopic(topicName, numPartitions);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,6 @@ public void teardown() throws Exception {
@AfterAll
static void teardownAll() {
defaultSchemaProviderClassName = FilebasedSchemaProvider.class.getName();
HoodieDeltaStreamerTestBase.cleanupKafkaTestUtils();
}

protected HoodieStreamer deltaStreamer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public static void initClass() throws Exception {
}

@AfterAll
public static void cleanUpUtilitiesTestServices() {
public static void cleanUpUtilitiesTestServices() throws IOException {
UtilitiesTestBase.cleanUpUtilitiesTestServices();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.streaming.kafka010.KafkaTestUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.ArrayList;
Expand All @@ -58,20 +58,20 @@
*/
abstract class BaseTestKafkaSource extends SparkClientFunctionalTestHarness {
protected static final String TEST_TOPIC_PREFIX = "hoodie_test_";
protected static KafkaTestUtils testUtils;

protected final HoodieIngestionMetrics metrics = mock(HoodieIngestionMetrics.class);

protected SchemaProvider schemaProvider;
protected KafkaTestUtils testUtils;

@BeforeAll
public static void initClass() {
@BeforeEach
public void initClass() {
testUtils = new KafkaTestUtils();
testUtils.setup();
}

@AfterAll
public static void cleanupClass() {
@AfterEach
public void cleanupClass() {
testUtils.teardown();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,9 @@
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.streaming.kafka010.KafkaTestUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.io.IOException;
Expand All @@ -68,8 +69,6 @@
public class TestAvroKafkaSource extends SparkClientFunctionalTestHarness {
protected static final String TEST_TOPIC_PREFIX = "hoodie_avro_test_";

protected static KafkaTestUtils testUtils;

protected static HoodieTestDataGenerator dataGen;

protected static String SCHEMA_PATH = "/tmp/schema_file.avsc";
Expand All @@ -78,15 +77,21 @@ public class TestAvroKafkaSource extends SparkClientFunctionalTestHarness {

protected SchemaProvider schemaProvider;

protected KafkaTestUtils testUtils;

@BeforeAll
public static void initClass() {
testUtils = new KafkaTestUtils();
dataGen = new HoodieTestDataGenerator(0xDEED);
}

@BeforeEach
public void setup() {
testUtils = new KafkaTestUtils();
testUtils.setup();
}

@AfterAll
public static void cleanupClass() {
@AfterEach
public void tearDown() {
testUtils.teardown();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.apache.hudi.utilities.testutils.UtilitiesTestBase;

import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.AnalysisException;
Expand Down Expand Up @@ -64,17 +63,10 @@ public class TestSqlFileBasedSource extends UtilitiesTestBase {
@BeforeAll
public static void initClass() throws Exception {
UtilitiesTestBase.initTestServices(false, true, false);
FileSystem fs = UtilitiesTestBase.fs;
UtilitiesTestBase.Helpers.copyToDFS(
"streamer-config/sql-file-based-source.sql", fs,
UtilitiesTestBase.basePath + "/sql-file-based-source.sql");
UtilitiesTestBase.Helpers.copyToDFS(
"streamer-config/sql-file-based-source-invalid-table.sql", fs,
UtilitiesTestBase.basePath + "/sql-file-based-source-invalid-table.sql");
}

@AfterAll
public static void cleanupClass() {
public static void cleanupClass() throws IOException {
UtilitiesTestBase.cleanUpUtilitiesTestServices();
}

Expand Down Expand Up @@ -113,7 +105,11 @@ private void generateTestTable(String filename, String instantTime, int n) throw
* @throws IOException
*/
@Test
public void testSqlFileBasedSourceAvroFormat() {
public void testSqlFileBasedSourceAvroFormat() throws IOException {
UtilitiesTestBase.Helpers.copyToDFS(
"streamer-config/sql-file-based-source.sql", fs,
UtilitiesTestBase.basePath + "/sql-file-based-source.sql");

props.setProperty(sqlFileSourceConfig, UtilitiesTestBase.basePath + "/sql-file-based-source.sql");
sqlFileSource = new SqlFileBasedSource(props, jsc, sparkSession, schemaProvider);
sourceFormatAdapter = new SourceFormatAdapter(sqlFileSource);
Expand All @@ -136,7 +132,11 @@ public void testSqlFileBasedSourceAvroFormat() {
* @throws IOException
*/
@Test
public void testSqlFileBasedSourceRowFormat() {
public void testSqlFileBasedSourceRowFormat() throws IOException {
UtilitiesTestBase.Helpers.copyToDFS(
"streamer-config/sql-file-based-source.sql", fs,
UtilitiesTestBase.basePath + "/sql-file-based-source.sql");

props.setProperty(sqlFileSourceConfig, UtilitiesTestBase.basePath + "/sql-file-based-source.sql");
sqlFileSource = new SqlFileBasedSource(props, jsc, sparkSession, schemaProvider);
sourceFormatAdapter = new SourceFormatAdapter(sqlFileSource);
Expand All @@ -154,7 +154,11 @@ public void testSqlFileBasedSourceRowFormat() {
* @throws IOException
*/
@Test
public void testSqlFileBasedSourceMoreRecordsThanSourceLimit() {
public void testSqlFileBasedSourceMoreRecordsThanSourceLimit() throws IOException {
UtilitiesTestBase.Helpers.copyToDFS(
"streamer-config/sql-file-based-source.sql", fs,
UtilitiesTestBase.basePath + "/sql-file-based-source.sql");

props.setProperty(sqlFileSourceConfig, UtilitiesTestBase.basePath + "/sql-file-based-source.sql");
sqlFileSource = new SqlFileBasedSource(props, jsc, sparkSession, schemaProvider);
sourceFormatAdapter = new SourceFormatAdapter(sqlFileSource);
Expand All @@ -171,7 +175,11 @@ public void testSqlFileBasedSourceMoreRecordsThanSourceLimit() {
* @throws IOException
*/
@Test
public void testSqlFileBasedSourceInvalidTable() {
public void testSqlFileBasedSourceInvalidTable() throws IOException {
UtilitiesTestBase.Helpers.copyToDFS(
"streamer-config/sql-file-based-source-invalid-table.sql", fs,
UtilitiesTestBase.basePath + "/sql-file-based-source-invalid-table.sql");

props.setProperty(sqlFileSourceConfig, UtilitiesTestBase.basePath + "/sql-file-based-source-invalid-table.sql");
sqlFileSource = new SqlFileBasedSource(props, jsc, sparkSession, schemaProvider);
sourceFormatAdapter = new SourceFormatAdapter(sqlFileSource);
Expand All @@ -182,7 +190,11 @@ public void testSqlFileBasedSourceInvalidTable() {
}

@Test
public void shouldSetCheckpointForSqlFileBasedSourceWithEpochCheckpoint() {
public void shouldSetCheckpointForSqlFileBasedSourceWithEpochCheckpoint() throws IOException {
UtilitiesTestBase.Helpers.copyToDFS(
"streamer-config/sql-file-based-source.sql", fs,
UtilitiesTestBase.basePath + "/sql-file-based-source.sql");

props.setProperty(sqlFileSourceConfig, UtilitiesTestBase.basePath + "/sql-file-based-source.sql");
props.setProperty(sqlFileSourceConfigEmitChkPointConf, "true");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public static void initClass() throws Exception {
}

@AfterAll
public static void cleanupClass() {
public static void cleanupClass() throws IOException {
UtilitiesTestBase.cleanUpUtilitiesTestServices();
}

Expand Down
Loading

0 comments on commit 5b94afa

Please sign in to comment.