Skip to content
This repository has been archived by the owner on Jul 22, 2022. It is now read-only.

Commit

Permalink
HDFS-15707. NNTop counts don't add up as expected. (apache#2516) Cont…
Browse files Browse the repository at this point in the history
…ributed by Ahmed Hussein and Daryn Sharp

(cherry picked from commit 6a5864e)
  • Loading branch information
jbrennan333 authored and Hexiaoqiao committed Dec 8, 2020
1 parent 39b4f9d commit c43ac3c
Show file tree
Hide file tree
Showing 4 changed files with 282 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.namenode.top.TopConf;
import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager;
import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager.Op;
import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager.User;
Expand Down Expand Up @@ -137,8 +136,6 @@ public void report(long currTime, String userName, String cmd) {
for (RollingWindowManager rollingWindowManager : rollingWindowManagers
.values()) {
rollingWindowManager.recordMetric(currTime, cmd, userName, 1);
rollingWindowManager.recordMetric(currTime,
TopConf.ALL_CMDS, userName, 1);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,8 @@ private int computeBucketIndex(long time) {
* as well as atomic fields.
*/
private class Bucket {
AtomicLong value = new AtomicLong(0);
AtomicLong updateTime = new AtomicLong(0);
private AtomicLong value = new AtomicLong(0);
private AtomicLong updateTime = new AtomicLong(-1); // -1 = never updated.

/**
* Check whether the last time that the bucket was updated is no longer
Expand All @@ -124,7 +124,7 @@ private class Bucket {
*/
boolean isStaleNow(long time) {
long utime = updateTime.get();
return time - utime >= windowLenMs;
return (utime == -1) || (time - utime >= windowLenMs);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,22 @@
*/
package org.apache.hadoop.hdfs.server.namenode.top.window;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Stack;
import java.util.concurrent.ConcurrentHashMap;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.hadoop.hdfs.server.namenode.top.TopConf;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.metrics2.util.Metrics2Util.NameValuePair;
import org.apache.hadoop.metrics2.util.Metrics2Util.TopN;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -66,11 +68,15 @@ public static class TopWindow {

public TopWindow(int windowMillis) {
this.windowMillis = windowMillis;
this.top = Lists.newArrayList();
this.top = new LinkedList<>();
}

public void addOp(Op op) {
top.add(op);
if (op.getOpType().equals(TopConf.ALL_CMDS)) {
top.add(0, op);
} else {
top.add(op);
}
}

public int getWindowLenMs() {
Expand All @@ -86,41 +92,59 @@ public List<Op> getOps() {
* Represents an operation within a TopWindow. It contains a ranked
* set of the top users for the operation.
*/
public static class Op {
public static class Op implements Comparable<Op> {
private final String opType;
private final List<User> topUsers;
private final List<User> users;
private final long totalCount;
private final int limit;

public Op(String opType, long totalCount) {
public Op(String opType, UserCounts users, int limit) {
this.opType = opType;
this.topUsers = Lists.newArrayList();
this.totalCount = totalCount;
}

public void addUser(User u) {
topUsers.add(u);
this.users = new ArrayList<>(users);
this.users.sort(Collections.reverseOrder());
this.totalCount = users.getTotal();
this.limit = limit;
}

public String getOpType() {
return opType;
}

public List<User> getAllUsers() {
return users;
}

public List<User> getTopUsers() {
return topUsers;
return (users.size() > limit) ? users.subList(0, limit) : users;
}

public long getTotalCount() {
return totalCount;
}

/**
 * Orders ops by ascending total count.
 *
 * @param other the op to compare against
 * @return a negative value, zero, or a positive value when this op's total
 *         count is less than, equal to, or greater than {@code other}'s
 */
@Override
public int compareTo(Op other) {
  // Long.compare avoids the overflow that subtracting two longs can hit,
  // which would flip the sign of the result.
  return Long.compare(totalCount, other.totalCount);
}

/**
 * Two ops are considered equal when their total counts match; the op type
 * is not part of the comparison.
 *
 * @param o the object to compare against
 * @return true if {@code o} is an Op with the same total count
 */
@Override
public boolean equals(Object o) {
  if (!(o instanceof Op)) {
    return false;
  }
  Op that = (Op) o;
  return totalCount == that.totalCount;
}

/**
 * Returns a hash derived from the total count, the same field that
 * {@link #equals(Object)} compares, so that equal ops hash alike.
 *
 * @return hash code of the total count
 */
@Override
public int hashCode() {
  return Long.hashCode(totalCount);
}
}

/**
* Represents a user who called an Op within a TopWindow. Specifies the
* user and the number of times the user called the operation.
*/
public static class User {
public static class User implements Comparable<User> {
private final String user;
private final long count;
private long count;

public User(String user, long count) {
this.user = user;
Expand All @@ -134,6 +158,56 @@ public String getUser() {
public long getCount() {
return count;
}

/**
 * Increases this user's count by the given amount.
 *
 * @param delta the amount to add to the current count
 */
public void add(long delta) {
  this.count = this.count + delta;
}

/**
 * Orders users by ascending count. Note that this ordering looks only at
 * the count, whereas {@link #equals(Object)} looks only at the user name.
 *
 * @param other the user to compare against
 * @return a negative value, zero, or a positive value when this user's
 *         count is less than, equal to, or greater than {@code other}'s
 */
@Override
public int compareTo(User other) {
  // Long.compare cannot overflow the way a raw subtraction can.
  return Long.compare(count, other.count);
}

/**
 * Users are equal when their user names match; the count is ignored.
 *
 * @param o the object to compare against
 * @return true if {@code o} is a User with the same user name
 */
@Override
public boolean equals(Object o) {
  if (!(o instanceof User)) {
    return false;
  }
  User that = (User) o;
  return user.equals(that.user);
}

@Override
public int hashCode() {
return user.hashCode();
}
}

/**
 * A list of per-user counts that merges entries for the same user and keeps
 * a running total of every count passed to {@link #add(User)}.
 *
 * Users are matched through {@code indexOf}, i.e. via {@code User.equals}.
 * Only {@code add(User)} and {@code addAll(Collection)} update the total;
 * inherited mutators such as {@code remove} or {@code retainAll} change the
 * list contents but leave the total as it was.
 */
private static class UserCounts extends ArrayList<User> {
  private long total = 0;

  UserCounts(int capacity) {
    super(capacity);
  }

  /**
   * Adds the user's count to this list. If an equal user is already
   * present its count is increased; otherwise a copy of the user is
   * appended, so later merges do not modify the caller's instance.
   *
   * @param user the user and count to merge in
   * @return always true
   */
  @Override
  public boolean add(User user) {
    long count = user.getCount();
    int i = indexOf(user);
    if (i == -1) {
      super.add(new User(user.getUser(), count));
    } else {
      get(i).add(count);
    }
    total += count;
    return true;
  }

  /**
   * Merges every user of the given collection via {@link #add(User)}.
   *
   * @param users the users to merge in
   * @return true if at least one user was merged, false if {@code users}
   *         was empty
   */
  @Override
  public boolean addAll(Collection<? extends User> users) {
    boolean changed = false;
    for (User user : users) {
      changed |= add(user);
    }
    return changed;
  }

  /**
   * @return the sum of all counts merged in through add/addAll
   */
  public long getTotal() {
    return total;
  }
}

/**
Expand All @@ -142,7 +216,7 @@ public long getCount() {
* operated on that metric.
*/
public ConcurrentHashMap<String, RollingWindowMap> metricMap =
new ConcurrentHashMap<String, RollingWindowMap>();
new ConcurrentHashMap<>();

public RollingWindowManager(Configuration conf, int reportingPeriodMs) {

Expand Down Expand Up @@ -184,35 +258,33 @@ public void recordMetric(long time, String command,
*
* @param time the current time
* @return a TopWindow describing the top users for each metric in the
* window.
* window.
*/
public TopWindow snapshot(long time) {
TopWindow window = new TopWindow(windowLenMs);
Set<String> metricNames = metricMap.keySet();
LOG.debug("iterating in reported metrics, size={} values={}",
metricNames.size(), metricNames);
UserCounts totalCounts = new UserCounts(metricMap.size());
for (Map.Entry<String, RollingWindowMap> entry : metricMap.entrySet()) {
String metricName = entry.getKey();
RollingWindowMap rollingWindows = entry.getValue();
TopN topN = getTopUsersForMetric(time, metricName, rollingWindows);
final int size = topN.size();
if (size == 0) {
continue;
}
Op op = new Op(metricName, topN.getTotal());
window.addOp(op);
// Reverse the users from the TopUsers using a stack,
// since we'd like them sorted in descending rather than ascending order
Stack<NameValuePair> reverse = new Stack<NameValuePair>();
for (int i = 0; i < size; i++) {
reverse.push(topN.poll());
}
for (int i = 0; i < size; i++) {
NameValuePair userEntry = reverse.pop();
User user = new User(userEntry.getName(), userEntry.getValue());
op.addUser(user);
UserCounts topN = getTopUsersForMetric(time, metricName, rollingWindows);
if (!topN.isEmpty()) {
window.addOp(new Op(metricName, topN, topUsersCnt));
totalCounts.addAll(topN);
}
}
// synthesize the overall total op count with the top users for every op.
Set<User> topUsers = new HashSet<>();
for (Op op : window.getOps()) {
topUsers.addAll(op.getTopUsers());
}
// intersect totals with the top users.
totalCounts.retainAll(topUsers);
// allowed to exceed the per-op topUsersCnt to capture total ops for
// any user
window.addOp(new Op(TopConf.ALL_CMDS, totalCounts, Integer.MAX_VALUE));
return window;
}

Expand All @@ -223,9 +295,9 @@ public TopWindow snapshot(long time) {
* @param metricName Name of metric
* @return the per-user window sums gathered for the metric
*/
private TopN getTopUsersForMetric(long time, String metricName,
private UserCounts getTopUsersForMetric(long time, String metricName,
RollingWindowMap rollingWindows) {
TopN topN = new TopN(topUsersCnt);
UserCounts topN = new UserCounts(topUsersCnt);
Iterator<Map.Entry<String, RollingWindow>> iterator =
rollingWindows.entrySet().iterator();
while (iterator.hasNext()) {
Expand All @@ -242,7 +314,7 @@ private TopN getTopUsersForMetric(long time, String metricName,
}
LOG.debug("offer window of metric: {} userName: {} sum: {}",
metricName, userName, windowSum);
topN.offer(new NameValuePair(userName, windowSum));
topN.add(new User(userName, windowSum));
}
LOG.debug("topN users size for command {} is: {}",
metricName, topN.size());
Expand Down
Loading

0 comments on commit c43ac3c

Please sign in to comment.