diff --git a/psc-flink/pom.xml b/psc-flink/pom.xml
index e375457..f640fa2 100644
--- a/psc-flink/pom.xml
+++ b/psc-flink/pom.xml
@@ -448,12 +448,6 @@
<artifactId>flink-avro-confluent-registry</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
- <exclusions>
- <exclusion>
- <groupId>io.confluent</groupId>
- <artifactId>kafka-schema-registry-client</artifactId>
- </exclusion>
- </exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
diff --git a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/ReducingUpsertWriter.java b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/ReducingUpsertWriter.java
index 89ecb0b..82d65e2 100644
--- a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/ReducingUpsertWriter.java
+++ b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/ReducingUpsertWriter.java
@@ -178,7 +178,6 @@ public long currentWatermark() {
@Override
public Long timestamp() {
- checkNotNull(timestamp, "timestamp must to be set before retrieving it.");
return timestamp;
}
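Note: Flink's SinkWriter.Context contract allows timestamp() to return null when the record carries no assigned timestamp, so dropping the checkNotNull brings this override in line with the interface. Callers now need a null check; a minimal caller sketch (the fallback shown is an assumption, not part of this patch):

    Long ts = context.timestamp();
    // Hypothetical fallback when no timestamp was set:
    long effectiveTs = (ts != null) ? ts : System.currentTimeMillis();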
diff --git a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/testutils/YamlFileMetadataServiceTest.java b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/testutils/YamlFileMetadataServiceTest.java
index 9917b79..e3e8cf8 100644
--- a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/testutils/YamlFileMetadataServiceTest.java
+++ b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/testutils/YamlFileMetadataServiceTest.java
@@ -24,6 +24,7 @@
import com.pinterest.flink.connector.psc.PscFlinkConfiguration;
import com.pinterest.flink.connector.psc.dynamic.metadata.ClusterMetadata;
import com.pinterest.flink.connector.psc.dynamic.metadata.PscStream;
+import com.pinterest.flink.streaming.connectors.psc.PscTestEnvironmentWithKafkaAsPubSub;
import org.junit.jupiter.api.Test;
import java.io.IOException;
@@ -45,13 +46,18 @@ public void testParseFile() throws IOException {
Properties propertiesForCluster0 = new Properties();
propertiesForCluster0.setProperty(
- PscFlinkConfiguration.CLUSTER_URI_CONFIG, "bootstrap-server-0:443");
+ PscFlinkConfiguration.CLUSTER_URI_CONFIG, PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_CLUSTER0_URI_PREFIX);
+ PscTestUtils.putDiscoveryProperties(propertiesForCluster0, "bootstrap-server-0:443", PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_CLUSTER0_URI_PREFIX);
Properties propertiesForCluster1 = new Properties();
propertiesForCluster1.setProperty(
- PscFlinkConfiguration.CLUSTER_URI_CONFIG, "bootstrap-server-1:443");
+ PscFlinkConfiguration.CLUSTER_URI_CONFIG, PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_CLUSTER1_URI_PREFIX);
+ PscTestUtils.putDiscoveryProperties(propertiesForCluster1, "bootstrap-server-1:443", PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_CLUSTER1_URI_PREFIX);
+
Properties propertiesForCluster2 = new Properties();
+ String cluster2Uri = PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_CLUSTER1_URI_PREFIX.replace("cluster1", "cluster2");
propertiesForCluster2.setProperty(
- PscFlinkConfiguration.CLUSTER_URI_CONFIG, "bootstrap-server-2:443");
+ PscFlinkConfiguration.CLUSTER_URI_CONFIG, cluster2Uri);
+ PscTestUtils.putDiscoveryProperties(propertiesForCluster2, "bootstrap-server-2:443", cluster2Uri);
assertThat(kafkaStreams)
.containsExactlyInAnyOrderElementsOf(
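The putDiscoveryProperties(...) calls above pair each cluster URI with its bootstrap servers. A rough sketch of what the helper is assumed to set — inferred from the properties.psc.discovery.* keys in the PscTableITCase DDL further down, not from the helper's actual source:

    // Hypothetical reconstruction of PscTestUtils.putDiscoveryProperties.
    static void putDiscoveryProperties(Properties props, String bootstrapServers, String clusterUri) {
        props.setProperty("psc.discovery.topic.uri.prefixes", clusterUri);
        props.setProperty("psc.discovery.connection.urls", bootstrapServers);
        props.setProperty("psc.discovery.security.protocols", "plaintext");
    }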
diff --git a/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/internals/metrics/PscMetricMutableWrapperTest.java b/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/internals/metrics/PscMetricMutableWrapperTest.java
index 45d6cd7..5d1f4cc 100644
--- a/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/internals/metrics/PscMetricMutableWrapperTest.java
+++ b/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/internals/metrics/PscMetricMutableWrapperTest.java
@@ -18,6 +18,7 @@
package com.pinterest.flink.streaming.connectors.psc.internals.metrics;
+import com.pinterest.flink.streaming.connectors.psc.PscTestEnvironmentWithKafkaAsPubSub;
import com.pinterest.psc.config.PscConfiguration;
import com.pinterest.psc.config.PscConfigurationUtils;
import com.pinterest.psc.consumer.PscConsumer;
@@ -47,6 +48,7 @@
import java.util.stream.Stream;
import static com.pinterest.flink.connector.psc.testutils.DockerImageVersions.KAFKA;
+import static com.pinterest.flink.connector.psc.testutils.PscTestUtils.putDiscoveryProperties;
import static com.pinterest.flink.connector.psc.testutils.PscUtil.createKafkaContainer;
@Testcontainers
@@ -100,8 +102,11 @@ private static PscConfiguration getPscClientConfiguration() {
standardProps.put(PscConfiguration.PSC_CONSUMER_VALUE_DESERIALIZER, ByteArrayDeserializer.class.getName());
standardProps.put(PscConfiguration.PSC_PRODUCER_KEY_SERIALIZER, ByteArraySerializer.class.getName());
standardProps.put(PscConfiguration.PSC_PRODUCER_VALUE_SERIALIZER, ByteArraySerializer.class.getName());
+ standardProps.put(PscConfiguration.PSC_CONSUMER_CLIENT_ID, "psc-metric-mutable-wrapper-test");
+ standardProps.put(PscConfiguration.PSC_PRODUCER_CLIENT_ID, "psc-metric-mutable-wrapper-test");
standardProps.put(PscConfiguration.PSC_CONSUMER_OFFSET_AUTO_RESET, "earliest");
standardProps.put(PscConfiguration.PSC_CONSUMER_PARTITION_FETCH_MAX_BYTES, 256);
+ putDiscoveryProperties(standardProps, KAFKA_CONTAINER.getBootstrapServers(), PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_CLUSTER0_URI_PREFIX);
return PscConfigurationUtils.propertiesToPscConfiguration(standardProps);
}
}
diff --git a/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/shuffle/PscShuffleITCase.java b/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/shuffle/PscShuffleITCase.java
index ade963d..21c1452 100644
--- a/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/shuffle/PscShuffleITCase.java
+++ b/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/shuffle/PscShuffleITCase.java
@@ -368,7 +368,8 @@ private void testRecordSerDe(TimeCharacteristic timeCharacteristic) throws Excep
// Records in a single partition are kept in order
Collection<PscConsumerMessage<byte[], byte[]>> records = testPscShuffleProducer(
- topic("test_serde-" + UUID.randomUUID(), timeCharacteristic),
+ PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_CLUSTER0_URI_PREFIX
+ + topic("test_serde-" + UUID.randomUUID(), timeCharacteristic),
env,
1,
1,
diff --git a/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscChangelogTableITCase.java b/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscChangelogTableITCase.java
index eb5690f..1cbe47d 100644
--- a/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscChangelogTableITCase.java
+++ b/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscChangelogTableITCase.java
@@ -171,13 +171,13 @@ public void testPscDebeziumChangelogSource() throws Exception {
*/
List<String> expected =
Arrays.asList(
- "+I[plaintext:/rn:kafka:env:cloud_region1::cluster1:changelog_topic, products, scooter, 3.140]",
- "+I[plaintext:/rn:kafka:env:cloud_region1::cluster1:changelog_topic, products, car battery, 8.100]",
- "+I[plaintext:/rn:kafka:env:cloud_region1::cluster1:changelog_topic, products, 12-pack drill bits, 0.800]",
- "+I[plaintext:/rn:kafka:env:cloud_region1::cluster1:changelog_topic, products, hammer, 2.625]",
- "+I[plaintext:/rn:kafka:env:cloud_region1::cluster1:changelog_topic, products, rocks, 5.100]",
- "+I[plaintext:/rn:kafka:env:cloud_region1::cluster1:changelog_topic, products, jacket, 0.600]",
- "+I[plaintext:/rn:kafka:env:cloud_region1::cluster1:changelog_topic, products, spare tire, 22.200]");
+ "+I[plaintext:/rn:kafka:env:cloud_region1::cluster0:changelog_topic, products, scooter, 3.140]",
+ "+I[plaintext:/rn:kafka:env:cloud_region1::cluster0:changelog_topic, products, car battery, 8.100]",
+ "+I[plaintext:/rn:kafka:env:cloud_region1::cluster0:changelog_topic, products, 12-pack drill bits, 0.800]",
+ "+I[plaintext:/rn:kafka:env:cloud_region1::cluster0:changelog_topic, products, hammer, 2.625]",
+ "+I[plaintext:/rn:kafka:env:cloud_region1::cluster0:changelog_topic, products, rocks, 5.100]",
+ "+I[plaintext:/rn:kafka:env:cloud_region1::cluster0:changelog_topic, products, jacket, 0.600]",
+ "+I[plaintext:/rn:kafka:env:cloud_region1::cluster0:changelog_topic, products, spare tire, 22.200]");
waitingExpectedResults("sink", expected, Duration.ofSeconds(10));
@@ -319,13 +319,13 @@ public void testPscCanalChangelogSource() throws Exception {
List<String> expected =
Arrays.asList(
- "+I[plaintext:/rn:kafka:env:cloud_region1::cluster1:changelog_canal, inventory, products2, {name=12, weight=7, description=12, id=4}, [id], 2020-05-13T12:38:35.477, 2020-05-13T12:38:35, 12-pack drill bits]",
- "+I[plaintext:/rn:kafka:env:cloud_region1::cluster1:changelog_canal, inventory, products2, {name=12, weight=7, description=12, id=4}, [id], 2020-05-13T12:38:35.477, 2020-05-13T12:38:35, spare tire]",
- "+I[plaintext:/rn:kafka:env:cloud_region1::cluster1:changelog_canal, inventory, products2, {name=12, weight=7, description=12, id=4}, [id], 2020-05-13T12:39:06.301, 2020-05-13T12:39:06, hammer]",
- "+I[plaintext:/rn:kafka:env:cloud_region1::cluster1:changelog_canal, inventory, products2, {name=12, weight=7, description=12, id=4}, [id], 2020-05-13T12:39:09.489, 2020-05-13T12:39:09, rocks]",
- "+I[plaintext:/rn:kafka:env:cloud_region1::cluster1:changelog_canal, inventory, products2, {name=12, weight=7, description=12, id=4}, [id], 2020-05-13T12:39:18.230, 2020-05-13T12:39:18, jacket]",
- "+I[plaintext:/rn:kafka:env:cloud_region1::cluster1:changelog_canal, inventory, products2, {name=12, weight=7, description=12, id=4}, [id], 2020-05-13T12:42:33.939, 2020-05-13T12:42:33, car battery]",
- "+I[plaintext:/rn:kafka:env:cloud_region1::cluster1:changelog_canal, inventory, products2, {name=12, weight=7, description=12, id=4}, [id], 2020-05-13T12:42:33.939, 2020-05-13T12:42:33, scooter]");
+ "+I[plaintext:/rn:kafka:env:cloud_region1::cluster0:changelog_canal, inventory, products2, {name=12, weight=7, description=12, id=4}, [id], 2020-05-13T12:38:35.477, 2020-05-13T12:38:35, 12-pack drill bits]",
+ "+I[plaintext:/rn:kafka:env:cloud_region1::cluster0:changelog_canal, inventory, products2, {name=12, weight=7, description=12, id=4}, [id], 2020-05-13T12:38:35.477, 2020-05-13T12:38:35, spare tire]",
+ "+I[plaintext:/rn:kafka:env:cloud_region1::cluster0:changelog_canal, inventory, products2, {name=12, weight=7, description=12, id=4}, [id], 2020-05-13T12:39:06.301, 2020-05-13T12:39:06, hammer]",
+ "+I[plaintext:/rn:kafka:env:cloud_region1::cluster0:changelog_canal, inventory, products2, {name=12, weight=7, description=12, id=4}, [id], 2020-05-13T12:39:09.489, 2020-05-13T12:39:09, rocks]",
+ "+I[plaintext:/rn:kafka:env:cloud_region1::cluster0:changelog_canal, inventory, products2, {name=12, weight=7, description=12, id=4}, [id], 2020-05-13T12:39:18.230, 2020-05-13T12:39:18, jacket]",
+ "+I[plaintext:/rn:kafka:env:cloud_region1::cluster0:changelog_canal, inventory, products2, {name=12, weight=7, description=12, id=4}, [id], 2020-05-13T12:42:33.939, 2020-05-13T12:42:33, car battery]",
+ "+I[plaintext:/rn:kafka:env:cloud_region1::cluster0:changelog_canal, inventory, products2, {name=12, weight=7, description=12, id=4}, [id], 2020-05-13T12:42:33.939, 2020-05-13T12:42:33, scooter]");
waitingExpectedResults("sink", expected, Duration.ofSeconds(10));
@@ -462,13 +462,13 @@ public void testPscMaxwellChangelogSource() throws Exception {
List<String> expected =
Arrays.asList(
- "+I[plaintext:/rn:kafka:env:cloud_region1::cluster1:changelog_maxwell, test, product, null, 2020-08-06T03:34:43, 12-pack drill bits]",
- "+I[plaintext:/rn:kafka:env:cloud_region1::cluster1:changelog_maxwell, test, product, null, 2020-08-06T03:34:43, spare tire]",
- "+I[plaintext:/rn:kafka:env:cloud_region1::cluster1:changelog_maxwell, test, product, null, 2020-08-06T03:34:53, hammer]",
- "+I[plaintext:/rn:kafka:env:cloud_region1::cluster1:changelog_maxwell, test, product, null, 2020-08-06T03:34:57, rocks]",
- "+I[plaintext:/rn:kafka:env:cloud_region1::cluster1:changelog_maxwell, test, product, null, 2020-08-06T03:35:06, jacket]",
- "+I[plaintext:/rn:kafka:env:cloud_region1::cluster1:changelog_maxwell, test, product, null, 2020-08-06T03:35:28, car battery]",
- "+I[plaintext:/rn:kafka:env:cloud_region1::cluster1:changelog_maxwell, test, product, null, 2020-08-06T03:35:28, scooter]");
+ "+I[plaintext:/rn:kafka:env:cloud_region1::cluster0:changelog_maxwell, test, product, null, 2020-08-06T03:34:43, 12-pack drill bits]",
+ "+I[plaintext:/rn:kafka:env:cloud_region1::cluster0:changelog_maxwell, test, product, null, 2020-08-06T03:34:43, spare tire]",
+ "+I[plaintext:/rn:kafka:env:cloud_region1::cluster0:changelog_maxwell, test, product, null, 2020-08-06T03:34:53, hammer]",
+ "+I[plaintext:/rn:kafka:env:cloud_region1::cluster0:changelog_maxwell, test, product, null, 2020-08-06T03:34:57, rocks]",
+ "+I[plaintext:/rn:kafka:env:cloud_region1::cluster0:changelog_maxwell, test, product, null, 2020-08-06T03:35:06, jacket]",
+ "+I[plaintext:/rn:kafka:env:cloud_region1::cluster0:changelog_maxwell, test, product, null, 2020-08-06T03:35:28, car battery]",
+ "+I[plaintext:/rn:kafka:env:cloud_region1::cluster0:changelog_maxwell, test, product, null, 2020-08-06T03:35:28, scooter]");
waitingExpectedResults("sink", expected, Duration.ofSeconds(10));
diff --git a/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicTableFactoryTest.java b/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicTableFactoryTest.java
index e518091..103878b 100644
--- a/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicTableFactoryTest.java
+++ b/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicTableFactoryTest.java
@@ -524,7 +524,7 @@ public void testBoundedTimestamp() {
assertThat(source.getBoundedness()).isEqualTo(Boundedness.BOUNDED);
OffsetsInitializer offsetsInitializer =
PscSourceTestUtils.getStoppingOffsetsInitializer(source);
- TopicUriPartition partition = new TopicUriPartition(TOPIC, 0);
+ TopicUriPartition partition = new TopicUriPartition(TOPIC_URI, 0);
long offsetForTimestamp = 123L;
Map<TopicUriPartition, Long> partitionOffsets =
offsetsInitializer.getPartitionOffsets(
@@ -538,7 +538,7 @@ public void testBoundedTimestamp() {
new HashMap<>();
result.put(
partition,
- 1L);
+ 123L);
return result;
},
partitions -> {
@@ -895,7 +895,7 @@ public void testSourceTableWithTopicAndTopicPattern() {
.satisfies(
anyCauseMatches(
ValidationException.class,
- "Option 'topic' and 'topic-pattern' shouldn't be set together."));
+ "Option 'topic-uri' and 'topic-pattern' shouldn't be set together."));
}
@Test
@@ -1125,7 +1125,7 @@ public void testDiscoverPartitionByDefault() {
new int[0],
new int[] {0, 1, 2},
null,
- Collections.singletonList(TOPIC),
+ Collections.singletonList(TOPIC_URI),
null,
props,
StartupMode.SPECIFIC_OFFSETS,
@@ -1163,7 +1163,7 @@ public void testDisableDiscoverPartition() {
new int[0],
new int[] {0, 1, 2},
null,
- Collections.singletonList(TOPIC),
+ Collections.singletonList(TOPIC_URI),
null,
props,
StartupMode.SPECIFIC_OFFSETS,
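These fixes switch the partition key from the bare topic name to the full topic URI and make the mocked offset match offsetForTimestamp (123L). An illustrative sketch of the distinction, with hypothetical values modeled on the RN-style URIs used elsewhere in this patch:

    String topic = "some_topic";                                                  // bare name; hypothetical value
    String topicUri = "plaintext:/rn:kafka:env:cloud_region1::cluster0:" + topic; // full RN-style URI
    TopicUriPartition partition = new TopicUriPartition(topicUri, 0);             // keyed by URI, not bare name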
diff --git a/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscTableITCase.java b/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscTableITCase.java
index 6a088f6..fe090bc 100644
--- a/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscTableITCase.java
+++ b/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscTableITCase.java
@@ -202,6 +202,7 @@ public void testPscSourceSinkWithBoundedSpecificOffsets() throws Exception {
// we always use a different topic name for each parameterized topic,
// in order to make sure the topic can be created.
final String topic = "bounded_" + format + "_" + UUID.randomUUID();
+ final String topicUri = PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_CLUSTER0_URI_PREFIX + topic;
createTestTopic(topic, 1, 1);
// ---------- Produce an event time stream into Kafka -------------------
@@ -216,16 +217,23 @@ public void testPscSourceSinkWithBoundedSpecificOffsets() throws Exception {
+ " `behavior` STRING\n"
+ ") WITH (\n"
+ " 'connector' = '%s',\n"
- + " 'topic' = '%s',\n"
- + " 'properties.bootstrap.servers' = '%s',\n"
- + " 'properties.group.id' = '%s',\n"
+ + " 'topic-uri' = '%s',\n"
+ + " 'properties.psc.cluster.uri' = '%s',\n"
+ + " 'properties.psc.discovery.topic.uri.prefixes' = '%s',\n"
+ + " 'properties.psc.discovery.connection.urls' = '%s',\n"
+ + " 'properties.psc.discovery.security.protocols' = 'plaintext',\n"
+ + " 'properties.psc.consumer.client.id' = 'psc-test-client',\n"
+ + " 'properties.psc.producer.client.id' = 'psc-test-client',\n"
+ + " 'properties.psc.consumer.group.id' = '%s',\n"
+ " 'scan.startup.mode' = 'earliest-offset',\n"
+ " 'scan.bounded.mode' = 'specific-offsets',\n"
+ " 'scan.bounded.specific-offsets' = 'partition:0,offset:2',\n"
+ " %s\n"
+ ")\n",
PscDynamicTableFactory.IDENTIFIER,
- topic,
+ topicUri,
+ PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_CLUSTER0_URI_PREFIX,
+ PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_CLUSTER0_URI_PREFIX,
bootstraps,
groupId,
formatOptions());
@@ -256,6 +264,7 @@ public void testPscSourceSinkWithBoundedTimestamp() throws Exception {
// we always use a different topic name for each parameterized topic,
// in order to make sure the topic can be created.
final String topic = "bounded_" + format + "_" + UUID.randomUUID();
+ final String topicUri = PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_CLUSTER0_URI_PREFIX + topic;
createTestTopic(topic, 1, 1);
// ---------- Produce an event time stream into Kafka -------------------
@@ -271,16 +280,23 @@ public void testPscSourceSinkWithBoundedTimestamp() throws Exception {
+ " `event_time` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'"
+ ") WITH (\n"
+ " 'connector' = '%s',\n"
- + " 'topic' = '%s',\n"
- + " 'properties.bootstrap.servers' = '%s',\n"
- + " 'properties.group.id' = '%s',\n"
+ + " 'topic-uri' = '%s',\n"
+ + " 'properties.psc.cluster.uri' = '%s',\n"
+ + " 'properties.psc.discovery.topic.uri.prefixes' = '%s',\n"
+ + " 'properties.psc.discovery.connection.urls' = '%s',\n"
+ + " 'properties.psc.discovery.security.protocols' = 'plaintext',\n"
+ + " 'properties.psc.consumer.client.id' = 'psc-test-client',\n"
+ + " 'properties.psc.producer.client.id' = 'psc-test-client',\n"
+ + " 'properties.psc.consumer.group.id' = '%s',\n"
+ " 'scan.startup.mode' = 'earliest-offset',\n"
+ " 'scan.bounded.mode' = 'timestamp',\n"
+ " 'scan.bounded.timestamp-millis' = '5',\n"
+ " %s\n"
+ ")\n",
PscDynamicTableFactory.IDENTIFIER,
- topic,
+ topicUri,
+ PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_CLUSTER0_URI_PREFIX,
+ PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_CLUSTER0_URI_PREFIX,
bootstraps,
groupId,
formatOptions());
@@ -1018,12 +1034,13 @@ public void testLatestOffsetStrategyResume() throws Exception {
// we always use a different topic name for each parameterized topic,
// in order to make sure the topic can be created.
final String topic = "latest_offset_resume_topic_" + format + "_" + UUID.randomUUID();
+ final String topicUri = PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_CLUSTER0_URI_PREFIX + topic;
createTestTopic(topic, 6, 1);
env.setParallelism(1);
// ---------- Produce data into Kafka's partition 0-6 -------------------
- String groupId = getStandardProps().getProperty("group.id");
+ String groupId = getStandardProps().getProperty("psc.consumer.group.id");
String bootstraps = getBootstrapServers();
final String createTable =
@@ -1032,15 +1049,27 @@ public void testLatestOffsetStrategyResume() throws Exception {
+ " `partition_id` INT,\n"
+ " `value` INT\n"
+ ") WITH (\n"
- + " 'connector' = 'kafka',\n"
- + " 'topic' = '%s',\n"
- + " 'properties.bootstrap.servers' = '%s',\n"
- + " 'properties.group.id' = '%s',\n"
+ + " 'connector' = 'psc',\n"
+ + " 'topic-uri' = '%s',\n"
+ + " 'properties.psc.cluster.uri' = '%s',\n"
+ + " 'properties.psc.discovery.topic.uri.prefixes' = '%s',\n"
+ + " 'properties.psc.discovery.connection.urls' = '%s',\n"
+ + " 'properties.psc.discovery.security.protocols' = 'plaintext',\n"
+ + " 'properties.psc.consumer.client.id' = 'psc-test-client',\n"
+ + " 'properties.psc.producer.client.id' = 'psc-test-client',\n"
+ + " 'properties.psc.consumer.group.id' = '%s',\n"
+ " 'scan.startup.mode' = 'latest-offset',\n"
+ " 'sink.partitioner' = '%s',\n"
+ " 'format' = '%s'\n"
+ ")",
- topic, bootstraps, groupId, TestPartitioner.class.getName(), format);
+ topicUri,
+ PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_CLUSTER0_URI_PREFIX,
+ PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_CLUSTER0_URI_PREFIX,
+ bootstraps,
+ groupId,
+ TestPartitioner.class.getName(),
+ format
+ );
tEnv.executeSql(createTable);
diff --git a/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/UpsertPscDynamicTableFactoryTest.java b/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/UpsertPscDynamicTableFactoryTest.java
index 07a3c32..53d7fd2 100644
--- a/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/UpsertPscDynamicTableFactoryTest.java
+++ b/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/UpsertPscDynamicTableFactoryTest.java
@@ -185,7 +185,7 @@ public void testTableSink() {
getFullSinkOptions(),
options -> {
options.put("sink.delivery-guarantee", "exactly-once");
- options.put("sink.transactional-id-prefix", "kafka-sink");
+ options.put("sink.transactional-id-prefix", "psc-sink");
});
final DynamicTableSink actualSink = createTableSink(SINK_SCHEMA, modifiedOptions);
@@ -486,7 +486,7 @@ public void testBoundedGroupOffsets() {
testBoundedOffsets(
PscConnectorOptions.ScanBoundedMode.GROUP_OFFSETS,
options -> {
- options.put("properties.group.id", "dummy");
+ options.put("properties.psc.consumer.group.id", "dummy");
},
source -> {
assertThat(source.getBoundedness()).isEqualTo(Boundedness.BOUNDED);
@@ -526,7 +526,7 @@ public void testBoundedTimestamp() {
.containsEntry(partition, 1L);
Map<TopicUriPartition, Long> result =
new HashMap<>();
- result.put(partition, 1L);
+ result.put(partition, 123L);
return result;
},
partitions -> {
diff --git a/psc-flink/src/test/resources/stream-metadata.yaml b/psc-flink/src/test/resources/stream-metadata.yaml
new file mode 100644
index 0000000..cf78b4e
--- /dev/null
+++ b/psc-flink/src/test/resources/stream-metadata.yaml
@@ -0,0 +1,22 @@
+- streamId: stream0
+ clusterMetadataList:
+ - clusterId: cluster0
+ bootstrapServers: bootstrap-server-0:443
+ clusterUriString: "plaintext:/rn:kafka:env:cloud_region1::cluster0:"
+ topics:
+ - topic0
+ - topic1
+ - clusterId: cluster1
+ bootstrapServers: bootstrap-server-1:443
+ clusterUriString: "plaintext:/rn:kafka:env:cloud_region1::cluster1:"
+ topics:
+ - topic2
+ - topic3
+- streamId: stream1
+ clusterMetadataList:
+ - clusterId: cluster2
+ bootstrapServers: bootstrap-server-2:443
+ clusterUriString: "plaintext:/rn:kafka:env:cloud_region1::cluster2:"
+ topics:
+ - topic4
+ - topic5
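The new stream-metadata.yaml is a top-level list of streams, each mapping cluster IDs to bootstrap servers, a cluster URI, and topics. A minimal parsing sketch, assuming SnakeYAML on the classpath; YamlFileMetadataService's actual deserialization may differ:

    import org.yaml.snakeyaml.Yaml;
    import java.io.InputStream;
    import java.util.List;
    import java.util.Map;

    public class StreamMetadataSketch {
        public static void main(String[] args) {
            InputStream in = StreamMetadataSketch.class
                    .getResourceAsStream("/stream-metadata.yaml");
            // The file is a top-level YAML list; each entry is one stream.
            List<Map<String, Object>> streams = new Yaml().load(in);
            for (Map<String, Object> stream : streams) {
                System.out.println("streamId=" + stream.get("streamId"));
                List<Map<String, Object>> clusters =
                        (List<Map<String, Object>>) stream.get("clusterMetadataList");
                for (Map<String, Object> cluster : clusters) {
                    System.out.println("  " + cluster.get("clusterId")
                            + " -> " + cluster.get("clusterUriString")
                            + " topics=" + cluster.get("topics"));
                }
            }
        }
    }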