From 44069a75d294fa0ec134449742798bc89e91a9be Mon Sep 17 00:00:00 2001 From: Paras Sud Date: Sat, 20 Feb 2021 12:51:16 -0800 Subject: [PATCH] Cleanup ReplicationEngine:createThreadPool to avoid hardcoding Vcr --- .../github/ambry/cloud/VcrReplicationManager.java | 6 ++++++ .../ambry/replication/ReplicationEngine.java | 15 ++++++++++++--- 2 files changed, 18 insertions(+), 3 deletions(-) diff --git a/ambry-cloud/src/main/java/com/github/ambry/cloud/VcrReplicationManager.java b/ambry-cloud/src/main/java/com/github/ambry/cloud/VcrReplicationManager.java index 79da773511..786ff87903 100644 --- a/ambry-cloud/src/main/java/com/github/ambry/cloud/VcrReplicationManager.java +++ b/ambry-cloud/src/main/java/com/github/ambry/cloud/VcrReplicationManager.java @@ -35,6 +35,7 @@ import com.github.ambry.store.StoreKeyConverterFactory; import com.github.ambry.store.StoreKeyFactory; import com.github.ambry.utils.SystemTime; + import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -271,6 +272,11 @@ public long getRemoteReplicaLagFromLocalInBytes(PartitionId partitionId, String return -1; } + @Override + protected String getReplicaThreadName(String datacenterToReplicateFrom, int threadIndexWithinPool) { + return "Vcr" + super.getReplicaThreadName(datacenterToReplicateFrom, threadIndexWithinPool); + } + /** * Check if replication is allowed from given datacenter. * @param datacenterName datacenter name to check. diff --git a/ambry-replication/src/main/java/com/github/ambry/replication/ReplicationEngine.java b/ambry-replication/src/main/java/com/github/ambry/replication/ReplicationEngine.java index b86e227a12..120c56f14d 100644 --- a/ambry-replication/src/main/java/com/github/ambry/replication/ReplicationEngine.java +++ b/ambry-replication/src/main/java/com/github/ambry/replication/ReplicationEngine.java @@ -343,9 +343,7 @@ private List createThreadPool(String datacenter, int numberOfThre ResponseHandler responseHandler = new ResponseHandler(clusterMap); for (int i = 0; i < numberOfThreads; i++) { boolean replicatingOverSsl = sslEnabledDatacenters.contains(datacenter); - String threadIdentity = - (startThread ? "Vcr" : "") + "ReplicaThread-" + (dataNodeId.getDatacenterName().equals(datacenter) ? "Intra-" - : "Inter-") + i + "-" + datacenter; + String threadIdentity = getReplicaThreadName(datacenter, i); try { StoreKeyConverter threadSpecificKeyConverter = storeKeyConverterFactory.getStoreKeyConverter(); Transformer threadSpecificTransformer = @@ -370,6 +368,17 @@ private List createThreadPool(String datacenter, int numberOfThre return replicaThreads; } + /** + * Chooses a name for a new replica thread. + * @param datacenterToReplicateFrom The datacenter that we replicate from in this thread. + * @param threadIndexWithinPool The index of the thread within the thread pool. + * @return The name of the thread. + */ + protected String getReplicaThreadName(String datacenterToReplicateFrom, int threadIndexWithinPool) { + return "ReplicaThread-" + (dataNodeId.getDatacenterName().equals(datacenterToReplicateFrom) ? "Intra-" + : "Inter-") + threadIndexWithinPool + "-" + datacenterToReplicateFrom; + } + /** * Reads the replica tokens from storage, populates the Remote replica info, * and re-persists the tokens if they have been reset.