Skip to content

Commit

Permalink
Merge branch 'apache:trunk' into YARN-11225
Browse files Browse the repository at this point in the history
  • Loading branch information
slfan1989 authored Dec 28, 2022
2 parents f79629f + 9668a85 commit 14a7b19
Show file tree
Hide file tree
Showing 16 changed files with 264 additions and 95 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -193,13 +193,53 @@ private void updateNameNodeState(final String nsId,
}
}

/**
* Try to shuffle the multiple observer namenodes if listObserversFirst is true.
* @param inputNameNodes the input FederationNamenodeContext list. If listObserversFirst is true,
* all observers will be placed at the front of the collection.
* @param listObserversFirst true if we need to shuffle the multiple front observer namenodes.
* @return a list of FederationNamenodeContext.
* @param <T> a subclass of FederationNamenodeContext.
*/
private <T extends FederationNamenodeContext> List<T> shuffleObserverNN(
List<T> inputNameNodes, boolean listObserversFirst) {
if (!listObserversFirst) {
return inputNameNodes;
}
// Get Observers first.
List<T> observerList = new ArrayList<>();
for (T t : inputNameNodes) {
if (t.getState() == OBSERVER) {
observerList.add(t);
} else {
// The inputNameNodes are already sorted, so it can break
// when the first non-observer is encountered.
break;
}
}
// Returns the inputNameNodes if no shuffle is required
if (observerList.size() <= 1) {
return inputNameNodes;
}

// Shuffle multiple Observers
Collections.shuffle(observerList);

List<T> ret = new ArrayList<>(inputNameNodes.size());
ret.addAll(observerList);
for (int i = observerList.size(); i < inputNameNodes.size(); i++) {
ret.add(inputNameNodes.get(i));
}
return Collections.unmodifiableList(ret);
}

@Override
public List<? extends FederationNamenodeContext> getNamenodesForNameserviceId(
final String nsId, boolean listObserversFirst) throws IOException {

List<? extends FederationNamenodeContext> ret = cacheNS.get(Pair.of(nsId, listObserversFirst));
if (ret != null) {
return ret;
return shuffleObserverNN(ret, listObserversFirst);
}

// Not cached, generate the value
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -702,8 +702,9 @@ public boolean truncate(String src, long newLength, String clientName)
RemoteMethod method = new RemoteMethod("truncate",
new Class<?>[] {String.class, long.class, String.class},
new RemoteParam(), newLength, clientName);
// Truncate can return true/false, so don't expect a result
return rpcClient.invokeSequential(locations, method, Boolean.class,
Boolean.TRUE);
null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,98 @@ public void setup() throws IOException, InterruptedException {
assertTrue(cleared);
}

@Test
public void testShuffleObserverNNs() throws Exception {
// Add an active entry to the store
NamenodeStatusReport activeReport = createNamenodeReport(
NAMESERVICES[0], NAMENODES[0], HAServiceState.ACTIVE);
assertTrue(namenodeResolver.registerNamenode(activeReport));

// Add a standby entry to the store
NamenodeStatusReport standbyReport = createNamenodeReport(
NAMESERVICES[0], NAMENODES[1], HAServiceState.STANDBY);
assertTrue(namenodeResolver.registerNamenode(standbyReport));

// Load cache
stateStore.refreshCaches(true);

// Get namenodes from state store.
List<? extends FederationNamenodeContext> withoutObserver =
namenodeResolver.getNamenodesForNameserviceId(NAMESERVICES[0], true);
assertEquals(2, withoutObserver.size());
assertEquals(FederationNamenodeServiceState.ACTIVE, withoutObserver.get(0).getState());
assertEquals(FederationNamenodeServiceState.STANDBY, withoutObserver.get(1).getState());

// Get namenodes from cache.
withoutObserver = namenodeResolver.getNamenodesForNameserviceId(NAMESERVICES[0], true);
assertEquals(2, withoutObserver.size());
assertEquals(FederationNamenodeServiceState.ACTIVE, withoutObserver.get(0).getState());
assertEquals(FederationNamenodeServiceState.STANDBY, withoutObserver.get(1).getState());

// Add an observer entry to the store
NamenodeStatusReport observerReport1 = createNamenodeReport(
NAMESERVICES[0], NAMENODES[2], HAServiceState.OBSERVER);
assertTrue(namenodeResolver.registerNamenode(observerReport1));

// Load cache
stateStore.refreshCaches(true);

// Get namenodes from state store.
List<? extends FederationNamenodeContext> observerList =
namenodeResolver.getNamenodesForNameserviceId(NAMESERVICES[0], true);
assertEquals(3, observerList.size());
assertEquals(FederationNamenodeServiceState.OBSERVER, observerList.get(0).getState());
assertEquals(FederationNamenodeServiceState.ACTIVE, observerList.get(1).getState());
assertEquals(FederationNamenodeServiceState.STANDBY, observerList.get(2).getState());

// Get namenodes from cache.
observerList = namenodeResolver.getNamenodesForNameserviceId(NAMESERVICES[0], true);
assertEquals(3, observerList.size());
assertEquals(FederationNamenodeServiceState.OBSERVER, observerList.get(0).getState());
assertEquals(FederationNamenodeServiceState.ACTIVE, observerList.get(1).getState());
assertEquals(FederationNamenodeServiceState.STANDBY, observerList.get(2).getState());

// Add one new observer entry to the store
NamenodeStatusReport observerReport2 = createNamenodeReport(
NAMESERVICES[0], NAMENODES[3], HAServiceState.OBSERVER);
assertTrue(namenodeResolver.registerNamenode(observerReport2));

// Load cache
stateStore.refreshCaches(true);

// Get namenodes from state store.
List<? extends FederationNamenodeContext> observerList2 =
namenodeResolver.getNamenodesForNameserviceId(NAMESERVICES[0], true);
assertEquals(4, observerList2.size());
assertEquals(FederationNamenodeServiceState.OBSERVER, observerList2.get(0).getState());
assertEquals(FederationNamenodeServiceState.OBSERVER, observerList2.get(1).getState());
assertEquals(FederationNamenodeServiceState.ACTIVE, observerList2.get(2).getState());
assertEquals(FederationNamenodeServiceState.STANDBY, observerList2.get(3).getState());

// Get namenodes from cache.
observerList2 = namenodeResolver.getNamenodesForNameserviceId(NAMESERVICES[0], true);
assertEquals(4, observerList2.size());
assertEquals(FederationNamenodeServiceState.OBSERVER, observerList2.get(0).getState());
assertEquals(FederationNamenodeServiceState.OBSERVER, observerList2.get(1).getState());
assertEquals(FederationNamenodeServiceState.ACTIVE, observerList2.get(2).getState());
assertEquals(FederationNamenodeServiceState.STANDBY, observerList2.get(3).getState());

// Test shuffler
List<? extends FederationNamenodeContext> observerList3;
boolean hit = false;
for (int i = 0; i < 1000; i++) {
observerList3 = namenodeResolver.getNamenodesForNameserviceId(NAMESERVICES[0], true);
assertEquals(FederationNamenodeServiceState.OBSERVER, observerList3.get(0).getState());
assertEquals(FederationNamenodeServiceState.OBSERVER, observerList3.get(1).getState());
if (observerList3.get(0).getNamenodeId().equals(observerList2.get(1).getNamenodeId()) &&
observerList3.get(1).getNamenodeId().equals(observerList2.get(0).getNamenodeId())) {
hit = true;
break;
}
}
assertTrue(hit);
}

@Test
public void testStateStoreDisconnected() throws Exception {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.NamenodeContext;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
Expand All @@ -46,6 +47,7 @@
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse;
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
import org.apache.hadoop.hdfs.server.namenode.TestFileTruncate;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -191,6 +193,18 @@ private void testAll(final String path) throws Exception {
assertDirsEverywhere(path, 9);
assertFilesDistributed(path, 15);

// Test truncate
String testTruncateFile = path + "/dir2/dir22/dir220/file-truncate.txt";
createTestFile(routerFs, testTruncateFile);
Path testTruncateFilePath = new Path(testTruncateFile);
routerFs.truncate(testTruncateFilePath, 10);
TestFileTruncate.checkBlockRecovery(testTruncateFilePath,
(DistributedFileSystem) routerFs);
assertEquals("Truncate file fails", 10,
routerFs.getFileStatus(testTruncateFilePath).getLen());
assertDirsEverywhere(path, 9);
assertFilesDistributed(path, 16);

// Removing a directory should remove it from every subcluster
routerFs.delete(new Path(path + "/dir2/dir22/dir220"), true);
assertDirsEverywhere(path, 8);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,26 +124,6 @@
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.platform</groupId>
<artifactId>junit-platform-launcher</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.platform</groupId>
<artifactId>junit-platform-launcher</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public interface StateMachine
<STATE extends Enum<STATE>,
EVENTTYPE extends Enum<EVENTTYPE>, EVENT> {
public STATE getCurrentState();
public STATE getPreviousState();
public STATE doTransition(EVENTTYPE eventType, EVENT event)
throws InvalidStateTransitionException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,7 @@ private class InternalStateMachine
implements StateMachine<STATE, EVENTTYPE, EVENT> {
private final OPERAND operand;
private STATE currentState;
private STATE previousState;
private final StateTransitionListener<OPERAND, EVENT, STATE> listener;

InternalStateMachine(OPERAND operand, STATE initialState) {
Expand All @@ -479,14 +480,19 @@ public synchronized STATE getCurrentState() {
return currentState;
}

@Override
public synchronized STATE getPreviousState() {
return previousState;
}

@Override
public synchronized STATE doTransition(EVENTTYPE eventType, EVENT event)
throws InvalidStateTransitionException {
listener.preTransition(operand, currentState, event);
STATE oldState = currentState;
previousState = currentState;
currentState = StateMachineFactory.this.doTransition
(operand, currentState, eventType, event);
listener.postTransition(operand, oldState, currentState, event);
listener.postTransition(operand, previousState, currentState, event);
return currentState;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,10 @@
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerContext;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory;

import static org.junit.jupiter.api.Assertions.assertTrue;

/**
* This class contains several utility functions for log aggregation tests.
* Any assertion libraries shouldn't be used here because this class is used by
* multiple modules including MapReduce.
*/
public final class TestContainerLogsUtils {

Expand Down Expand Up @@ -75,13 +75,16 @@ public static void createContainerLogFileInRemoteFS(Configuration conf,
if (fs.exists(rootLogDirPath)) {
fs.delete(rootLogDirPath, true);
}
assertTrue(fs.mkdirs(rootLogDirPath));
fs.mkdirs(rootLogDirPath);
// Make sure the target dir is created. If not, FileNotFoundException is thrown
fs.getFileStatus(rootLogDirPath);
Path appLogsDir = new Path(rootLogDirPath, appId.toString());
if (fs.exists(appLogsDir)) {
fs.delete(appLogsDir, true);
}
assertTrue(fs.mkdirs(appLogsDir));

fs.mkdirs(appLogsDir);
// Make sure the target dir is created. If not, FileNotFoundException is thrown
fs.getFileStatus(appLogsDir);
createContainerLogInLocalDir(appLogsDir, containerToContent, fs, fileName);
// upload container logs to remote log dir

Expand All @@ -95,7 +98,9 @@ public static void createContainerLogFileInRemoteFS(Configuration conf,
if (fs.exists(path) && deleteRemoteLogDir) {
fs.delete(path, true);
}
assertTrue(fs.mkdirs(path));
fs.mkdirs(path);
// Make sure the target dir is created. If not, FileNotFoundException is thrown
fs.getFileStatus(path);
uploadContainerLogIntoRemoteDir(ugi, conf, rootLogDirList, nodeId, appId,
containerToContent.keySet(), path);
}
Expand All @@ -111,7 +116,9 @@ private static void createContainerLogInLocalDir(Path appLogsDir,
if (fs.exists(containerLogsDir)) {
fs.delete(containerLogsDir, true);
}
assertTrue(fs.mkdirs(containerLogsDir));
fs.mkdirs(containerLogsDir);
// Make sure the target dir is created. If not, FileNotFoundException is thrown
fs.getFileStatus(containerLogsDir);
Writer writer =
new FileWriter(new File(containerLogsDir.toString(), fileName));
writer.write(content);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.assertj.core.api.Assertions.assertThat;

public class WebServicesTestUtils {
public static long getXmlLong(Element element, String name) {
Expand Down Expand Up @@ -121,28 +120,24 @@ public static String getXmlAttrString(Element element, String name) {
}

public static void checkStringMatch(String print, String expected, String got) {
assertTrue(
got.matches(expected),
print + " doesn't match, got: " + got + " expected: " + expected);
assertThat(got).as(print).matches(expected);
}

public static void checkStringContains(String print, String expected, String got) {
assertTrue(
got.contains(expected),
print + " doesn't contain expected string, got: " + got + " expected: " + expected);
assertThat(got).as(print).contains(expected);
}

public static void checkStringEqual(String print, String expected, String got) {
assertEquals(got, expected);
assertThat(got).as(print).isEqualTo(expected);
}

public static void assertResponseStatusCode(StatusType expected,
StatusType actual) {
assertResponseStatusCode(null, expected, actual);
assertThat(expected.getStatusCode()).isEqualTo(actual.getStatusCode());
}

public static void assertResponseStatusCode(String errmsg,
StatusType expected, StatusType actual) {
assertEquals(expected.getStatusCode(), actual.getStatusCode(), errmsg);
assertThat(expected.getStatusCode()).withFailMessage(errmsg).isEqualTo(actual.getStatusCode());
}
}
Loading

0 comments on commit 14a7b19

Please sign in to comment.