Skip to content

Commit

Permalink
[fix][broker] Duplicate LedgerOffloader creation when namespace/topic… (
Browse files Browse the repository at this point in the history
apache#21591)

(cherry picked from commit 98bf9dd)
  • Loading branch information
shibd committed Nov 22, 2023
1 parent d0cb05b commit ebe32ab
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Properties;
import lombok.Data;
Expand Down Expand Up @@ -70,6 +71,7 @@ public class OffloadPoliciesImpl implements Serializable, OffloadPolicies {
public static final String DEFAULT_OFFLOADER_DIRECTORY = "./offloaders";
public static final Long DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES = null;
public static final Long DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS = null;
public static final String EXTRA_CONFIG_PREFIX = "managedLedgerOffloadExtraConfig";

public static final String OFFLOAD_THRESHOLD_NAME_IN_CONF_FILE =
"managedLedgerOffloadAutoTriggerSizeThresholdBytes";
Expand Down Expand Up @@ -99,7 +101,6 @@ public class OffloadPoliciesImpl implements Serializable, OffloadPolicies {
@Configuration
@JsonProperty(access = JsonProperty.Access.READ_WRITE)
private OffloadedReadPriority managedLedgerOffloadedReadPriority = DEFAULT_OFFLOADED_READ_PRIORITY;

// s3 config, set by service configuration or cli
@Configuration
@JsonProperty(access = JsonProperty.Access.READ_WRITE)
Expand Down Expand Up @@ -222,8 +223,7 @@ public static OffloadPoliciesImpl create(String driver, String region, String bu

public static OffloadPoliciesImpl create(Properties properties) {
OffloadPoliciesImpl data = new OffloadPoliciesImpl();
Field[] fields = OffloadPoliciesImpl.class.getDeclaredFields();
Arrays.stream(fields).forEach(f -> {
for (Field f : CONFIGURATION_FIELDS) {
if (properties.containsKey(f.getName())) {
try {
f.setAccessible(true);
Expand All @@ -234,7 +234,7 @@ public static OffloadPoliciesImpl create(Properties properties) {
f.getName(), properties.get(f.getName())), e);
}
}
});
}
data.compatibleWithBrokerConfigFile(properties);
return data;
}
Expand Down Expand Up @@ -311,62 +311,14 @@ public boolean bucketValid() {

public Properties toProperties() {
Properties properties = new Properties();
setProperty(properties, "managedLedgerOffloadedReadPriority", this.getManagedLedgerOffloadedReadPriority());
setProperty(properties, "offloadersDirectory", this.getOffloadersDirectory());
setProperty(properties, "managedLedgerOffloadDriver", this.getManagedLedgerOffloadDriver());
setProperty(properties, "managedLedgerOffloadMaxThreads",
this.getManagedLedgerOffloadMaxThreads());
setProperty(properties, "managedLedgerOffloadPrefetchRounds",
this.getManagedLedgerOffloadPrefetchRounds());
setProperty(properties, "managedLedgerOffloadThresholdInBytes",
this.getManagedLedgerOffloadThresholdInBytes());
setProperty(properties, "managedLedgerOffloadDeletionLagInMillis",
this.getManagedLedgerOffloadDeletionLagInMillis());

if (this.isS3Driver()) {
setProperty(properties, "s3ManagedLedgerOffloadRegion",
this.getS3ManagedLedgerOffloadRegion());
setProperty(properties, "s3ManagedLedgerOffloadBucket",
this.getS3ManagedLedgerOffloadBucket());
setProperty(properties, "s3ManagedLedgerOffloadServiceEndpoint",
this.getS3ManagedLedgerOffloadServiceEndpoint());
setProperty(properties, "s3ManagedLedgerOffloadMaxBlockSizeInBytes",
this.getS3ManagedLedgerOffloadMaxBlockSizeInBytes());
setProperty(properties, "s3ManagedLedgerOffloadCredentialId",
this.getS3ManagedLedgerOffloadCredentialId());
setProperty(properties, "s3ManagedLedgerOffloadCredentialSecret",
this.getS3ManagedLedgerOffloadCredentialSecret());
setProperty(properties, "s3ManagedLedgerOffloadRole",
this.getS3ManagedLedgerOffloadRole());
setProperty(properties, "s3ManagedLedgerOffloadRoleSessionName",
this.getS3ManagedLedgerOffloadRoleSessionName());
setProperty(properties, "s3ManagedLedgerOffloadReadBufferSizeInBytes",
this.getS3ManagedLedgerOffloadReadBufferSizeInBytes());
} else if (this.isGcsDriver()) {
setProperty(properties, "gcsManagedLedgerOffloadRegion",
this.getGcsManagedLedgerOffloadRegion());
setProperty(properties, "gcsManagedLedgerOffloadBucket",
this.getGcsManagedLedgerOffloadBucket());
setProperty(properties, "gcsManagedLedgerOffloadMaxBlockSizeInBytes",
this.getGcsManagedLedgerOffloadMaxBlockSizeInBytes());
setProperty(properties, "gcsManagedLedgerOffloadReadBufferSizeInBytes",
this.getGcsManagedLedgerOffloadReadBufferSizeInBytes());
setProperty(properties, "gcsManagedLedgerOffloadServiceAccountKeyFile",
this.getGcsManagedLedgerOffloadServiceAccountKeyFile());
} else if (this.isFileSystemDriver()) {
setProperty(properties, "fileSystemProfilePath", this.getFileSystemProfilePath());
setProperty(properties, "fileSystemURI", this.getFileSystemURI());
}

setProperty(properties, "managedLedgerOffloadBucket", this.getManagedLedgerOffloadBucket());
setProperty(properties, "managedLedgerOffloadRegion", this.getManagedLedgerOffloadRegion());
setProperty(properties, "managedLedgerOffloadServiceEndpoint",
this.getManagedLedgerOffloadServiceEndpoint());
setProperty(properties, "managedLedgerOffloadMaxBlockSizeInBytes",
this.getManagedLedgerOffloadMaxBlockSizeInBytes());
setProperty(properties, "managedLedgerOffloadReadBufferSizeInBytes",
this.getManagedLedgerOffloadReadBufferSizeInBytes());

for (Field f : CONFIGURATION_FIELDS) {
try {
f.setAccessible(true);
setProperty(properties, f.getName(), f.get(this));
} catch (Exception e) {
throw new IllegalArgumentException("An error occurred while processing the field: " + f.getName(), e);
}
}
return properties;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.common.policies.data;

import static org.testng.Assert.assertEquals;
import java.util.Properties;
import org.testng.Assert;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -305,4 +306,20 @@ public void mergeTest() {
Assert.assertNull(offloadPolicies.getS3ManagedLedgerOffloadRegion());
}

/**
* Test toProperties as well as create from properties.
* @throws Exception
*/
@Test
public void testToProperties() throws Exception {
// Base information convert.
OffloadPoliciesImpl offloadPolicies = OffloadPoliciesImpl.create("aws-s3", "test-region", "test-bucket",
"http://test.endpoint", null, null, null, null, 32 * 1024 * 1024, 5 * 1024 * 1024,
10 * 1024 * 1024L, 100L, 10000L, OffloadedReadPriority.TIERED_STORAGE_FIRST);
assertEquals(offloadPolicies, OffloadPoliciesImpl.create(offloadPolicies.toProperties()));

// Set useless config to offload policies. Make sure convert conversion result is the same.
offloadPolicies.setFileSystemProfilePath("/test/file");
assertEquals(offloadPolicies, OffloadPoliciesImpl.create(offloadPolicies.toProperties()));
}
}

0 comments on commit ebe32ab

Please sign in to comment.