Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleanup ReplicationEngine:createThreadPool to avoid hardcoding Vcr #1781

Merged
merged 1 commit into from
Apr 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import com.github.ambry.store.StoreKeyConverterFactory;
import com.github.ambry.store.StoreKeyFactory;
import com.github.ambry.utils.SystemTime;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -271,6 +272,11 @@ public long getRemoteReplicaLagFromLocalInBytes(PartitionId partitionId, String
return -1;
}

@Override
protected String getReplicaThreadName(String datacenterToReplicateFrom, int threadIndexWithinPool) {
return "Vcr" + super.getReplicaThreadName(datacenterToReplicateFrom, threadIndexWithinPool);
}

/**
* Check if replication is allowed from given datacenter.
* @param datacenterName datacenter name to check.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,9 +343,7 @@ private List<ReplicaThread> createThreadPool(String datacenter, int numberOfThre
ResponseHandler responseHandler = new ResponseHandler(clusterMap);
for (int i = 0; i < numberOfThreads; i++) {
boolean replicatingOverSsl = sslEnabledDatacenters.contains(datacenter);
String threadIdentity =
(startThread ? "Vcr" : "") + "ReplicaThread-" + (dataNodeId.getDatacenterName().equals(datacenter) ? "Intra-"
: "Inter-") + i + "-" + datacenter;
String threadIdentity = getReplicaThreadName(datacenter, i);
try {
StoreKeyConverter threadSpecificKeyConverter = storeKeyConverterFactory.getStoreKeyConverter();
Transformer threadSpecificTransformer =
Expand All @@ -370,6 +368,17 @@ private List<ReplicaThread> createThreadPool(String datacenter, int numberOfThre
return replicaThreads;
}

/**
* Chooses a name for a new replica thread.
* @param datacenterToReplicateFrom The datacenter that we replicate from in this thread.
* @param threadIndexWithinPool The index of the thread within the thread pool.
* @return The name of the thread.
*/
protected String getReplicaThreadName(String datacenterToReplicateFrom, int threadIndexWithinPool) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the only difference is the prefix, the method could be getReplicaThreadPrefix() with no arguments.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, agreed. Though I named it like this on purpose to

  1. prevent the creation of very specialized functions
  2. allow the subclass needs to add more details to the thread name if needed
    What do you think? I'm also okay with changing it to your suggestion.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a good change. Thanks @parasgithub for making this change. I don't disagree with your argument to allow the subclasses to add more details to the thread name if needed.
However, I have a nitpick in that case. I think other than prefix, the rest of the logic is repeated code. Can you separate them out, so that it is ensured that thread naming convention doesn't change inadvertently in subclasses, unless explicitly desired.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done, please take a look

return "ReplicaThread-" + (dataNodeId.getDatacenterName().equals(datacenterToReplicateFrom) ? "Intra-"
: "Inter-") + threadIndexWithinPool + "-" + datacenterToReplicateFrom;
}

/**
* Reads the replica tokens from storage, populates the Remote replica info,
* and re-persists the tokens if they have been reset.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public class CloudToStoreReplicationManagerTest {
private static final String NEW_PARTITION_NAME = "12";
private static final String CLOUD_DC_NAME = "CloudDc";
private static final String VCR_MOUNT_PATH = CLOUD_REPLICA_MOUNT + "/1";
private static final String VCR_REPLICA_THREAD_PREFIX = "VcrReplicaThread-";
private static final String REPLICA_THREAD_PREFIX = "ReplicaThread-";
private final VerifiableProperties verifiableProperties;
private final ScheduledExecutorService mockScheduler;
private final StoreKeyFactory storeKeyFactory;
Expand Down Expand Up @@ -149,7 +149,7 @@ public void cloudReplicaAdditionTest() throws Exception {
mockClusterSpectator.spectate();
// 1. test adding cloud replica that is not present locally
mockHelixParticipant.onPartitionBecomeLeaderFromStandby(NEW_PARTITION_NAME);
assertNull("Cloud replica thread should not be created", TestUtils.getThreadByThisName(VCR_REPLICA_THREAD_PREFIX));
assertNull("Cloud replica thread should not be created", TestUtils.getThreadByThisName(REPLICA_THREAD_PREFIX));
// create a new partition and add corresponding store in storage manager
PartitionId newPartition =
new MockPartitionId(Long.parseLong(NEW_PARTITION_NAME), MockClusterMap.DEFAULT_PARTITION_CLASS,
Expand All @@ -159,12 +159,12 @@ public void cloudReplicaAdditionTest() throws Exception {
// 2. we deliberately shut down the store to induce failure when adding cloud replica
storageManager.shutdownBlobStore(newPartition);
mockHelixParticipant.onPartitionBecomeLeaderFromStandby(NEW_PARTITION_NAME);
assertNull("Cloud replica thread should not be created", TestUtils.getThreadByThisName(VCR_REPLICA_THREAD_PREFIX));
assertNull("Cloud replica thread should not be created", TestUtils.getThreadByThisName(REPLICA_THREAD_PREFIX));
storageManager.startBlobStore(newPartition);
// 3. mock success case
mockHelixParticipant.onPartitionBecomeLeaderFromStandby(NEW_PARTITION_NAME);
assertNotNull("Cloud replica thread should be created for DC1",
TestUtils.getThreadByThisName(VCR_REPLICA_THREAD_PREFIX));
TestUtils.getThreadByThisName(REPLICA_THREAD_PREFIX));
cloudToStoreReplicationManager.shutdown();
storageManager.shutdown();
}
Expand Down Expand Up @@ -197,13 +197,13 @@ public void cloudReplicaRemovalTest() throws Exception {
cloudToStoreReplicationManager.getRemoteReplicaInfo(localPartition, vcrNode.getHostname(), replicaPath);
assertNotNull("Remote replica info should not be null", remoteReplicaInfo);
assertEquals("There should be only one cloud replica thread created", 1,
TestUtils.getAllThreadsByThisName(VCR_REPLICA_THREAD_PREFIX).size());
TestUtils.getAllThreadsByThisName(REPLICA_THREAD_PREFIX).size());

// 2. before removing cloud replica of local partition let's remove a non-existent partition first
mockHelixParticipant.onPartitionBecomeStandbyFromLeader(NEW_PARTITION_NAME);
// ensure there is no change in replica thread
assertEquals("There should be only one cloud replica thread created", 1,
TestUtils.getAllThreadsByThisName(VCR_REPLICA_THREAD_PREFIX).size());
TestUtils.getAllThreadsByThisName(REPLICA_THREAD_PREFIX).size());

// 3. remove the cloud replica by calling Leader-To-Standby transition on local partition
mockHelixParticipant.onPartitionBecomeStandbyFromLeader(localPartition.toPathString());
Expand Down