Skip to content

Commit

Permalink
Update pulsar dependency to 2.10.3
Browse files Browse the repository at this point in the history
  • Loading branch information
aymkhalil committed Apr 11, 2023
1 parent b3690ce commit bff9a40
Show file tree
Hide file tree
Showing 4 changed files with 6 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,6 @@ public Producer<KeyValue<byte[], MutationValue>> getProducer(final TableInfo tm)
.hashingScheme(HashingScheme.Murmur3_32Hash)
.blockIfQueueFull(true)
.maxPendingMessages(config.pulsarMaxPendingMessages)
.maxPendingMessagesAcrossPartitions(config.pulsarMaxPendingMessagesAcrossPartitions)
.autoUpdatePartitions(true);

if (config.pulsarBatchDelayInMs > 0) {
Expand Down
15 changes: 4 additions & 11 deletions agent/src/main/java/com/datastax/oss/cdc/agent/AgentConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -297,14 +297,6 @@ public static long getEnvAsLong(String varName, long defaultValue) {
1000, "CDC_PULSAR_MAX_PENDING_MESSAGES", Setting::getEnvAsInteger,
"Integer", "pulsar", 4);

public static final String PULSAR_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS= "pulsarMaxPendingMessagesAcrossPartitions";
public int pulsarMaxPendingMessagesAcrossPartitions;
public static final Setting<Integer> PULSAR_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS_SETTING =
new Setting<>(PULSAR_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS, Platform.PULSAR, (c, s) -> c.pulsarMaxPendingMessagesAcrossPartitions = Integer.parseInt(s), c -> c.pulsarMaxPendingMessagesAcrossPartitions,
"The Pulsar maximum number of pending messages across partitions.",
50000, "CDC_PULSAR_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS", Setting::getEnvAsInteger,
"Integer", "pulsar", 5);

public static final String PULSAR_AUTH_PLUGIN_CLASS_NAME = "pulsarAuthPluginClassName";
public String pulsarAuthPluginClassName;
public static final Setting<String> PULSAR_AUTH_PLUGIN_CLASS_NAME_SETTING =
Expand Down Expand Up @@ -349,7 +341,6 @@ public static long getEnvAsLong(String varName, long defaultValue) {
set.add(PULSAR_BATCH_BATCH_DELAY_IN_MS_SETTING);
set.add(PULSAR_KEY_BASED_BATCHER_SETTING);
set.add(PULSAR_MAX_PENDING_MESSAGES_SETTING);
set.add(PULSAR_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS_SETTING);
set.add(PULSAR_AUTH_PLUGIN_CLASS_NAME_SETTING);
set.add(PULSAR_AUTH_PARAMS_SETTING);
settings = Collections.unmodifiableSet(set);
Expand Down Expand Up @@ -382,7 +373,6 @@ public AgentConfig() {
this.pulsarBatchDelayInMs = PULSAR_BATCH_BATCH_DELAY_IN_MS_SETTING.initDefault();
this.pulsarKeyBasedBatcher = PULSAR_KEY_BASED_BATCHER_SETTING.initDefault();
this.pulsarMaxPendingMessages = PULSAR_MAX_PENDING_MESSAGES_SETTING.initDefault();
this.pulsarMaxPendingMessagesAcrossPartitions = PULSAR_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS_SETTING.initDefault();
this.pulsarAuthPluginClassName = PULSAR_AUTH_PLUGIN_CLASS_NAME_SETTING.initDefault();
this.pulsarAuthParams = PULSAR_AUTH_PARAMS_SETTING.initDefault();
}
Expand Down Expand Up @@ -509,7 +499,10 @@ public AgentConfig configure(Platform platform, Map<String, Object> agentParamet
throw new IllegalArgumentException(String.format("Unsupported parameter '%s' for the %s platform ", key, platform));
}
setting.initializer.apply(this, value);
} else {
} else if ("pulsarMaxPendingMessagesAcrossPartitions".equals(key)) {
log.warn("The 'pulsarMaxPendingMessagesAcrossPartitions' parameter is deprecated, the config will be ignored");
}
else {
throw new RuntimeException(String.format("Unknown parameter '%s'", key));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public void testConfigurePulsar() {
PULSAR_BATCH_DELAY_IN_MS + "=20," +
PULSAR_KEY_BASED_BATCHER + "=true," +
PULSAR_MAX_PENDING_MESSAGES + "=20," +
PULSAR_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS + "=200," +
"pulsarMaxPendingMessagesAcrossPartitions" + "=200," + // make sure if someone is passing the deprecated parameter, AgentConfig will not fail
PULSAR_AUTH_PLUGIN_CLASS_NAME + "=MyAuthPlugin," +
PULSAR_AUTH_PARAMS + "=x:y\\,z:t," +
SSL_ALLOW_INSECURE_CONNECTION + "=true," +
Expand All @@ -91,7 +91,6 @@ public void testConfigurePulsar() {
assertEquals(20L, config.pulsarBatchDelayInMs);
assertTrue(config.pulsarKeyBasedBatcher);
assertEquals(20, config.pulsarMaxPendingMessages);
assertEquals(200, config.pulsarMaxPendingMessagesAcrossPartitions);

// Pulsar Auth
assertEquals("MyAuthPlugin", config.pulsarAuthPluginClassName);
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ cassandra4Version=4.0.4
dse4Version=6.8.23

pulsarGroup=org.apache.pulsar
pulsarVersion=2.8.3
pulsarVersion=2.10.3
testPulsarImage=datastax/lunastreaming
testPulsarImageTag=2.8.0_1.1.42

Expand Down

0 comments on commit bff9a40

Please sign in to comment.