Skip to content

Commit

Permalink
Cleanup ReplicationEngine:createThreadPool to avoid hardcoding Vcr (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
parasgithub authored Apr 13, 2021
1 parent da4f106 commit 865b288
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import com.github.ambry.store.StoreKeyConverterFactory;
import com.github.ambry.store.StoreKeyFactory;
import com.github.ambry.utils.SystemTime;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -271,6 +272,11 @@ public long getRemoteReplicaLagFromLocalInBytes(PartitionId partitionId, String
return -1;
}

@Override
protected String getReplicaThreadName(String datacenterToReplicateFrom, int threadIndexWithinPool) {
return "Vcr" + super.getReplicaThreadName(datacenterToReplicateFrom, threadIndexWithinPool);
}

/**
* Check if replication is allowed from given datacenter.
* @param datacenterName datacenter name to check.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -345,9 +345,7 @@ private List<ReplicaThread> createThreadPool(String datacenter, int numberOfThre
ResponseHandler responseHandler = new ResponseHandler(clusterMap);
for (int i = 0; i < numberOfThreads; i++) {
boolean replicatingOverSsl = sslEnabledDatacenters.contains(datacenter);
String threadIdentity =
(startThread ? "Vcr" : "") + "ReplicaThread-" + (dataNodeId.getDatacenterName().equals(datacenter) ? "Intra-"
: "Inter-") + i + "-" + datacenter;
String threadIdentity = getReplicaThreadName(datacenter, i);
try {
StoreKeyConverter threadSpecificKeyConverter = storeKeyConverterFactory.getStoreKeyConverter();
Transformer threadSpecificTransformer =
Expand All @@ -372,6 +370,17 @@ private List<ReplicaThread> createThreadPool(String datacenter, int numberOfThre
return replicaThreads;
}

/**
* Chooses a name for a new replica thread.
* @param datacenterToReplicateFrom The datacenter that we replicate from in this thread.
* @param threadIndexWithinPool The index of the thread within the thread pool.
* @return The name of the thread.
*/
protected String getReplicaThreadName(String datacenterToReplicateFrom, int threadIndexWithinPool) {
return "ReplicaThread-" + (dataNodeId.getDatacenterName().equals(datacenterToReplicateFrom) ? "Intra-"
: "Inter-") + threadIndexWithinPool + "-" + datacenterToReplicateFrom;
}

/**
* Reads the replica tokens from storage, populates the Remote replica info,
* and re-persists the tokens if they have been reset.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public class CloudToStoreReplicationManagerTest {
private static final String NEW_PARTITION_NAME = "12";
private static final String CLOUD_DC_NAME = "CloudDc";
private static final String VCR_MOUNT_PATH = CLOUD_REPLICA_MOUNT + "/1";
private static final String VCR_REPLICA_THREAD_PREFIX = "VcrReplicaThread-";
private static final String REPLICA_THREAD_PREFIX = "ReplicaThread-";
private final VerifiableProperties verifiableProperties;
private final ScheduledExecutorService mockScheduler;
private final StoreKeyFactory storeKeyFactory;
Expand Down Expand Up @@ -149,7 +149,7 @@ public void cloudReplicaAdditionTest() throws Exception {
mockClusterSpectator.spectate();
// 1. test adding cloud replica that is not present locally
mockHelixParticipant.onPartitionBecomeLeaderFromStandby(NEW_PARTITION_NAME);
assertNull("Cloud replica thread should not be created", TestUtils.getThreadByThisName(VCR_REPLICA_THREAD_PREFIX));
assertNull("Cloud replica thread should not be created", TestUtils.getThreadByThisName(REPLICA_THREAD_PREFIX));
// create a new partition and add corresponding store in storage manager
PartitionId newPartition =
new MockPartitionId(Long.parseLong(NEW_PARTITION_NAME), MockClusterMap.DEFAULT_PARTITION_CLASS,
Expand All @@ -159,12 +159,12 @@ public void cloudReplicaAdditionTest() throws Exception {
// 2. we deliberately shut down the store to induce failure when adding cloud replica
storageManager.shutdownBlobStore(newPartition);
mockHelixParticipant.onPartitionBecomeLeaderFromStandby(NEW_PARTITION_NAME);
assertNull("Cloud replica thread should not be created", TestUtils.getThreadByThisName(VCR_REPLICA_THREAD_PREFIX));
assertNull("Cloud replica thread should not be created", TestUtils.getThreadByThisName(REPLICA_THREAD_PREFIX));
storageManager.startBlobStore(newPartition);
// 3. mock success case
mockHelixParticipant.onPartitionBecomeLeaderFromStandby(NEW_PARTITION_NAME);
assertNotNull("Cloud replica thread should be created for DC1",
TestUtils.getThreadByThisName(VCR_REPLICA_THREAD_PREFIX));
TestUtils.getThreadByThisName(REPLICA_THREAD_PREFIX));
cloudToStoreReplicationManager.shutdown();
storageManager.shutdown();
}
Expand Down Expand Up @@ -197,13 +197,13 @@ public void cloudReplicaRemovalTest() throws Exception {
cloudToStoreReplicationManager.getRemoteReplicaInfo(localPartition, vcrNode.getHostname(), replicaPath);
assertNotNull("Remote replica info should not be null", remoteReplicaInfo);
assertEquals("There should be only one cloud replica thread created", 1,
TestUtils.getAllThreadsByThisName(VCR_REPLICA_THREAD_PREFIX).size());
TestUtils.getAllThreadsByThisName(REPLICA_THREAD_PREFIX).size());

// 2. before removing cloud replica of local partition let's remove a non-existent partition first
mockHelixParticipant.onPartitionBecomeStandbyFromLeader(NEW_PARTITION_NAME);
// ensure there is no change in replica thread
assertEquals("There should be only one cloud replica thread created", 1,
TestUtils.getAllThreadsByThisName(VCR_REPLICA_THREAD_PREFIX).size());
TestUtils.getAllThreadsByThisName(REPLICA_THREAD_PREFIX).size());

// 3. remove the cloud replica by calling Leader-To-Standby transition on local partition
mockHelixParticipant.onPartitionBecomeStandbyFromLeader(localPartition.toPathString());
Expand Down

0 comments on commit 865b288

Please sign in to comment.