Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a new endpoint to return resource info #2676

Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ public class Operations {
public static final String UPDATE_TTL = "updateTtl";
public static final String STITCH = "stitch";
public static final String GET_CLUSTER_MAP_SNAPSHOT = "getClusterMapSnapshot";
public static final String RESOURCE_INFO = "resourceInfo";
public static final String STATS_REPORT = "statsReport";
public static final String ACCOUNTS = "accounts";
public static final String ACCOUNTS_CONTAINERS = "accounts/containers";
Expand Down
10 changes: 10 additions & 0 deletions ambry-api/src/main/java/com/github/ambry/rest/RestUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,16 @@ public static final class Headers {
* Request header to carry {@link StatsReportType} for GetStatsReport request.
*/
public final static String GET_STATS_REPORT_TYPE = "x-ambry-stats-type";

/**
* Request header to carry partition name.
*/
public final static String PARTITION = "x-ambry-partition";

/**
* Request header to carry resource name.
*/
public final static String RESOURCE = "x-ambry-resource";
}

public static final class TrackingHeaders {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ public class HelixClusterManager implements ClusterMap {
private final ConcurrentHashMap<AmbryDataNode, ConcurrentHashMap<String, AmbryReplica>> ambryDataNodeToAmbryReplicas =
new ConcurrentHashMap<>();
private final ConcurrentHashMap<AmbryDataNode, Set<AmbryDisk>> ambryDataNodeToAmbryDisks = new ConcurrentHashMap<>();
private final Map<String, Map<String, Set<String>>> resourceNameToPartitionByDc = new ConcurrentHashMap<>();
private final Map<String, ConcurrentHashMap<String, Set<String>>> partitionToResourceNameByDc =
new ConcurrentHashMap<>();
private final Map<String, ConcurrentHashMap<String, String>> partitionToDuplicateResourceNameByDc =
Expand Down Expand Up @@ -1037,7 +1038,16 @@ int getTotalInstanceCount(String resource) {

int getLiveInstanceCount(String resource) {
return (int) (getAllInstancesForResource(resource).stream()
.map(instanceNameToAmbryDataNode::get).filter(dn -> dn.getState() == HardwareState.AVAILABLE).count());
.map(instanceNameToAmbryDataNode::get)
.filter(dn -> dn.getState() == HardwareState.AVAILABLE)
.count());
}

int getUnavailableInstanceCount(String resource) {
return (int) (getAllInstancesForResource(resource).stream()
.map(instanceNameToAmbryDataNode::get)
.filter(dn -> dn.getState() == HardwareState.UNAVAILABLE)
.count());
}

long getResourceTotalRegisteredHostDiskCapacity(String resource) {
Expand All @@ -1064,81 +1074,83 @@ int getNumberOfPartitionsInResource(String resource) {
}

int getReplicaCountForStateInResource(ReplicaState state, String resource) {
Collection<ExternalView> externalViews;
ExternalView externalView;
if (clusterMapConfig.clusterMapUseAggregatedView) {
externalViews = globalRoutingTableSnapshotRef.get().getExternalViews();
externalView = globalResourceToExternalView.get(resource);
} else {
externalViews =
dcToRoutingTableSnapshotRef.get(clusterMapConfig.clusterMapDatacenterName).get().getExternalViews();
externalView = dcToResourceToExternalView.get(clusterMapConfig.clusterMapDatacenterName).get(resource);
}
return getReplicaCountForStateInExternalView(state, externalViews, resource,
clusterMapConfig.clusterMapUseAggregatedView);
}

int getReplicaCountForStateInExternalView(ReplicaState state, Collection<ExternalView> externalViews, String resource,
boolean checkDatacenter) {
if (externalViews == null || externalViews.isEmpty()) {
if (externalView == null) {
return 0;
}
return externalViews.stream().filter(ev -> ev.getResourceName().equals(resource)).findFirst().map(ev -> {
int result = 0;
for (String partition : ev.getPartitionSet()) {
Map<String, String> states = ev.getStateMap(partition);
if (!states.isEmpty()) {
result += states.entrySet().stream().filter(ent -> {
if (!ent.getValue().equals(state.name())) {
return false;
}
if (checkDatacenter) {
AmbryDataNode dataNode = instanceNameToAmbryDataNode.get(ent.getKey());
return dataNode != null && dataNode.getDatacenterName().equals(clusterMapConfig.clusterMapDatacenterName);
}
return true;
}).count();
}
return getReplicaCountForStateInExternalView(state, externalView, clusterMapConfig.clusterMapUseAggregatedView);
}

int getReplicaCountForStateInExternalView(ReplicaState state, ExternalView externalView, boolean checkDatacenter) {
int result = 0;
for (String partition : externalView.getPartitionSet()) {
Map<String, String> states = externalView.getStateMap(partition);
if (!states.isEmpty()) {
result += states.entrySet().stream().filter(ent -> {
if (state != null && !ent.getValue().equals(state.name())) { // if state is null, then include all states
return false;
}
if (checkDatacenter) {
AmbryDataNode dataNode = instanceNameToAmbryDataNode.get(ent.getKey());
return dataNode != null && dataNode.getDatacenterName().equals(clusterMapConfig.clusterMapDatacenterName);
}
return true;
}).count();
}
return result;
}).orElse(0);
}
return result;
}

int getPartitionDiskWeight(String partition) {
return diskWeightForPartitions.getOrDefault(partition, getPartitionDefaultDiskWeight());
}

int getResourceExpectedTotalDiskCapacityUsage(String resource, int replicationFactor) {
String dcName = clusterMapConfig.clusterMapDatacenterName;
getResourceConfig(resource, dcName);
int result = 0;
for (String partition : resourceNameToPartitionByDc.get(dcName).get(resource)) {
result += getPartitionDiskWeight(partition) * replicationFactor;
}
return result;
}

int getResourceTotalDiskCapacityUsage(String resource) {
// call this method to fill up the partition capacity map from resource config
getResourceConfig(resource, clusterMapConfig.clusterMapDatacenterName);
Collection<ExternalView> externalViews;
String dcName = clusterMapConfig.clusterMapDatacenterName;
getResourceConfig(resource, dcName);
ExternalView externalView;
if (clusterMapConfig.clusterMapUseAggregatedView) {
externalViews = globalRoutingTableSnapshotRef.get().getExternalViews();
externalView = globalResourceToExternalView.get(resource);
} else {
externalViews =
dcToRoutingTableSnapshotRef.get(clusterMapConfig.clusterMapDatacenterName).get().getExternalViews();
externalView = dcToResourceToExternalView.get(dcName).get(resource);
}
if (externalViews == null || externalViews.isEmpty()) {
if (externalView == null) {
return 0;
}
return externalViews.stream().filter(ev -> ev.getResourceName().equals(resource)).findFirst().map(ev -> {
int result = 0;
for (String partition : ev.getPartitionSet()) {
Map<String, String> states = ev.getStateMap(partition);

int weight = getPartitionDiskWeight(partition);
if (!states.isEmpty()) {
int numReplica;
if (clusterMapConfig.clusterMapUseAggregatedView) {
numReplica = (int) states.keySet().stream().filter(instance -> {
AmbryDataNode dataNode = instanceNameToAmbryDataNode.get(instance);
return dataNode != null && dataNode.getDatacenterName().equals(clusterMapConfig.clusterMapDatacenterName);
}).count();
} else {
numReplica = states.size();
}
result += numReplica * weight;
int result = 0;
for (String partition : externalView.getPartitionSet()) {
Map<String, String> states = externalView.getStateMap(partition);
int weight = getPartitionDiskWeight(partition);
if (!states.isEmpty()) {
int numReplica;
if (clusterMapConfig.clusterMapUseAggregatedView) {
numReplica = (int) states.keySet().stream().filter(instance -> {
AmbryDataNode dataNode = instanceNameToAmbryDataNode.get(instance);
return dataNode != null && dataNode.getDatacenterName().equals(clusterMapConfig.clusterMapDatacenterName);
}).count();
} else {
numReplica = states.size();
}
result += numReplica * weight;
}
return result;
}).orElse(0);
}
return result;
}

int getRegisteredHostDiskCapacity() {
Expand Down Expand Up @@ -1772,6 +1784,7 @@ private void updatePartitionResourceMappingFromIdealStates(Collection<IdealState
if (dcName != null) {
// Rebuild the entire partition-to-resource map in current dc
ConcurrentHashMap<String, Set<String>> partitionToResourceMap = new ConcurrentHashMap<>();
ConcurrentHashMap<String, Set<String>> resourceToPartitionMap = new ConcurrentHashMap<>();
ConcurrentHashMap<String, String> partitionToDuplicateResourceMap = new ConcurrentHashMap<>();
ConcurrentHashMap<String, Set<String>> partitionResourceToInstanceSetMap = new ConcurrentHashMap<>();
ConcurrentHashMap<String, ResourceProperty> tagToProperty = new ConcurrentHashMap<>();
Expand All @@ -1786,6 +1799,7 @@ private void updatePartitionResourceMappingFromIdealStates(Collection<IdealState
new ResourceProperty(resourceName, numPartitions, state.getRebalanceMode());
tagToProperty.put(tag, resourceProperty);
}
resourceToPartitionMap.put(resourceName, new HashSet<>(state.getPartitionSet()));
// Build the partition to resource, partition to duplicate resource and partition-resource to instance set map
for (String partitionName : state.getPartitionSet()) {
Set<String> instanceSet = state.getInstanceSet(partitionName);
Expand Down Expand Up @@ -1827,6 +1841,7 @@ private void updatePartitionResourceMappingFromIdealStates(Collection<IdealState
dcToResourceNameToTag.put(dcName, resourceNameToTag);
partitionToResourceNameByDc.put(dcName, partitionToResourceMap);
partitionToDuplicateResourceNameByDc.put(dcName, partitionToDuplicateResourceMap);
resourceNameToPartitionByDc.put(dcName, resourceToPartitionMap);

// Ideal state has changed, we need to check if the data node is now on FULL AUTO or not.
// This call might come from frontend node, make sure we don't register any FULL AUTO metrics in frontend.
Expand Down Expand Up @@ -2162,4 +2177,134 @@ static class ResourceProperty {
this.rebalanceMode = rebalanceMode;
}
}

/**
* Interfaces and classes for returning the resource information to client.
*/

/**
* An identifier interface to return a list of resource names.
*/
public interface ResourceIdentifier {
List<String> identifyResources(HelixClusterManager helixClusterManager);
}

/**
* An implementation of {@link ResourceIdentifier} to return a single resource name based
* on the partition name.
*/
public static class PartitionIdIdentifier implements ResourceIdentifier {
private String partitionName;

/**
* Constructor to create a {@link PartitionIdIdentifier}.
* @param partitionName
*/
public PartitionIdIdentifier(String partitionName) {
this.partitionName = partitionName;
}

/**
* Return the resource name that the given partition belongs to.
* @param helixClusterManager
* @return
*/
@Override
public List<String> identifyResources(HelixClusterManager helixClusterManager) {
Map<String, Set<String>> partitionToResourceMap =
helixClusterManager.partitionToResourceNameByDc.get(helixClusterManager.clusterMapConfig);
justinlin-linkedin marked this conversation as resolved.
Show resolved Hide resolved
if (!partitionToResourceMap.containsKey(partitionName)) {
throw new IllegalArgumentException("Partition " + partitionName + " doesn't exist");
}
return new ArrayList<>(partitionToResourceMap.get(partitionName));
}
}

/**
* An implementation of {@link ResourceIdentifier} to return the resource name given by the client.
*/
public static class ResourceNameIdentifier implements ResourceIdentifier {
private String resourceName;

/**
* Constructor to create a {@link ResourceNameIdentifier}.
* @param resourceName
*/
public ResourceNameIdentifier(String resourceName) {
this.resourceName = resourceName;
}

/**
* Check if the given resource is valid, and then return it back.
* @param helixClusterManager
* @return
*/
@Override
public List<String> identifyResources(HelixClusterManager helixClusterManager) {
if (!helixClusterManager.dcToResourceNameToTag.get(helixClusterManager.clusterMapConfig).contains(resourceName)) {
justinlin-linkedin marked this conversation as resolved.
Show resolved Hide resolved
throw new IllegalArgumentException("Resource " + resourceName + " doesn't exist");
}
return Collections.singletonList(resourceName);
}
}

/**
* An implementation to return all the resource this cluster map has.
*/
public static class AllResourceIdentifier implements ResourceIdentifier {
public static final AllResourceIdentifier DEFAULT = new AllResourceIdentifier();

/**
* Return all the resource names this cluster map has.
* @param helixClusterManager
* @return
*/
@Override
public List<String> identifyResources(HelixClusterManager helixClusterManager) {
return new ArrayList<>(
helixClusterManager.dcToResourceNameToTag.get(helixClusterManager.clusterMapConfig).keySet());
justinlin-linkedin marked this conversation as resolved.
Show resolved Hide resolved
}
}

/**
* Query the list of the resource names identified by the {@link ResourceIdentifier} and return a list of {@link ResourceInfo}.
* @param identifier
* @return
*/
public List<ResourceInfo> queryResourceInfos(ResourceIdentifier identifier) {
List<String> resourceNames = identifier.identifyResources(this);
return resourceNames.stream().map(this::getResourceInfo).collect(Collectors.toList());
}

private ResourceInfo getResourceInfo(String resourceName) {
String dcName = clusterMapConfig.clusterMapDatacenterName;
List<String> liveInstances = new ArrayList<>(getLiveInstanceCount(resourceName));
List<String> unavailableInstances = new ArrayList<>(getUnavailableInstanceCount(resourceName));
long totalCapacity = getResourceTotalRegisteredHostDiskCapacity(resourceName);
long liveCapacity = getResourceAvailableRegisteredHostDiskCapacity(resourceName);
long unavailableCapacity = totalCapacity - liveCapacity;
int numPartitions = getNumberOfPartitionsInResource(resourceName);
int replicationFactor = 3; // by default it is 3;
String numReplicaStr = getResourceConfig(resourceName, dcName).getNumReplica();
if (numReplicaStr != null) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We may hit exception if the string is not a number.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

then we will get an exception and return 503 back to client

replicationFactor = Integer.parseInt(numReplicaStr);
}
int numExpectedReplicas = numPartitions * replicationFactor;
int numCurrentReplicas = getReplicaCountForStateInResource(null, resourceName);
int expectedTotalReplicaWeight = getResourceExpectedTotalDiskCapacityUsage(resourceName, replicationFactor);
int currentTotalReplicaWeight = getResourceTotalDiskCapacityUsage(resourceName);
Map<String, Set<String>> failedDisks = new HashMap<>();
for (String instanceName : getAllInstancesForResource(resourceName)) {
Set<AmbryDisk> disks = ambryDataNodeToAmbryDisks.get(instanceNameToAmbryDataNode.get(instanceName));
Set<String> failedDiskMountPath = disks.stream()
.filter(disk -> disk.getState() == HardwareState.UNAVAILABLE)
.map(AmbryDisk::getMountPath)
.collect(Collectors.toSet());
if (!failedDiskMountPath.isEmpty()) {
failedDisks.put(instanceName, failedDiskMountPath);
}
}
return new ResourceInfo(liveInstances, unavailableInstances, liveCapacity, unavailableCapacity, numPartitions,
numExpectedReplicas, numCurrentReplicas, expectedTotalReplicaWeight, currentTotalReplicaWeight, failedDisks);
}
}
Loading
Loading