Skip to content

Commit

Permalink
improve test
Browse files Browse the repository at this point in the history
  • Loading branch information
yuzelin committed Dec 1, 2023
1 parent 2697495 commit e4148c0
Show file tree
Hide file tree
Showing 9 changed files with 60 additions and 102 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,7 @@ public KafkaSyncTableAction build() {
List<String> args =
new ArrayList<>(
Arrays.asList(
"kafka_sync_table",
"--warehouse",
warehouse,
"--database",
Expand All @@ -265,14 +266,7 @@ public KafkaSyncTableAction build() {

args.addAll(listToMultiArgs("--computed-column", computedColumnArgs));

MultipleParameterToolAdapter params =
new MultipleParameterToolAdapter(
MultipleParameterTool.fromArgs(
args.toArray(args.toArray(new String[0]))));
return (KafkaSyncTableAction)
new KafkaSyncTableActionFactory()
.create(params)
.orElseThrow(RuntimeException::new);
return createAction(KafkaSyncTableAction.class, args);
}
}

Expand All @@ -299,7 +293,12 @@ public KafkaSyncDatabaseActionBuilder withMode(String mode) {
public KafkaSyncDatabaseAction build() {
List<String> args =
new ArrayList<>(
Arrays.asList("--warehouse", warehouse, "--database", database));
Arrays.asList(
"kafka_sync_database",
"--warehouse",
warehouse,
"--database",
database));

args.addAll(mapToArgs("--kafka-conf", sourceConfig));
args.addAll(mapToArgs("--catalog-conf", catalogConfig));
Expand All @@ -316,10 +315,7 @@ public KafkaSyncDatabaseAction build() {
new MultipleParameterToolAdapter(
MultipleParameterTool.fromArgs(
args.toArray(args.toArray(new String[0]))));
return (KafkaSyncDatabaseAction)
new KafkaSyncDatabaseActionFactory()
.create(params)
.orElseThrow(RuntimeException::new);
return createAction(KafkaSyncDatabaseAction.class, args);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,12 @@

package org.apache.paimon.flink.action.cdc.mongodb;

import org.apache.paimon.flink.action.MultipleParameterToolAdapter;
import org.apache.paimon.flink.action.cdc.CdcActionITCaseBase;

import com.mongodb.ConnectionString;
import com.mongodb.MongoClientSettings;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.junit.jupiter.api.BeforeAll;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -112,6 +110,7 @@ public MongoDBSyncTableAction build() {
List<String> args =
new ArrayList<>(
Arrays.asList(
"mongodb_sync_table",
"--warehouse",
warehouse,
"--database",
Expand All @@ -126,14 +125,7 @@ public MongoDBSyncTableAction build() {

args.addAll(listToArgs("--partition-keys", partitionKeys));

MultipleParameterToolAdapter params =
new MultipleParameterToolAdapter(
MultipleParameterTool.fromArgs(
args.toArray(args.toArray(new String[0]))));
return (MongoDBSyncTableAction)
new MongoDBSyncTableActionFactory()
.create(params)
.orElseThrow(RuntimeException::new);
return createAction(MongoDBSyncTableAction.class, args);
}
}

Expand Down Expand Up @@ -164,7 +156,12 @@ public MongoDBSyncDatabaseActionBuilder withTypeMappingModes(String... typeMappi
public MongoDBSyncDatabaseAction build() {
List<String> args =
new ArrayList<>(
Arrays.asList("--warehouse", warehouse, "--database", database));
Arrays.asList(
"mongodb_sync_database",
"--warehouse",
warehouse,
"--database",
database));

args.addAll(mapToArgs("--mongodb-conf", sourceConfig));
args.addAll(mapToArgs("--catalog-conf", catalogConfig));
Expand All @@ -175,14 +172,7 @@ public MongoDBSyncDatabaseAction build() {
args.addAll(nullableToArgs("--including-tables", includingTables));
args.addAll(nullableToArgs("--excluding-tables", excludingTables));

MultipleParameterToolAdapter params =
new MultipleParameterToolAdapter(
MultipleParameterTool.fromArgs(
args.toArray(args.toArray(new String[0]))));
return (MongoDBSyncDatabaseAction)
new MongoDBSyncDatabaseActionFactory()
.create(params)
.orElseThrow(RuntimeException::new);
return createAction(MongoDBSyncDatabaseAction.class, args);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,8 @@

package org.apache.paimon.flink.action.cdc.mysql;

import org.apache.paimon.flink.action.MultipleParameterToolAdapter;
import org.apache.paimon.flink.action.cdc.CdcActionITCaseBase;

import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.junit.jupiter.api.AfterAll;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -113,6 +111,7 @@ public MySqlSyncTableAction build() {
List<String> args =
new ArrayList<>(
Arrays.asList(
"mysql_sync_table",
"--warehouse",
warehouse,
"--database",
Expand All @@ -131,14 +130,7 @@ public MySqlSyncTableAction build() {
args.addAll(listToMultiArgs("--computed-column", computedColumnArgs));
args.addAll(listToMultiArgs("--metadata-column", metadataColumns));

MultipleParameterToolAdapter params =
new MultipleParameterToolAdapter(
MultipleParameterTool.fromArgs(
args.toArray(args.toArray(new String[0]))));
return (MySqlSyncTableAction)
new MySqlSyncTableActionFactory()
.create(params)
.orElseThrow(RuntimeException::new);
return createAction(MySqlSyncTableAction.class, args);
}
}

Expand All @@ -153,7 +145,12 @@ public MySqlSyncDatabaseActionBuilder(Map<String, String> mySqlConfig) {
public MySqlSyncDatabaseAction build() {
List<String> args =
new ArrayList<>(
Arrays.asList("--warehouse", warehouse, "--database", database));
Arrays.asList(
"mysql_sync_database",
"--warehouse",
warehouse,
"--database",
database));

args.addAll(mapToArgs("--mysql-conf", sourceConfig));
args.addAll(mapToArgs("--catalog-conf", catalogConfig));
Expand All @@ -170,14 +167,7 @@ public MySqlSyncDatabaseAction build() {
args.addAll(listToArgs("--type-mapping", typeMappingModes));
args.addAll(listToArgs("--metadata-column", metadataColumn));

MultipleParameterToolAdapter params =
new MultipleParameterToolAdapter(
MultipleParameterTool.fromArgs(
args.toArray(args.toArray(new String[0]))));
return (MySqlSyncDatabaseAction)
new MySqlSyncDatabaseActionFactory()
.create(params)
.orElseThrow(RuntimeException::new);
return createAction(MySqlSyncDatabaseAction.class, args);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,11 @@

package org.apache.paimon.flink.action.cdc.pulsar;

import org.apache.paimon.flink.action.MultipleParameterToolAdapter;
import org.apache.paimon.flink.action.cdc.CdcActionITCaseBase;
import org.apache.paimon.utils.StringUtils;

import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;

import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Producer;
Expand Down Expand Up @@ -278,6 +276,7 @@ public PulsarSyncTableAction build() {
List<String> args =
new ArrayList<>(
Arrays.asList(
"pulsar_sync_table",
"--warehouse",
warehouse,
"--database",
Expand All @@ -295,14 +294,7 @@ public PulsarSyncTableAction build() {

args.addAll(listToMultiArgs("--computed-column", computedColumnArgs));

MultipleParameterToolAdapter params =
new MultipleParameterToolAdapter(
MultipleParameterTool.fromArgs(
args.toArray(args.toArray(new String[0]))));
return (PulsarSyncTableAction)
new PulsarSyncTableActionFactory()
.create(params)
.orElseThrow(RuntimeException::new);
return createAction(PulsarSyncTableAction.class, args);
}
}

Expand All @@ -329,7 +321,12 @@ public PulsarSyncDatabaseActionBuilder withMode(String mode) {
public PulsarSyncDatabaseAction build() {
List<String> args =
new ArrayList<>(
Arrays.asList("--warehouse", warehouse, "--database", database));
Arrays.asList(
"pulsar_sync-database",
"--warehouse",
warehouse,
"--database",
database));

args.addAll(mapToArgs("--pulsar-conf", sourceConfig));
args.addAll(mapToArgs("--catalog-conf", catalogConfig));
Expand All @@ -342,14 +339,7 @@ public PulsarSyncDatabaseAction build() {

args.addAll(listToArgs("--type-mapping", typeMappingModes));

MultipleParameterToolAdapter params =
new MultipleParameterToolAdapter(
MultipleParameterTool.fromArgs(
args.toArray(args.toArray(new String[0]))));
return (PulsarSyncDatabaseAction)
new PulsarSyncDatabaseActionFactory()
.create(params)
.orElseThrow(RuntimeException::new);
return createAction(PulsarSyncDatabaseAction.class, args);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,10 @@ protected StreamExecutionEnvironment buildDefaultEnv(boolean isStreaming) {
return env;
}

protected <T extends ActionBase> T createAction(Class<T> clazz, List<String> args) {
return createAction(clazz, args.toArray(new String[0]));
}

protected <T extends ActionBase> T createAction(Class<T> clazz, String... args) {
if (ThreadLocalRandom.current().nextBoolean()) {
confuseArgs(args, "_", "-");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,7 @@ private void includingAndExcludingTablesImpl(
}

StreamExecutionEnvironment env = buildDefaultEnv(false);
createAction(CompactDatabaseAction.class, args.toArray(new String[0]))
createAction(CompactDatabaseAction.class, args)
.withStreamExecutionEnvironment(env)
.build();
env.execute();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ThreadLocalRandom;

Expand Down Expand Up @@ -80,22 +81,22 @@ public void testResetConsumer() throws Exception {
assertThat(consumer1).isPresent();
assertThat(consumer1.get().nextSnapshot()).isEqualTo(4);

List<String> args =
Arrays.asList(
"reset_consumer",
"--warehouse",
warehouse,
"--database",
database,
"--table",
tableName,
"--consumer_id",
"myid",
"--next_snapshot",
"1");
// reset consumer
if (ThreadLocalRandom.current().nextBoolean()) {
createAction(
ResetConsumerAction.class,
"reset_consumer",
"--warehouse",
warehouse,
"--database",
database,
"--table",
tableName,
"--consumer_id",
"myid",
"--next_snapshot",
"1")
.run();
createAction(ResetConsumerAction.class, args).run();
} else {
callProcedure(
String.format(
Expand All @@ -107,18 +108,7 @@ public void testResetConsumer() throws Exception {

// delete consumer
if (ThreadLocalRandom.current().nextBoolean()) {
createAction(
ResetConsumerAction.class,
"reset_consumer",
"--warehouse",
warehouse,
"--database",
database,
"--table",
tableName,
"--consumer_id",
"myid")
.run();
createAction(ResetConsumerAction.class, args.subList(0, 9)).run();
} else {
callProcedure(
String.format("CALL sys.reset_consumer('%s.%s', 'myid')", database, tableName));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -761,7 +761,7 @@ public MergeIntoActionBuilder withNotMatchedInsert(
}

MergeIntoAction build() {
return createAction(MergeIntoAction.class, args.toArray(new String[0]));
return createAction(MergeIntoAction.class, args);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,12 @@ public void testRunWithoutException() throws Exception {
database,
"--table",
tableName));
RemoveOrphanFilesAction action1 =
createAction(RemoveOrphanFilesAction.class, args.toArray(new String[0]));
RemoveOrphanFilesAction action1 = createAction(RemoveOrphanFilesAction.class, args);
assertThatCode(action1::run).doesNotThrowAnyException();

args.add("--older_than");
args.add("2023-12-31 23:59:59");
RemoveOrphanFilesAction action2 =
createAction(RemoveOrphanFilesAction.class, args.toArray(new String[0]));
RemoveOrphanFilesAction action2 = createAction(RemoveOrphanFilesAction.class, args);
assertThatCode(action2::run).doesNotThrowAnyException();

String withoutOlderThan =
Expand Down

0 comments on commit e4148c0

Please sign in to comment.