Skip to content

Commit

Permalink
disable inferring parallelism for a few tests that would fail or beco…
Browse files Browse the repository at this point in the history
…me flaky otherwise
  • Loading branch information
stevenzwu committed Aug 1, 2024
1 parent e8f6825 commit 3ead397
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
*/
package org.apache.iceberg.flink.source;

import static org.apache.iceberg.flink.MiniFlinkClusterExtension.DISABLE_CLASSLOADER_CHECK_CONFIG;

import java.io.Serializable;
import java.nio.file.Path;
import java.time.Duration;
Expand All @@ -35,6 +33,8 @@
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.runtime.metrics.MetricNames;
Expand All @@ -59,6 +59,7 @@
import org.apache.iceberg.data.GenericAppenderHelper;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.flink.FlinkConfigOptions;
import org.apache.iceberg.flink.HadoopTableExtension;
import org.apache.iceberg.flink.TestFixtures;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
Expand All @@ -79,6 +80,12 @@ public class TestIcebergSourceWithWatermarkExtractor implements Serializable {
@TempDir protected Path temporaryFolder;

private static final InMemoryReporter REPORTER = InMemoryReporter.createWithRetainedMetrics();
public static final Configuration DISABLE_CLASSLOADER_CHECK_CONFIG =
new Configuration()
// disable classloader check as Avro may cache class/object in the serializers.
.set(CoreOptions.CHECK_LEAKED_CLASSLOADER, false)
// disable inferring source parallelism
.set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM, false);

@RegisterExtension
public static final MiniClusterExtension MINI_CLUSTER_EXTENSION =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,10 @@
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.RegisterExtension;

@Timeout(value = 60)
public class TestIcebergSpeculativeExecutionSupport extends TestBase {
private static final int NUM_TASK_MANAGERS = 1;
private static final int NUM_TASK_SLOTS = 3;
Expand Down Expand Up @@ -169,6 +171,7 @@ private static Configuration configure() {

// Use FLIP-27 source
configuration.set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE, true);
configuration.set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM, false);

// for speculative execution
configuration.set(BatchExecutionOptions.SPECULATIVE_ENABLED, true);
Expand Down

0 comments on commit 3ead397

Please sign in to comment.