From bff9a40f7431a485f02591177f39bd55261c62fc Mon Sep 17 00:00:00 2001 From: Ayman Khalil <13960949+aymkhalil@users.noreply.github.com> Date: Tue, 11 Apr 2023 16:04:37 -0700 Subject: [PATCH 1/6] Update pulsar dependency to 2.10.3 --- .../cdc/agent/AbstractPulsarMutationSender.java | 1 - .../com/datastax/oss/cdc/agent/AgentConfig.java | 15 ++++----------- .../oss/cdc/agent/AgentParametersTest.java | 3 +-- gradle.properties | 2 +- 4 files changed, 6 insertions(+), 15 deletions(-) diff --git a/agent/src/main/java/com/datastax/oss/cdc/agent/AbstractPulsarMutationSender.java b/agent/src/main/java/com/datastax/oss/cdc/agent/AbstractPulsarMutationSender.java index 7e26f7e5..776ef755 100644 --- a/agent/src/main/java/com/datastax/oss/cdc/agent/AbstractPulsarMutationSender.java +++ b/agent/src/main/java/com/datastax/oss/cdc/agent/AbstractPulsarMutationSender.java @@ -197,7 +197,6 @@ public Producer> getProducer(final TableInfo tm) .hashingScheme(HashingScheme.Murmur3_32Hash) .blockIfQueueFull(true) .maxPendingMessages(config.pulsarMaxPendingMessages) - .maxPendingMessagesAcrossPartitions(config.pulsarMaxPendingMessagesAcrossPartitions) .autoUpdatePartitions(true); if (config.pulsarBatchDelayInMs > 0) { diff --git a/agent/src/main/java/com/datastax/oss/cdc/agent/AgentConfig.java b/agent/src/main/java/com/datastax/oss/cdc/agent/AgentConfig.java index 95ccb2e7..10de5b33 100644 --- a/agent/src/main/java/com/datastax/oss/cdc/agent/AgentConfig.java +++ b/agent/src/main/java/com/datastax/oss/cdc/agent/AgentConfig.java @@ -297,14 +297,6 @@ public static long getEnvAsLong(String varName, long defaultValue) { 1000, "CDC_PULSAR_MAX_PENDING_MESSAGES", Setting::getEnvAsInteger, "Integer", "pulsar", 4); - public static final String PULSAR_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS= "pulsarMaxPendingMessagesAcrossPartitions"; - public int pulsarMaxPendingMessagesAcrossPartitions; - public static final Setting PULSAR_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS_SETTING = - new 
Setting<>(PULSAR_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS, Platform.PULSAR, (c, s) -> c.pulsarMaxPendingMessagesAcrossPartitions = Integer.parseInt(s), c -> c.pulsarMaxPendingMessagesAcrossPartitions, - "The Pulsar maximum number of pending messages across partitions.", - 50000, "CDC_PULSAR_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS", Setting::getEnvAsInteger, - "Integer", "pulsar", 5); - public static final String PULSAR_AUTH_PLUGIN_CLASS_NAME = "pulsarAuthPluginClassName"; public String pulsarAuthPluginClassName; public static final Setting PULSAR_AUTH_PLUGIN_CLASS_NAME_SETTING = @@ -349,7 +341,6 @@ public static long getEnvAsLong(String varName, long defaultValue) { set.add(PULSAR_BATCH_BATCH_DELAY_IN_MS_SETTING); set.add(PULSAR_KEY_BASED_BATCHER_SETTING); set.add(PULSAR_MAX_PENDING_MESSAGES_SETTING); - set.add(PULSAR_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS_SETTING); set.add(PULSAR_AUTH_PLUGIN_CLASS_NAME_SETTING); set.add(PULSAR_AUTH_PARAMS_SETTING); settings = Collections.unmodifiableSet(set); @@ -382,7 +373,6 @@ public AgentConfig() { this.pulsarBatchDelayInMs = PULSAR_BATCH_BATCH_DELAY_IN_MS_SETTING.initDefault(); this.pulsarKeyBasedBatcher = PULSAR_KEY_BASED_BATCHER_SETTING.initDefault(); this.pulsarMaxPendingMessages = PULSAR_MAX_PENDING_MESSAGES_SETTING.initDefault(); - this.pulsarMaxPendingMessagesAcrossPartitions = PULSAR_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS_SETTING.initDefault(); this.pulsarAuthPluginClassName = PULSAR_AUTH_PLUGIN_CLASS_NAME_SETTING.initDefault(); this.pulsarAuthParams = PULSAR_AUTH_PARAMS_SETTING.initDefault(); } @@ -509,7 +499,10 @@ public AgentConfig configure(Platform platform, Map agentParamet throw new IllegalArgumentException(String.format("Unsupported parameter '%s' for the %s platform ", key, platform)); } setting.initializer.apply(this, value); - } else { + } else if ("pulsarMaxPendingMessagesAcrossPartitions".equals(key)) { + log.warn("The 'pulsarMaxPendingMessagesAcrossPartitions' parameter is deprecated, the config will be 
ignored"); + } + else { throw new RuntimeException(String.format("Unknown parameter '%s'", key)); } } diff --git a/agent/src/test/java/com/datastax/oss/cdc/agent/AgentParametersTest.java b/agent/src/test/java/com/datastax/oss/cdc/agent/AgentParametersTest.java index fc36d652..71020e13 100644 --- a/agent/src/test/java/com/datastax/oss/cdc/agent/AgentParametersTest.java +++ b/agent/src/test/java/com/datastax/oss/cdc/agent/AgentParametersTest.java @@ -74,7 +74,7 @@ public void testConfigurePulsar() { PULSAR_BATCH_DELAY_IN_MS + "=20," + PULSAR_KEY_BASED_BATCHER + "=true," + PULSAR_MAX_PENDING_MESSAGES + "=20," + - PULSAR_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS + "=200," + + "pulsarMaxPendingMessagesAcrossPartitions" + "=200," + // make sure if someone is passing the deprecated parameter, AgentConfig will not fail PULSAR_AUTH_PLUGIN_CLASS_NAME + "=MyAuthPlugin," + PULSAR_AUTH_PARAMS + "=x:y\\,z:t," + SSL_ALLOW_INSECURE_CONNECTION + "=true," + @@ -91,7 +91,6 @@ public void testConfigurePulsar() { assertEquals(20L, config.pulsarBatchDelayInMs); assertTrue(config.pulsarKeyBasedBatcher); assertEquals(20, config.pulsarMaxPendingMessages); - assertEquals(200, config.pulsarMaxPendingMessagesAcrossPartitions); // Pulsar Auth assertEquals("MyAuthPlugin", config.pulsarAuthPluginClassName); diff --git a/gradle.properties b/gradle.properties index 0b463d82..bca0e947 100644 --- a/gradle.properties +++ b/gradle.properties @@ -15,7 +15,7 @@ cassandra4Version=4.0.4 dse4Version=6.8.23 pulsarGroup=org.apache.pulsar -pulsarVersion=2.8.3 +pulsarVersion=2.10.3 testPulsarImage=datastax/lunastreaming testPulsarImageTag=2.8.0_1.1.42 From 85de1640f70b3f56f5b9ec15073f3be63cbcc76e Mon Sep 17 00:00:00 2001 From: Ayman Khalil <13960949+aymkhalil@users.noreply.github.com> Date: Wed, 12 Apr 2023 10:36:31 -0700 Subject: [PATCH 2/6] Add pulsarMemoryLimitBytes --- .../oss/cdc/agent/AbstractPulsarMutationSender.java | 1 + .../java/com/datastax/oss/cdc/agent/AgentConfig.java | 10 ++++++++++ 
.../datastax/oss/cdc/agent/AgentParametersTest.java | 6 ++++++ 3 files changed, 17 insertions(+) diff --git a/agent/src/main/java/com/datastax/oss/cdc/agent/AbstractPulsarMutationSender.java b/agent/src/main/java/com/datastax/oss/cdc/agent/AbstractPulsarMutationSender.java index 776ef755..8b973429 100644 --- a/agent/src/main/java/com/datastax/oss/cdc/agent/AbstractPulsarMutationSender.java +++ b/agent/src/main/java/com/datastax/oss/cdc/agent/AbstractPulsarMutationSender.java @@ -92,6 +92,7 @@ public void initialize(AgentConfig config) throws PulsarClientException { try { ClientBuilder clientBuilder = PulsarClient.builder() .serviceUrl(config.pulsarServiceUrl) + .memoryLimit(config.pulsarMemoryLimitBytes, SizeUnit.BYTES) .enableTcpNoDelay(false); if (config.pulsarServiceUrl.startsWith("pulsar+ssl://")) { diff --git a/agent/src/main/java/com/datastax/oss/cdc/agent/AgentConfig.java b/agent/src/main/java/com/datastax/oss/cdc/agent/AgentConfig.java index 10de5b33..a233a9c3 100644 --- a/agent/src/main/java/com/datastax/oss/cdc/agent/AgentConfig.java +++ b/agent/src/main/java/com/datastax/oss/cdc/agent/AgentConfig.java @@ -297,6 +297,14 @@ public static long getEnvAsLong(String varName, long defaultValue) { 1000, "CDC_PULSAR_MAX_PENDING_MESSAGES", Setting::getEnvAsInteger, "Integer", "pulsar", 4); + public static final String PULSAR_MEMORY_LIMIT_BYTES= "pulsarMemoryLimitBytes"; + public long pulsarMemoryLimitBytes; + public static final Setting PULSAR_MEMORY_LIMIT_BYTES_SETTING = + new Setting<>(PULSAR_MEMORY_LIMIT_BYTES, Platform.PULSAR, (c, s) -> c.pulsarMemoryLimitBytes = Long.parseLong(s), c -> c.pulsarMemoryLimitBytes, + "Limit of client memory usage (in bytes). 
The 0 default means memory limit is disabled.", + 0L, "CDC_PULSAR_MEMORY_LIMIT_BYTES", Setting::getEnvAsLong, + "Long", "pulsar", 5); + public static final String PULSAR_AUTH_PLUGIN_CLASS_NAME = "pulsarAuthPluginClassName"; public String pulsarAuthPluginClassName; public static final Setting PULSAR_AUTH_PLUGIN_CLASS_NAME_SETTING = @@ -343,6 +351,7 @@ public static long getEnvAsLong(String varName, long defaultValue) { set.add(PULSAR_MAX_PENDING_MESSAGES_SETTING); set.add(PULSAR_AUTH_PLUGIN_CLASS_NAME_SETTING); set.add(PULSAR_AUTH_PARAMS_SETTING); + set.add(PULSAR_MEMORY_LIMIT_BYTES_SETTING); settings = Collections.unmodifiableSet(set); Map> map = new HashMap<>(); @@ -375,6 +384,7 @@ public AgentConfig() { this.pulsarMaxPendingMessages = PULSAR_MAX_PENDING_MESSAGES_SETTING.initDefault(); this.pulsarAuthPluginClassName = PULSAR_AUTH_PLUGIN_CLASS_NAME_SETTING.initDefault(); this.pulsarAuthParams = PULSAR_AUTH_PARAMS_SETTING.initDefault(); + this.pulsarMemoryLimitBytes = PULSAR_MEMORY_LIMIT_BYTES_SETTING.initDefault(); } public static void main(String[] args) { diff --git a/agent/src/test/java/com/datastax/oss/cdc/agent/AgentParametersTest.java b/agent/src/test/java/com/datastax/oss/cdc/agent/AgentParametersTest.java index 71020e13..aad8b0d0 100644 --- a/agent/src/test/java/com/datastax/oss/cdc/agent/AgentParametersTest.java +++ b/agent/src/test/java/com/datastax/oss/cdc/agent/AgentParametersTest.java @@ -71,6 +71,7 @@ void assertCommonConfig(AgentConfig config) { public void testConfigurePulsar() { String agentArgs = COMMON_CONFIG + PULSAR_SERVICE_URL + "=pulsar+ssl://mypulsar:6650\\,localhost:6651\\,localhost:6652," + + PULSAR_MEMORY_LIMIT_BYTES + "=64," + PULSAR_BATCH_DELAY_IN_MS + "=20," + PULSAR_KEY_BASED_BATCHER + "=true," + PULSAR_MAX_PENDING_MESSAGES + "=20," + @@ -91,6 +92,7 @@ public void testConfigurePulsar() { assertEquals(20L, config.pulsarBatchDelayInMs); assertTrue(config.pulsarKeyBasedBatcher); assertEquals(20, config.pulsarMaxPendingMessages); + 
assertEquals(64L, config.pulsarMemoryLimitBytes); // Pulsar Auth assertEquals("MyAuthPlugin", config.pulsarAuthPluginClassName); @@ -106,9 +108,11 @@ public void testConfigurePulsarFromMap() { tenantInfo.put(SSL_ALLOW_INSECURE_CONNECTION, "true"); tenantInfo.put(SSL_HOSTNAME_VERIFICATION_ENABLE, "true"); tenantInfo.put(TLS_TRUST_CERTS_FILE_PATH, "/test.p12"); + tenantInfo.put(PULSAR_MEMORY_LIMIT_BYTES, "64"); AgentConfig config = AgentConfig.create(Platform.PULSAR, tenantInfo); assertEquals("pulsar+ssl://mypulsar:6650,localhost:6651,localhost:6652", config.pulsarServiceUrl); + assertEquals(64L, config.pulsarMemoryLimitBytes); // Pulsar Auth assertEquals("MyAuthPlugin", config.pulsarAuthPluginClassName); @@ -128,6 +132,7 @@ public void testConfigurePulsarFromMap() { @SetEnvironmentVariable(key = "CDC_PULSAR_BATCH_DELAY_IN_MS", value = "555") @SetEnvironmentVariable(key = "CDC_PULSAR_KEY_BASED_BATCHER", value = "true") @SetEnvironmentVariable(key = "CDC_PULSAR_MAX_PENDING_MESSAGES", value = "555") + @SetEnvironmentVariable(key = "CDC_PULSAR_MEMORY_LIMIT_BYTES", value = "64") public void testConfigurePulsarFromEnvVar() { AgentConfig config = AgentConfig.create(Platform.PULSAR, ""); assertEquals("pulsar+ssl://mypulsar:6650,localhost:6651,localhost:6652", config.pulsarServiceUrl); @@ -138,6 +143,7 @@ public void testConfigurePulsarFromEnvVar() { assertEquals(555L, config.pulsarBatchDelayInMs); assertEquals(true, config.pulsarKeyBasedBatcher); assertEquals(555, config.pulsarMaxPendingMessages); + assertEquals(64L, config.pulsarMemoryLimitBytes); // Pulsar Auth assertEquals("MyAuthPlugin", config.pulsarAuthPluginClassName); From f73769d7adcbf42622953d6daa97fedb047eb383 Mon Sep 17 00:00:00 2001 From: Ayman Khalil <13960949+aymkhalil@users.noreply.github.com> Date: Wed, 12 Apr 2023 15:35:43 -0700 Subject: [PATCH 3/6] Don't use the shaded jackson from test --- .../oss/pulsar/source/PulsarCassandraSourceTests.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff 
--git a/connector/src/test/java/com/datastax/oss/pulsar/source/PulsarCassandraSourceTests.java b/connector/src/test/java/com/datastax/oss/pulsar/source/PulsarCassandraSourceTests.java index 7c31fe16..5bed67eb 100644 --- a/connector/src/test/java/com/datastax/oss/pulsar/source/PulsarCassandraSourceTests.java +++ b/connector/src/test/java/com/datastax/oss/pulsar/source/PulsarCassandraSourceTests.java @@ -29,6 +29,8 @@ import com.datastax.testcontainers.ChaosNetworkContainer; import com.datastax.testcontainers.PulsarContainer; import com.datastax.testcontainers.cassandra.CassandraContainer; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; import lombok.RequiredArgsConstructor; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; @@ -43,8 +45,6 @@ import org.apache.pulsar.client.api.schema.GenericRecord; import org.apache.pulsar.common.schema.KeyValue; import org.apache.pulsar.common.schema.SchemaType; -import org.apache.pulsar.shade.com.fasterxml.jackson.databind.JsonNode; -import org.apache.pulsar.shade.com.fasterxml.jackson.databind.ObjectMapper; import org.apache.pulsar.shade.org.apache.avro.Conversion; import org.apache.pulsar.shade.org.apache.avro.LogicalType; import org.apache.pulsar.shade.org.apache.avro.Schema; From c0885ca6cfe39da47afe4b98ba330dd24e196b79 Mon Sep 17 00:00:00 2001 From: Ayman Khalil <13960949+aymkhalil@users.noreply.github.com> Date: Wed, 12 Apr 2023 17:45:53 -0700 Subject: [PATCH 4/6] Restore shaded jackson --- .../oss/pulsar/source/PulsarCassandraSourceTests.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/connector/src/test/java/com/datastax/oss/pulsar/source/PulsarCassandraSourceTests.java b/connector/src/test/java/com/datastax/oss/pulsar/source/PulsarCassandraSourceTests.java index 5bed67eb..7c31fe16 100644 --- a/connector/src/test/java/com/datastax/oss/pulsar/source/PulsarCassandraSourceTests.java +++ 
b/connector/src/test/java/com/datastax/oss/pulsar/source/PulsarCassandraSourceTests.java @@ -29,8 +29,6 @@ import com.datastax.testcontainers.ChaosNetworkContainer; import com.datastax.testcontainers.PulsarContainer; import com.datastax.testcontainers.cassandra.CassandraContainer; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; import lombok.RequiredArgsConstructor; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; @@ -45,6 +43,8 @@ import org.apache.pulsar.client.api.schema.GenericRecord; import org.apache.pulsar.common.schema.KeyValue; import org.apache.pulsar.common.schema.SchemaType; +import org.apache.pulsar.shade.com.fasterxml.jackson.databind.JsonNode; +import org.apache.pulsar.shade.com.fasterxml.jackson.databind.ObjectMapper; import org.apache.pulsar.shade.org.apache.avro.Conversion; import org.apache.pulsar.shade.org.apache.avro.LogicalType; import org.apache.pulsar.shade.org.apache.avro.Schema; From a7fe305f0191c1f2b7928764fe818d1caf7ba868 Mon Sep 17 00:00:00 2001 From: Ayman Khalil <13960949+aymkhalil@users.noreply.github.com> Date: Thu, 13 Apr 2023 18:44:18 -0700 Subject: [PATCH 5/6] Downgrade pulsar client in connector tests to 2.8.3 --- connector/gradle.properties | 4 ++++ gradle.properties | 3 ++- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/connector/gradle.properties b/connector/gradle.properties index 2fde427b..61684174 100644 --- a/connector/gradle.properties +++ b/connector/gradle.properties @@ -1 +1,5 @@ artifact=connector + +# TODO: remove this to fall back to the overall client used in the project (2.10.3) once the following issue is fixed: +# https://github.com/apache/pulsar/issues/20092 +pulsarVersion=2.8.3 diff --git a/gradle.properties b/gradle.properties index bca0e947..5023b803 100644 --- a/gradle.properties +++ b/gradle.properties @@ -16,8 +16,9 @@ dse4Version=6.8.23 pulsarGroup=org.apache.pulsar pulsarVersion=2.10.3 +# Used when running tests locally, 
CI will override those values testPulsarImage=datastax/lunastreaming -testPulsarImageTag=2.8.0_1.1.42 +testPulsarImageTag=2.10_3.4 kafkaVersion=3.4.0 vavrVersion=0.10.3 From a7445f66570d6bd7510fdd42dad582346b96107f Mon Sep 17 00:00:00 2001 From: Ayman Khalil <13960949+aymkhalil@users.noreply.github.com> Date: Thu, 13 Apr 2023 21:25:25 -0700 Subject: [PATCH 6/6] Strict the pulsar version to 2.8.3 --- connector/build.gradle | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/connector/build.gradle b/connector/build.gradle index f0c4be03..3b57fe8a 100644 --- a/connector/build.gradle +++ b/connector/build.gradle @@ -71,7 +71,11 @@ dependencies { testImplementation "org.testcontainers:testcontainers:${testContainersVersion}" testImplementation project(':testcontainers') - testImplementation("${pulsarGroup}:pulsar-client:${pulsarVersion}") + testImplementation("${pulsarGroup}:pulsar-client") { + version { + strictly("${pulsarVersion}") + } + } nar "${pulsarGroup}:pulsar-io:${pulsarVersion}" }