diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java index 74f37b87d8728..e39a473216186 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java @@ -248,6 +248,7 @@ public KafkaSyncTableAction build() { List args = new ArrayList<>( Arrays.asList( + "kafka_sync_table", "--warehouse", warehouse, "--database", @@ -265,14 +266,7 @@ public KafkaSyncTableAction build() { args.addAll(listToMultiArgs("--computed-column", computedColumnArgs)); - MultipleParameterToolAdapter params = - new MultipleParameterToolAdapter( - MultipleParameterTool.fromArgs( - args.toArray(args.toArray(new String[0])))); - return (KafkaSyncTableAction) - new KafkaSyncTableActionFactory() - .create(params) - .orElseThrow(RuntimeException::new); + return createAction(KafkaSyncTableAction.class, args); } } @@ -299,7 +293,12 @@ public KafkaSyncDatabaseActionBuilder withMode(String mode) { public KafkaSyncDatabaseAction build() { List args = new ArrayList<>( - Arrays.asList("--warehouse", warehouse, "--database", database)); + Arrays.asList( + "kafka_sync_database", + "--warehouse", + warehouse, + "--database", + database)); args.addAll(mapToArgs("--kafka-conf", sourceConfig)); args.addAll(mapToArgs("--catalog-conf", catalogConfig)); @@ -316,10 +315,7 @@ public KafkaSyncDatabaseAction build() { new MultipleParameterToolAdapter( MultipleParameterTool.fromArgs( args.toArray(args.toArray(new String[0])))); - return (KafkaSyncDatabaseAction) - new KafkaSyncDatabaseActionFactory() - .create(params) - .orElseThrow(RuntimeException::new); + return createAction(KafkaSyncDatabaseAction.class, args); } } diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBActionITCaseBase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBActionITCaseBase.java index 18eb46e61c32f..b4c5eed770c46 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBActionITCaseBase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBActionITCaseBase.java @@ -18,14 +18,12 @@ package org.apache.paimon.flink.action.cdc.mongodb; -import org.apache.paimon.flink.action.MultipleParameterToolAdapter; import org.apache.paimon.flink.action.cdc.CdcActionITCaseBase; import com.mongodb.ConnectionString; import com.mongodb.MongoClientSettings; import com.mongodb.client.MongoClient; import com.mongodb.client.MongoClients; -import org.apache.flink.api.java.utils.MultipleParameterTool; import org.junit.jupiter.api.BeforeAll; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -112,6 +110,7 @@ public MongoDBSyncTableAction build() { List args = new ArrayList<>( Arrays.asList( + "mongodb_sync_table", "--warehouse", warehouse, "--database", @@ -126,14 +125,7 @@ public MongoDBSyncTableAction build() { args.addAll(listToArgs("--partition-keys", partitionKeys)); - MultipleParameterToolAdapter params = - new MultipleParameterToolAdapter( - MultipleParameterTool.fromArgs( - args.toArray(args.toArray(new String[0])))); - return (MongoDBSyncTableAction) - new MongoDBSyncTableActionFactory() - .create(params) - .orElseThrow(RuntimeException::new); + return createAction(MongoDBSyncTableAction.class, args); } } @@ -164,7 +156,12 @@ public MongoDBSyncDatabaseActionBuilder withTypeMappingModes(String... typeMappi public MongoDBSyncDatabaseAction build() { List args = new ArrayList<>( - Arrays.asList("--warehouse", warehouse, "--database", database)); + Arrays.asList( + "mongodb_sync_database", + "--warehouse", + warehouse, + "--database", + database)); args.addAll(mapToArgs("--mongodb-conf", sourceConfig)); args.addAll(mapToArgs("--catalog-conf", catalogConfig)); @@ -175,14 +172,7 @@ public MongoDBSyncDatabaseAction build() { args.addAll(nullableToArgs("--including-tables", includingTables)); args.addAll(nullableToArgs("--excluding-tables", excludingTables)); - MultipleParameterToolAdapter params = - new MultipleParameterToolAdapter( - MultipleParameterTool.fromArgs( - args.toArray(args.toArray(new String[0])))); - return (MongoDBSyncDatabaseAction) - new MongoDBSyncDatabaseActionFactory() - .create(params) - .orElseThrow(RuntimeException::new); + return createAction(MongoDBSyncDatabaseAction.class, args); } } } diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionITCaseBase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionITCaseBase.java index 9b3abd5ca7ba7..b9f91b3b4da0d 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionITCaseBase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionITCaseBase.java @@ -18,10 +18,8 @@ package org.apache.paimon.flink.action.cdc.mysql; -import org.apache.paimon.flink.action.MultipleParameterToolAdapter; import org.apache.paimon.flink.action.cdc.CdcActionITCaseBase; -import org.apache.flink.api.java.utils.MultipleParameterTool; import org.junit.jupiter.api.AfterAll; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -113,6 +111,7 @@ public MySqlSyncTableAction build() { List args = new ArrayList<>( Arrays.asList( + "mysql_sync_table", "--warehouse", warehouse, "--database", @@ -131,14 +130,7 @@ public MySqlSyncTableAction build() { args.addAll(listToMultiArgs("--computed-column", computedColumnArgs)); args.addAll(listToMultiArgs("--metadata-column", metadataColumns)); - MultipleParameterToolAdapter params = - new MultipleParameterToolAdapter( - MultipleParameterTool.fromArgs( - args.toArray(args.toArray(new String[0])))); - return (MySqlSyncTableAction) - new MySqlSyncTableActionFactory() - .create(params) - .orElseThrow(RuntimeException::new); + return createAction(MySqlSyncTableAction.class, args); } } @@ -153,7 +145,12 @@ public MySqlSyncDatabaseActionBuilder(Map mySqlConfig) { public MySqlSyncDatabaseAction build() { List args = new ArrayList<>( - Arrays.asList("--warehouse", warehouse, "--database", database)); + Arrays.asList( + "mysql_sync_database", + "--warehouse", + warehouse, + "--database", + database)); args.addAll(mapToArgs("--mysql-conf", sourceConfig)); args.addAll(mapToArgs("--catalog-conf", catalogConfig)); @@ -170,14 +167,7 @@ public MySqlSyncDatabaseAction build() { args.addAll(listToArgs("--type-mapping", typeMappingModes)); args.addAll(listToArgs("--metadata-column", metadataColumn)); - MultipleParameterToolAdapter params = - new MultipleParameterToolAdapter( - MultipleParameterTool.fromArgs( - args.toArray(args.toArray(new String[0])))); - return (MySqlSyncDatabaseAction) - new MySqlSyncDatabaseActionFactory() - .create(params) - .orElseThrow(RuntimeException::new); + return createAction(MySqlSyncDatabaseAction.class, args); } } } diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarActionITCaseBase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarActionITCaseBase.java index d593b789357e5..c9b447f523f3b 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarActionITCaseBase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarActionITCaseBase.java @@ -18,13 +18,11 @@ package org.apache.paimon.flink.action.cdc.pulsar; -import org.apache.paimon.flink.action.MultipleParameterToolAdapter; import org.apache.paimon.flink.action.cdc.CdcActionITCaseBase; import org.apache.paimon.utils.StringUtils; import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.flink.api.java.utils.MultipleParameterTool; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Producer; @@ -278,6 +276,7 @@ public PulsarSyncTableAction build() { List args = new ArrayList<>( Arrays.asList( + "pulsar_sync_table", "--warehouse", warehouse, "--database", @@ -295,14 +294,7 @@ public PulsarSyncTableAction build() { args.addAll(listToMultiArgs("--computed-column", computedColumnArgs)); - MultipleParameterToolAdapter params = - new MultipleParameterToolAdapter( - MultipleParameterTool.fromArgs( - args.toArray(args.toArray(new String[0])))); - return (PulsarSyncTableAction) - new PulsarSyncTableActionFactory() - .create(params) - .orElseThrow(RuntimeException::new); + return createAction(PulsarSyncTableAction.class, args); } } @@ -329,7 +321,12 @@ public PulsarSyncDatabaseActionBuilder withMode(String mode) { public PulsarSyncDatabaseAction build() { List args = new ArrayList<>( - Arrays.asList("--warehouse", warehouse, "--database", database)); + Arrays.asList( + "pulsar_sync-database", + "--warehouse", + warehouse, + "--database", + database)); args.addAll(mapToArgs("--pulsar-conf", sourceConfig)); args.addAll(mapToArgs("--catalog-conf", catalogConfig)); @@ -342,14 +339,7 @@ public PulsarSyncDatabaseAction build() { args.addAll(listToArgs("--type-mapping", typeMappingModes)); - MultipleParameterToolAdapter params = - new MultipleParameterToolAdapter( - MultipleParameterTool.fromArgs( - args.toArray(args.toArray(new String[0])))); - return (PulsarSyncDatabaseAction) - new PulsarSyncDatabaseActionFactory() - .create(params) - .orElseThrow(RuntimeException::new); + return createAction(PulsarSyncDatabaseAction.class, args); } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java index d1e35d019411b..4767276995488 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java @@ -159,6 +159,10 @@ protected StreamExecutionEnvironment buildDefaultEnv(boolean isStreaming) { return env; } + protected T createAction(Class clazz, List args) { + return createAction(clazz, args.toArray(new String[0])); + } + protected T createAction(Class clazz, String... args) { if (ThreadLocalRandom.current().nextBoolean()) { confuseArgs(args, "_", "-"); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactDatabaseActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactDatabaseActionITCase.java index 0bf8d75120b4c..705826bfa6160 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactDatabaseActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactDatabaseActionITCase.java @@ -510,7 +510,7 @@ private void includingAndExcludingTablesImpl( } StreamExecutionEnvironment env = buildDefaultEnv(false); - createAction(CompactDatabaseAction.class, args.toArray(new String[0])) + createAction(CompactDatabaseAction.class, args) .withStreamExecutionEnvironment(env) .build(); env.execute(); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java index beb786afe3043..2710cd6d93261 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java @@ -31,6 +31,7 @@ import java.util.Arrays; import java.util.Collections; +import java.util.List; import java.util.Optional; import java.util.concurrent.ThreadLocalRandom; @@ -80,22 +81,22 @@ public void testResetConsumer() throws Exception { assertThat(consumer1).isPresent(); assertThat(consumer1.get().nextSnapshot()).isEqualTo(4); + List args = + Arrays.asList( + "reset_consumer", + "--warehouse", + warehouse, + "--database", + database, + "--table", + tableName, + "--consumer_id", + "myid", + "--next_snapshot", + "1"); // reset consumer if (ThreadLocalRandom.current().nextBoolean()) { - createAction( - ResetConsumerAction.class, - "reset_consumer", - "--warehouse", - warehouse, - "--database", - database, - "--table", - tableName, - "--consumer_id", - "myid", - "--next_snapshot", - "1") - .run(); + createAction(ResetConsumerAction.class, args).run(); } else { callProcedure( String.format( @@ -107,18 +108,7 @@ public void testResetConsumer() throws Exception { // delete consumer if (ThreadLocalRandom.current().nextBoolean()) { - createAction( - ResetConsumerAction.class, - "reset_consumer", - "--warehouse", - warehouse, - "--database", - database, - "--table", - tableName, - "--consumer_id", - "myid") - .run(); + createAction(ResetConsumerAction.class, args.subList(0, 9)).run(); } else { callProcedure( String.format("CALL sys.reset_consumer('%s.%s', 'myid')", database, tableName)); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java index 03fc38148264b..d465fb089f103 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java @@ -761,7 +761,7 @@ public MergeIntoActionBuilder withNotMatchedInsert( } MergeIntoAction build() { - return createAction(MergeIntoAction.class, args.toArray(new String[0])); + return createAction(MergeIntoAction.class, args); } } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCase.java index 2d151371aa651..2eba8261ffb6c 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCase.java @@ -67,14 +67,12 @@ public void testRunWithoutException() throws Exception { database, "--table", tableName)); - RemoveOrphanFilesAction action1 = - createAction(RemoveOrphanFilesAction.class, args.toArray(new String[0])); + RemoveOrphanFilesAction action1 = createAction(RemoveOrphanFilesAction.class, args); assertThatCode(action1::run).doesNotThrowAnyException(); args.add("--older_than"); args.add("2023-12-31 23:59:59"); - RemoveOrphanFilesAction action2 = - createAction(RemoveOrphanFilesAction.class, args.toArray(new String[0])); + RemoveOrphanFilesAction action2 = createAction(RemoveOrphanFilesAction.class, args); assertThatCode(action2::run).doesNotThrowAnyException(); String withoutOlderThan =