diff --git a/ambry-cloud/src/main/java/com/github/ambry/cloud/VcrReplicationManager.java b/ambry-cloud/src/main/java/com/github/ambry/cloud/VcrReplicationManager.java index 79da773511..786ff87903 100644 --- a/ambry-cloud/src/main/java/com/github/ambry/cloud/VcrReplicationManager.java +++ b/ambry-cloud/src/main/java/com/github/ambry/cloud/VcrReplicationManager.java @@ -35,6 +35,7 @@ import com.github.ambry.store.StoreKeyConverterFactory; import com.github.ambry.store.StoreKeyFactory; import com.github.ambry.utils.SystemTime; + import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -271,6 +272,11 @@ public long getRemoteReplicaLagFromLocalInBytes(PartitionId partitionId, String return -1; } + @Override + protected String getReplicaThreadName(String datacenterToReplicateFrom, int threadIndexWithinPool) { + return "Vcr" + super.getReplicaThreadName(datacenterToReplicateFrom, threadIndexWithinPool); + } + /** * Check if replication is allowed from given datacenter. * @param datacenterName datacenter name to check. diff --git a/ambry-replication/src/main/java/com/github/ambry/replication/ReplicationEngine.java b/ambry-replication/src/main/java/com/github/ambry/replication/ReplicationEngine.java index b86e227a12..120c56f14d 100644 --- a/ambry-replication/src/main/java/com/github/ambry/replication/ReplicationEngine.java +++ b/ambry-replication/src/main/java/com/github/ambry/replication/ReplicationEngine.java @@ -343,9 +343,7 @@ private List createThreadPool(String datacenter, int numberOfThre ResponseHandler responseHandler = new ResponseHandler(clusterMap); for (int i = 0; i < numberOfThreads; i++) { boolean replicatingOverSsl = sslEnabledDatacenters.contains(datacenter); - String threadIdentity = - (startThread ? "Vcr" : "") + "ReplicaThread-" + (dataNodeId.getDatacenterName().equals(datacenter) ? "Intra-" - : "Inter-") + i + "-" + datacenter; + String threadIdentity = getReplicaThreadName(datacenter, i); try { StoreKeyConverter threadSpecificKeyConverter = storeKeyConverterFactory.getStoreKeyConverter(); Transformer threadSpecificTransformer = @@ -370,6 +368,17 @@ private List createThreadPool(String datacenter, int numberOfThre return replicaThreads; } + /** + * Chooses a name for a new replica thread. + * @param datacenterToReplicateFrom The datacenter that we replicate from in this thread. + * @param threadIndexWithinPool The index of the thread within the thread pool. + * @return The name of the thread. + */ + protected String getReplicaThreadName(String datacenterToReplicateFrom, int threadIndexWithinPool) { + return "ReplicaThread-" + (dataNodeId.getDatacenterName().equals(datacenterToReplicateFrom) ? "Intra-" + : "Inter-") + threadIndexWithinPool + "-" + datacenterToReplicateFrom; + } + /** * Reads the replica tokens from storage, populates the Remote replica info, * and re-persists the tokens if they have been reset. diff --git a/ambry-replication/src/test/java/com/github/ambry/replication/CloudToStoreReplicationManagerTest.java b/ambry-replication/src/test/java/com/github/ambry/replication/CloudToStoreReplicationManagerTest.java index d359b64478..34c24d1296 100644 --- a/ambry-replication/src/test/java/com/github/ambry/replication/CloudToStoreReplicationManagerTest.java +++ b/ambry-replication/src/test/java/com/github/ambry/replication/CloudToStoreReplicationManagerTest.java @@ -70,7 +70,7 @@ public class CloudToStoreReplicationManagerTest { private static final String NEW_PARTITION_NAME = "12"; private static final String CLOUD_DC_NAME = "CloudDc"; private static final String VCR_MOUNT_PATH = CLOUD_REPLICA_MOUNT + "/1"; - private static final String VCR_REPLICA_THREAD_PREFIX = "VcrReplicaThread-"; + private static final String REPLICA_THREAD_PREFIX = "ReplicaThread-"; private final VerifiableProperties verifiableProperties; private final ScheduledExecutorService mockScheduler; private final StoreKeyFactory storeKeyFactory; @@ -149,7 +149,7 @@ public void cloudReplicaAdditionTest() throws Exception { mockClusterSpectator.spectate(); // 1. test adding cloud replica that is not present locally mockHelixParticipant.onPartitionBecomeLeaderFromStandby(NEW_PARTITION_NAME); - assertNull("Cloud replica thread should not be created", TestUtils.getThreadByThisName(VCR_REPLICA_THREAD_PREFIX)); + assertNull("Cloud replica thread should not be created", TestUtils.getThreadByThisName(REPLICA_THREAD_PREFIX)); // create a new partition and add corresponding store in storage manager PartitionId newPartition = new MockPartitionId(Long.parseLong(NEW_PARTITION_NAME), MockClusterMap.DEFAULT_PARTITION_CLASS, @@ -159,12 +159,12 @@ public void cloudReplicaAdditionTest() throws Exception { // 2. we deliberately shut down the store to induce failure when adding cloud replica storageManager.shutdownBlobStore(newPartition); mockHelixParticipant.onPartitionBecomeLeaderFromStandby(NEW_PARTITION_NAME); - assertNull("Cloud replica thread should not be created", TestUtils.getThreadByThisName(VCR_REPLICA_THREAD_PREFIX)); + assertNull("Cloud replica thread should not be created", TestUtils.getThreadByThisName(REPLICA_THREAD_PREFIX)); storageManager.startBlobStore(newPartition); // 3. mock success case mockHelixParticipant.onPartitionBecomeLeaderFromStandby(NEW_PARTITION_NAME); assertNotNull("Cloud replica thread should be created for DC1", - TestUtils.getThreadByThisName(VCR_REPLICA_THREAD_PREFIX)); + TestUtils.getThreadByThisName(REPLICA_THREAD_PREFIX)); cloudToStoreReplicationManager.shutdown(); storageManager.shutdown(); } @@ -197,13 +197,13 @@ public void cloudReplicaRemovalTest() throws Exception { cloudToStoreReplicationManager.getRemoteReplicaInfo(localPartition, vcrNode.getHostname(), replicaPath); assertNotNull("Remote replica info should not be null", remoteReplicaInfo); assertEquals("There should be only one cloud replica thread created", 1, - TestUtils.getAllThreadsByThisName(VCR_REPLICA_THREAD_PREFIX).size()); + TestUtils.getAllThreadsByThisName(REPLICA_THREAD_PREFIX).size()); // 2. before removing cloud replica of local partition let's remove a non-existent partition first mockHelixParticipant.onPartitionBecomeStandbyFromLeader(NEW_PARTITION_NAME); // ensure there is no change in replica thread assertEquals("There should be only one cloud replica thread created", 1, - TestUtils.getAllThreadsByThisName(VCR_REPLICA_THREAD_PREFIX).size()); + TestUtils.getAllThreadsByThisName(REPLICA_THREAD_PREFIX).size()); // 3. remove the cloud replica by calling Leader-To-Standby transition on local partition mockHelixParticipant.onPartitionBecomeStandbyFromLeader(localPartition.toPathString());